diff options
author | Cyrille Bagard <nocbos@gmail.com> | 2015-12-19 20:22:14 (GMT) |
---|---|---|
committer | Cyrille Bagard <nocbos@gmail.com> | 2015-12-19 20:22:14 (GMT) |
commit | 5710f9d5be56b427ccfa48f6a730d70396817efe (patch) | |
tree | 4b694774ce446295ef5b01a7df28bd8160b97025 /src/glibext | |
parent | 8ff010a34762737016624a68f593d0e6736d4349 (diff) |
Fixed several bugs when processing concurrent delayed works.
Diffstat (limited to 'src/glibext')
-rw-r--r-- | src/glibext/delayed.c | 203 | ||||
-rw-r--r-- | src/glibext/delayed.h | 11 |
2 files changed, 198 insertions, 16 deletions
diff --git a/src/glibext/delayed.c b/src/glibext/delayed.c index 967865b..a83fae4 100644 --- a/src/glibext/delayed.c +++ b/src/glibext/delayed.c @@ -78,11 +78,15 @@ typedef struct _GWorkGroup GMutex mutex; /* Verrou pour l'accès */ GCond cond; /* Réveil pour un traitement */ GCond wait_cond; /* Réveil d'attente de fin */ + gint pending; /* Tâches en cours d'exécution */ GThread **threads; /* Procédure de traitement */ guint threads_count; /* Nombre de procédures */ bool force_exit; /* Procédure d'arrêt */ + wait_for_incoming_works_cb callback; /* Encadre les attentes de fin */ + void *data; /* Données à associer */ + } GWorkGroup; /* File de traitement pour un type donné (classe) */ @@ -121,7 +125,13 @@ static void g_work_group_schedule(GWorkGroup *, GDelayedWork *); static void *g_work_group_process(GWorkGroup *); /* Attend que toutes les tâches d'un groupe soient traitées. */ -static void g_work_group_wait_for_completion(GWorkGroup *); +static void g_work_group_wait_for_completion(GWorkGroup *, GWorkQueue *); + +/* Modifie les conditions d'attente des fins d'exécutions. */ +static void g_work_group_set_extra_wait_callback(GWorkGroup *, wait_for_incoming_works_cb, void *); + +/* Force un réveil d'une attente en cours pour la confirmer. */ +static void g_work_group_wake_up_waiters(GWorkGroup *); @@ -166,6 +176,9 @@ static void g_work_queue_finalize(GWorkQueue *); /* Donne l'assurance de l'existence d'un groupe de travail. */ static GWorkGroup *g_work_queue_ensure_group_exists(GWorkQueue *, wgroup_id_t); +/* Fournit le groupe de travail correspondant à un identifiant. */ +static GWorkGroup *g_work_queue_find_group_for_id(GWorkQueue *, wgroup_id_t); + /* ---------------------------------------------------------------------------------- */ @@ -379,6 +392,8 @@ static void g_work_group_init(GWorkGroup *group) g_cond_init(&group->cond); g_cond_init(&group->wait_cond); + g_atomic_int_set(&group->pending, 0); + group->threads_count = g_get_num_processors(); group->threads = (GThread **)calloc(group->threads_count, sizeof(GThread *)); @@ -401,6 +416,9 @@ static void g_work_group_init(GWorkGroup *group) group->force_exit = false; + group->callback = NULL; + group->data = NULL; + } @@ -491,8 +509,11 @@ static GWorkGroup *g_work_group_new(wgroup_id_t id, GtkExtStatusBar *statusbar) result->id = id; - result->statusbar = statusbar; - g_object_ref(statusbar); + if (statusbar != NULL) + { + result->statusbar = statusbar; + g_object_ref(statusbar); + } return result; @@ -535,6 +556,8 @@ static void g_work_group_schedule(GWorkGroup *group, GDelayedWork *work) { g_mutex_lock(&group->mutex); + g_atomic_int_inc(&group->pending); + delayed_work_list_add_tail(work, &group->works); g_cond_signal(&group->cond); @@ -568,7 +591,10 @@ static void *g_work_group_process(GWorkGroup *group) g_cond_wait(&group->cond, &group->mutex); if (group->force_exit) + { + g_mutex_unlock(&group->mutex); break; + } work = group->works; delayed_work_list_del(work, &group->works); @@ -579,7 +605,8 @@ static void *g_work_group_process(GWorkGroup *group) g_object_unref(G_OBJECT(work)); - g_cond_broadcast(&group->wait_cond); + if (g_atomic_int_dec_and_test(&group->pending)) + g_cond_broadcast(&group->wait_cond); } @@ -591,6 +618,7 @@ static void *g_work_group_process(GWorkGroup *group) /****************************************************************************** * * * Paramètres : group = groupe dont les conclusions sont attendues. * +* queue = queue d'appartenance pour les appels externes. * * * * Description : Attend que toutes les tâches d'un groupe soient traitées. * * * @@ -600,18 +628,78 @@ static void *g_work_group_process(GWorkGroup *group) * * ******************************************************************************/ -static void g_work_group_wait_for_completion(GWorkGroup *group) +static void g_work_group_wait_for_completion(GWorkGroup *group, GWorkQueue *queue) { + wait_for_incoming_works_cb callback; /* Procédure complémentaire */ + + bool no_extra_check(GWorkQueue *_q, wgroup_id_t _id, void *_data) + { + return false; + } + + callback = group->callback != NULL ? group->callback : no_extra_check; + g_mutex_lock(&group->mutex); - while (dl_list_empty(group->works) && !group->force_exit) + /** + * On attend que : + * - la liste des tâches programmées soit vide. + * - il n'existe plus de tâche en cours. + * - rien n'indique que de nouvelles tâches supplémentaires vont arriver. + */ + + while ((g_atomic_int_get(&group->pending) > 0 || callback(queue, group->id, group->data)) + && !group->force_exit) + { g_cond_wait(&group->wait_cond, &group->mutex); + } g_mutex_unlock(&group->mutex); } +/****************************************************************************** +* * +* Paramètres : group = groupe dont les paramètres sont à modifier. * +* callback = éventuelle fonction à appeler ou NULL. * +* data = données devant accompagner l'appel. * +* * +* Description : Modifie les conditions d'attente des fins d'exécutions. * +* * +* Retour : - * +* * +* Remarques : - * +* * +******************************************************************************/ + +static void g_work_group_set_extra_wait_callback(GWorkGroup *group, wait_for_incoming_works_cb callback, void *data) +{ + group->callback = callback; + group->data = data; + +} + + +/****************************************************************************** +* * +* Paramètres : queue = gestionnaire de l'ensemble des groupes de travail.* +* id = identifiant d'un groupe de travail. * +* * +* Description : Force un réveil d'une attente en cours pour la confirmer. * +* * +* Retour : - * +* * +* Remarques : - * +* * +******************************************************************************/ + +static void g_work_group_wake_up_waiters(GWorkGroup *group) +{ + g_cond_broadcast(&group->wait_cond); + +} + /* ---------------------------------------------------------------------------------- */ @@ -816,7 +904,7 @@ static GWorkGroup *g_work_queue_ensure_group_exists(GWorkQueue *queue, wgroup_id queue->groups_count * sizeof(GWorkGroup *)); result = g_work_group_new(id, queue->statusbar); - queue->groups[i] = result; + queue->groups[queue->groups_count - 1] = result; } @@ -843,7 +931,7 @@ wgroup_id_t g_work_queue_define_work_group(GWorkQueue *queue) g_mutex_lock(&queue->mutex); - result = queue->generator++; + result = ++queue->generator; g_work_queue_ensure_group_exists(queue, result); @@ -932,35 +1020,118 @@ void g_work_queue_schedule_work(GWorkQueue *queue, GDelayedWork *work, wgroup_id * Paramètres : queue = gestionnaire de l'ensemble des groupes de travail. * * id = identifiant d'un groupe de travail. * * * -* Description : Attend que toutes les tâches d'un groupe soient traitées. * +* Description : Fournit le groupe de travail correspondant à un identifiant. * * * -* Retour : - * +* Retour : Eventuel groupe existant trouvé ou NULL si aucun. * * * * Remarques : - * * * ******************************************************************************/ -void g_work_queue_wait_for_completion(GWorkQueue *queue, wgroup_id_t id) +static GWorkGroup *g_work_queue_find_group_for_id(GWorkQueue *queue, wgroup_id_t id) { + GWorkGroup *result; /* Trouvaille à retourner */ size_t i; /* Boucle de parcours */ - GWorkGroup *group; /* Groupe de travail à attendre*/ - group = NULL; + result = NULL; g_mutex_lock(&queue->mutex); for (i = 0; i < queue->groups_count; i++) if (g_work_group_get_id(queue->groups[i]) == id) { - group = queue->groups[i]; - g_object_ref(G_OBJECT(group)); + result = queue->groups[i]; + g_object_ref(G_OBJECT(result)); + break; } g_mutex_unlock(&queue->mutex); + return result; + +} + + +/****************************************************************************** +* * +* Paramètres : queue = gestionnaire de l'ensemble des groupes de travail. * +* id = identifiant d'un groupe de travail. * +* * +* Description : Attend que toutes les tâches d'un groupe soient traitées. * +* * +* Retour : - * +* * +* Remarques : - * +* * +******************************************************************************/ + +void g_work_queue_wait_for_completion(GWorkQueue *queue, wgroup_id_t id) +{ + GWorkGroup *group; /* Groupe de travail à attendre*/ + + group = g_work_queue_find_group_for_id(queue, id); + + if (group != NULL) + { + g_work_group_wait_for_completion(group, queue); + g_object_unref(G_OBJECT(group)); + } + +} + + +/****************************************************************************** +* * +* Paramètres : queue = gestionnaire de l'ensemble des groupes de travail.* +* id = identifiant d'un groupe de travail. * +* callback = éventuelle fonction à appeler ou NULL. * +* data = données devant accompagner l'appel. * +* * +* Description : Modifie les conditions d'attente des fins d'exécutions. * +* * +* Retour : - * +* * +* Remarques : - * +* * +******************************************************************************/ + +void g_work_queue_set_extra_wait_callback(GWorkQueue *queue, wgroup_id_t id, wait_for_incoming_works_cb callback, void *data) +{ + GWorkGroup *group; /* Groupe de travail à traiter */ + + group = g_work_queue_find_group_for_id(queue, id); + + if (group != NULL) + { + g_work_group_set_extra_wait_callback(group, callback, data); + g_object_unref(G_OBJECT(group)); + } + +} + + +/****************************************************************************** +* * +* Paramètres : queue = gestionnaire de l'ensemble des groupes de travail.* +* id = identifiant d'un groupe de travail. * +* * +* Description : Force un réveil d'une attente en cours pour la confirmer. * +* * +* Retour : - * +* * +* Remarques : - * +* * +******************************************************************************/ + +void g_work_queue_wake_up_waiters(GWorkQueue *queue, wgroup_id_t id) +{ + GWorkGroup *group; /* Groupe de travail à traiter */ + + group = g_work_queue_find_group_for_id(queue, id); + if (group != NULL) { - g_work_group_wait_for_completion(group); + g_work_group_wake_up_waiters(group); g_object_unref(G_OBJECT(group)); } diff --git a/src/glibext/delayed.h b/src/glibext/delayed.h index 0a27c93..4902e68 100644 --- a/src/glibext/delayed.h +++ b/src/glibext/delayed.h @@ -106,5 +106,16 @@ void g_work_queue_schedule_work(GWorkQueue *, GDelayedWork *, wgroup_id_t); void g_work_queue_wait_for_completion(GWorkQueue *, wgroup_id_t); +/* Etudie le besoin d'attendre d'avantage de prochaines tâches. */ +typedef bool (* wait_for_incoming_works_cb) (GWorkQueue *, wgroup_id_t, void *); + + +/* Modifie les conditions d'attente des fins d'exécutions. */ +void g_work_queue_set_extra_wait_callback(GWorkQueue *, wgroup_id_t, wait_for_incoming_works_cb, void *); + +/* Force un réveil d'une attente en cours pour la confirmer. */ +void g_work_queue_wake_up_waiters(GWorkQueue *, wgroup_id_t); + + #endif /* _GLIBEXT_DELAYED_H */ |