summaryrefslogtreecommitdiff
path: root/src/glibext
diff options
context:
space:
mode:
authorCyrille Bagard <nocbos@gmail.com>2015-12-19 20:22:14 (GMT)
committerCyrille Bagard <nocbos@gmail.com>2015-12-19 20:22:14 (GMT)
commit5710f9d5be56b427ccfa48f6a730d70396817efe (patch)
tree4b694774ce446295ef5b01a7df28bd8160b97025 /src/glibext
parent8ff010a34762737016624a68f593d0e6736d4349 (diff)
Fixed several bugs when processing concurrent delayed works.
Diffstat (limited to 'src/glibext')
-rw-r--r--src/glibext/delayed.c203
-rw-r--r--src/glibext/delayed.h11
2 files changed, 198 insertions, 16 deletions
diff --git a/src/glibext/delayed.c b/src/glibext/delayed.c
index 967865b..a83fae4 100644
--- a/src/glibext/delayed.c
+++ b/src/glibext/delayed.c
@@ -78,11 +78,15 @@ typedef struct _GWorkGroup
GMutex mutex; /* Verrou pour l'accès */
GCond cond; /* Réveil pour un traitement */
GCond wait_cond; /* Réveil d'attente de fin */
+ gint pending; /* Tâches en cours d'exécution */
GThread **threads; /* Procédure de traitement */
guint threads_count; /* Nombre de procédures */
bool force_exit; /* Procédure d'arrêt */
+ wait_for_incoming_works_cb callback; /* Encadre les attentes de fin */
+ void *data; /* Données à associer */
+
} GWorkGroup;
/* File de traitement pour un type donné (classe) */
@@ -121,7 +125,13 @@ static void g_work_group_schedule(GWorkGroup *, GDelayedWork *);
static void *g_work_group_process(GWorkGroup *);
/* Attend que toutes les tâches d'un groupe soient traitées. */
-static void g_work_group_wait_for_completion(GWorkGroup *);
+static void g_work_group_wait_for_completion(GWorkGroup *, GWorkQueue *);
+
+/* Modifie les conditions d'attente des fins d'exécutions. */
+static void g_work_group_set_extra_wait_callback(GWorkGroup *, wait_for_incoming_works_cb, void *);
+
+/* Force un réveil d'une attente en cours pour la confirmer. */
+static void g_work_group_wake_up_waiters(GWorkGroup *);
@@ -166,6 +176,9 @@ static void g_work_queue_finalize(GWorkQueue *);
/* Donne l'assurance de l'existence d'un groupe de travail. */
static GWorkGroup *g_work_queue_ensure_group_exists(GWorkQueue *, wgroup_id_t);
+/* Fournit le groupe de travail correspondant à un identifiant. */
+static GWorkGroup *g_work_queue_find_group_for_id(GWorkQueue *, wgroup_id_t);
+
/* ---------------------------------------------------------------------------------- */
@@ -379,6 +392,8 @@ static void g_work_group_init(GWorkGroup *group)
g_cond_init(&group->cond);
g_cond_init(&group->wait_cond);
+ g_atomic_int_set(&group->pending, 0);
+
group->threads_count = g_get_num_processors();
group->threads = (GThread **)calloc(group->threads_count, sizeof(GThread *));
@@ -401,6 +416,9 @@ static void g_work_group_init(GWorkGroup *group)
group->force_exit = false;
+ group->callback = NULL;
+ group->data = NULL;
+
}
@@ -491,8 +509,11 @@ static GWorkGroup *g_work_group_new(wgroup_id_t id, GtkExtStatusBar *statusbar)
result->id = id;
- result->statusbar = statusbar;
- g_object_ref(statusbar);
+ if (statusbar != NULL)
+ {
+ result->statusbar = statusbar;
+ g_object_ref(statusbar);
+ }
return result;
@@ -535,6 +556,8 @@ static void g_work_group_schedule(GWorkGroup *group, GDelayedWork *work)
{
g_mutex_lock(&group->mutex);
+ g_atomic_int_inc(&group->pending);
+
delayed_work_list_add_tail(work, &group->works);
g_cond_signal(&group->cond);
@@ -568,7 +591,10 @@ static void *g_work_group_process(GWorkGroup *group)
g_cond_wait(&group->cond, &group->mutex);
if (group->force_exit)
+ {
+ g_mutex_unlock(&group->mutex);
break;
+ }
work = group->works;
delayed_work_list_del(work, &group->works);
@@ -579,7 +605,8 @@ static void *g_work_group_process(GWorkGroup *group)
g_object_unref(G_OBJECT(work));
- g_cond_broadcast(&group->wait_cond);
+ if (g_atomic_int_dec_and_test(&group->pending))
+ g_cond_broadcast(&group->wait_cond);
}
@@ -591,6 +618,7 @@ static void *g_work_group_process(GWorkGroup *group)
/******************************************************************************
* *
* Paramètres : group = groupe dont les conclusions sont attendues. *
+* queue = queue d'appartenance pour les appels externes. *
* *
* Description : Attend que toutes les tâches d'un groupe soient traitées. *
* *
@@ -600,18 +628,78 @@ static void *g_work_group_process(GWorkGroup *group)
* *
******************************************************************************/
-static void g_work_group_wait_for_completion(GWorkGroup *group)
+static void g_work_group_wait_for_completion(GWorkGroup *group, GWorkQueue *queue)
{
+ wait_for_incoming_works_cb callback; /* Procédure complémentaire */
+
+ bool no_extra_check(GWorkQueue *_q, wgroup_id_t _id, void *_data)
+ {
+ return false;
+ }
+
+ callback = group->callback != NULL ? group->callback : no_extra_check;
+
g_mutex_lock(&group->mutex);
- while (dl_list_empty(group->works) && !group->force_exit)
+ /**
+ * On attend que :
+ * - la liste des tâches programmées soit vide.
+ * - il n'existe plus de tâche en cours.
+ * - rien n'indique que de nouvelles tâches supplémentaires vont arriver.
+ */
+
+ while ((g_atomic_int_get(&group->pending) > 0 || callback(queue, group->id, group->data))
+ && !group->force_exit)
+ {
g_cond_wait(&group->wait_cond, &group->mutex);
+ }
g_mutex_unlock(&group->mutex);
}
+/******************************************************************************
+* *
+* Paramètres : group = groupe dont les paramètres sont à modifier. *
+* callback = éventuelle fonction à appeler ou NULL. *
+* data = données devant accompagner l'appel. *
+* *
+* Description : Modifie les conditions d'attente des fins d'exécutions. *
+* *
+* Retour : - *
+* *
+* Remarques : - *
+* *
+******************************************************************************/
+
+static void g_work_group_set_extra_wait_callback(GWorkGroup *group, wait_for_incoming_works_cb callback, void *data)
+{
+ group->callback = callback;
+ group->data = data;
+
+}
+
+
+/******************************************************************************
+* *
+* Paramètres : queue = gestionnaire de l'ensemble des groupes de travail.*
+* id = identifiant d'un groupe de travail. *
+* *
+* Description : Force un réveil d'une attente en cours pour la confirmer. *
+* *
+* Retour : - *
+* *
+* Remarques : - *
+* *
+******************************************************************************/
+
+static void g_work_group_wake_up_waiters(GWorkGroup *group)
+{
+ g_cond_broadcast(&group->wait_cond);
+
+}
+
/* ---------------------------------------------------------------------------------- */
@@ -816,7 +904,7 @@ static GWorkGroup *g_work_queue_ensure_group_exists(GWorkQueue *queue, wgroup_id
queue->groups_count * sizeof(GWorkGroup *));
result = g_work_group_new(id, queue->statusbar);
- queue->groups[i] = result;
+ queue->groups[queue->groups_count - 1] = result;
}
@@ -843,7 +931,7 @@ wgroup_id_t g_work_queue_define_work_group(GWorkQueue *queue)
g_mutex_lock(&queue->mutex);
- result = queue->generator++;
+ result = ++queue->generator;
g_work_queue_ensure_group_exists(queue, result);
@@ -932,35 +1020,118 @@ void g_work_queue_schedule_work(GWorkQueue *queue, GDelayedWork *work, wgroup_id
* Paramètres : queue = gestionnaire de l'ensemble des groupes de travail. *
* id = identifiant d'un groupe de travail. *
* *
-* Description : Attend que toutes les tâches d'un groupe soient traitées. *
+* Description : Fournit le groupe de travail correspondant à un identifiant. *
* *
-* Retour : - *
+* Retour : Eventuel groupe existant trouvé ou NULL si aucun. *
* *
* Remarques : - *
* *
******************************************************************************/
-void g_work_queue_wait_for_completion(GWorkQueue *queue, wgroup_id_t id)
+static GWorkGroup *g_work_queue_find_group_for_id(GWorkQueue *queue, wgroup_id_t id)
{
+ GWorkGroup *result; /* Trouvaille à retourner */
size_t i; /* Boucle de parcours */
- GWorkGroup *group; /* Groupe de travail à attendre*/
- group = NULL;
+ result = NULL;
g_mutex_lock(&queue->mutex);
for (i = 0; i < queue->groups_count; i++)
if (g_work_group_get_id(queue->groups[i]) == id)
{
- group = queue->groups[i];
- g_object_ref(G_OBJECT(group));
+ result = queue->groups[i];
+ g_object_ref(G_OBJECT(result));
+ break;
}
g_mutex_unlock(&queue->mutex);
+ return result;
+
+}
+
+
+/******************************************************************************
+* *
+* Paramètres : queue = gestionnaire de l'ensemble des groupes de travail. *
+* id = identifiant d'un groupe de travail. *
+* *
+* Description : Attend que toutes les tâches d'un groupe soient traitées. *
+* *
+* Retour : - *
+* *
+* Remarques : - *
+* *
+******************************************************************************/
+
+void g_work_queue_wait_for_completion(GWorkQueue *queue, wgroup_id_t id)
+{
+ GWorkGroup *group; /* Groupe de travail à attendre*/
+
+ group = g_work_queue_find_group_for_id(queue, id);
+
+ if (group != NULL)
+ {
+ g_work_group_wait_for_completion(group, queue);
+ g_object_unref(G_OBJECT(group));
+ }
+
+}
+
+
+/******************************************************************************
+* *
+* Paramètres : queue = gestionnaire de l'ensemble des groupes de travail.*
+* id = identifiant d'un groupe de travail. *
+* callback = éventuelle fonction à appeler ou NULL. *
+* data = données devant accompagner l'appel. *
+* *
+* Description : Modifie les conditions d'attente des fins d'exécutions. *
+* *
+* Retour : - *
+* *
+* Remarques : - *
+* *
+******************************************************************************/
+
+void g_work_queue_set_extra_wait_callback(GWorkQueue *queue, wgroup_id_t id, wait_for_incoming_works_cb callback, void *data)
+{
+ GWorkGroup *group; /* Groupe de travail à traiter */
+
+ group = g_work_queue_find_group_for_id(queue, id);
+
+ if (group != NULL)
+ {
+ g_work_group_set_extra_wait_callback(group, callback, data);
+ g_object_unref(G_OBJECT(group));
+ }
+
+}
+
+
+/******************************************************************************
+* *
+* Paramètres : queue = gestionnaire de l'ensemble des groupes de travail.*
+* id = identifiant d'un groupe de travail. *
+* *
+* Description : Force un réveil d'une attente en cours pour la confirmer. *
+* *
+* Retour : - *
+* *
+* Remarques : - *
+* *
+******************************************************************************/
+
+void g_work_queue_wake_up_waiters(GWorkQueue *queue, wgroup_id_t id)
+{
+ GWorkGroup *group; /* Groupe de travail à traiter */
+
+ group = g_work_queue_find_group_for_id(queue, id);
+
if (group != NULL)
{
- g_work_group_wait_for_completion(group);
+ g_work_group_wake_up_waiters(group);
g_object_unref(G_OBJECT(group));
}
diff --git a/src/glibext/delayed.h b/src/glibext/delayed.h
index 0a27c93..4902e68 100644
--- a/src/glibext/delayed.h
+++ b/src/glibext/delayed.h
@@ -106,5 +106,16 @@ void g_work_queue_schedule_work(GWorkQueue *, GDelayedWork *, wgroup_id_t);
void g_work_queue_wait_for_completion(GWorkQueue *, wgroup_id_t);
+/* Etudie le besoin d'attendre d'avantage de prochaines tâches. */
+typedef bool (* wait_for_incoming_works_cb) (GWorkQueue *, wgroup_id_t, void *);
+
+
+/* Modifie les conditions d'attente des fins d'exécutions. */
+void g_work_queue_set_extra_wait_callback(GWorkQueue *, wgroup_id_t, wait_for_incoming_works_cb, void *);
+
+/* Force un réveil d'une attente en cours pour la confirmer. */
+void g_work_queue_wake_up_waiters(GWorkQueue *, wgroup_id_t);
+
+
#endif /* _GLIBEXT_DELAYED_H */