From e060d9a9f29c0d7708e7002c2e1fc4962020ba94 Mon Sep 17 00:00:00 2001 From: Cyrille Bagard Date: Fri, 1 Jun 2018 20:57:41 +0200 Subject: Improved the way threads are managed. --- src/core/queue.c | 107 ++++++++++++++++++++++++++++++++++++++++++++------ src/core/queue.h | 11 +++++- src/glibext/delayed.c | 99 +++++++++++++++++++++++++++++++++------------- src/glibext/delayed.h | 3 ++ 4 files changed, 177 insertions(+), 43 deletions(-) diff --git a/src/core/queue.c b/src/core/queue.c index 4439e61..9323803 100644 --- a/src/core/queue.c +++ b/src/core/queue.c @@ -25,12 +25,19 @@ #include +#include #include "global.h" +/* Mémorisation des groupes de travail */ +static wgroup_id_t *_global_group_ids = NULL; +static size_t _global_group_count = 0; +G_LOCK_DEFINE_STATIC(_gg_mutex); + + /****************************************************************************** * * * Paramètres : - * @@ -54,24 +61,24 @@ bool init_global_works(void) set_work_queue(queue); #ifndef NDEBUG - expected = g_work_queue_define_work_group(queue); + expected = setup_global_work_group(); assert(expected == DEFAULT_WORK_GROUP); #else - g_work_queue_define_work_group(queue); + setup_global_work_group(); #endif #ifndef NDEBUG - expected = g_work_queue_define_work_group(queue); + expected = setup_global_work_group(); assert(expected == LOADING_WORK_GROUP); #else - g_work_queue_define_work_group(queue); + setup_global_work_group(); #endif #ifndef NDEBUG - expected = g_work_queue_define_work_group(queue); + expected = setup_global_work_group(); assert(expected == STORAGE_WORK_GROUP); #else - g_work_queue_define_work_group(queue); + setup_global_work_group(); #endif return true; @@ -83,6 +90,76 @@ bool init_global_works(void) * * * Paramètres : - * * * +* Description : Constitue un nouveau groupe de travail global. * +* * +* Retour : Nouvel identifiant unique d'un nouveau groupe de travail. * +* * +* Remarques : - * +* * +******************************************************************************/ + +wgroup_id_t setup_global_work_group(void) +{ + wgroup_id_t result; /* Valeur à retourner */ + GWorkQueue *queue; /* Singleton pour tâches */ + + queue = get_work_queue(); + + result = g_work_queue_define_work_group(queue); + + G_LOCK(_gg_mutex); + + _global_group_ids = (wgroup_id_t *)realloc(_global_group_ids, + ++_global_group_count * sizeof(wgroup_id_t)); + + _global_group_ids[_global_group_count - 1] = result; + + G_UNLOCK(_gg_mutex); + + return result; + +} + + +/****************************************************************************** +* * +* Paramètres : count = quantité de threads à allouer. * +* * +* Description : Constitue un nouveau petit groupe de travail global. * +* * +* Retour : Nouvel identifiant unique d'un nouveau groupe de travail. * +* * +* Remarques : - * +* * +******************************************************************************/ + +wgroup_id_t setup_tiny_global_work_group(guint count) +{ + wgroup_id_t result; /* Valeur à retourner */ + GWorkQueue *queue; /* Singleton pour tâches */ + + queue = get_work_queue(); + + result = g_work_queue_define_tiny_work_group(queue, count); + + G_LOCK(_gg_mutex); + + _global_group_ids = (wgroup_id_t *)realloc(_global_group_ids, + ++_global_group_count * sizeof(wgroup_id_t)); + + _global_group_ids[_global_group_count - 1] = result; + + G_UNLOCK(_gg_mutex); + + return result; + +} + + +/****************************************************************************** +* * +* Paramètres : - * +* * * Description : Supprime les mécanismes de traitements parallèles. * * * * Retour : - * @@ -95,6 +172,16 @@ void exit_global_works(void) { GWorkQueue *queue; /* Singleton pour tâches */ + G_LOCK(_gg_mutex); + + if (_global_group_ids != NULL) + free(_global_group_ids); + + _global_group_ids = NULL; + _global_group_count = 0; + + G_UNLOCK(_gg_mutex); + queue = get_work_queue(); g_object_unref(G_OBJECT(queue)); @@ -118,14 +205,8 @@ void wait_for_all_global_works(void) { GWorkQueue *queue; /* Singleton pour tâches */ - static const wgroup_id_t group_ids[GLOBAL_WORK_GROUPS_COUNT] = { - DEFAULT_WORK_GROUP, - LOADING_WORK_GROUP, - STORAGE_WORK_GROUP - }; - queue = get_work_queue(); - g_work_queue_wait_for_all_completions(queue, group_ids, GLOBAL_WORK_GROUPS_COUNT); + g_work_queue_wait_for_all_completions(queue, _global_group_ids, _global_group_count); } diff --git a/src/core/queue.h b/src/core/queue.h index a1b82f2..8f1625e 100644 --- a/src/core/queue.h +++ b/src/core/queue.h @@ -28,6 +28,9 @@ #include +#include "../glibext/delayed.h" + + /** * Groupes d'exécution principaux. @@ -37,12 +40,16 @@ #define LOADING_WORK_GROUP 1 #define STORAGE_WORK_GROUP 2 -#define GLOBAL_WORK_GROUPS_COUNT 3 - /* Met en place les mécanismes de traitements parallèles. */ bool init_global_works(void); +/* Constitue un nouveau groupe de travail global. */ +wgroup_id_t setup_global_work_group(void); + +/* Constitue un nouveau petit groupe de travail global. */ +wgroup_id_t setup_tiny_global_work_group(guint); + /* Supprime les mécanismes de traitements parallèles. */ void exit_global_works(void); diff --git a/src/glibext/delayed.c b/src/glibext/delayed.c index e93edc2..caffbaa 100644 --- a/src/glibext/delayed.c +++ b/src/glibext/delayed.c @@ -25,6 +25,7 @@ #include +#include #include #include #include @@ -112,7 +113,7 @@ static void g_work_group_dispose(GWorkGroup *); static void g_work_group_finalize(GWorkGroup *); /* Crée un nouveau thread dédié à un type de travaux donné. */ -static GWorkGroup *g_work_group_new(wgroup_id_t); +static GWorkGroup *g_work_group_new(wgroup_id_t, const guint *); /* Fournit l'identifiant associé à un groupe de travail. */ static wgroup_id_t g_work_group_get_id(const GWorkGroup *); @@ -175,7 +176,7 @@ static void g_work_queue_dispose(GWorkQueue *); static void g_work_queue_finalize(GWorkQueue *); /* Donne l'assurance de l'existence d'un groupe de travail. */ -static bool g_work_queue_ensure_group_exists(GWorkQueue *, wgroup_id_t); +static bool g_work_queue_ensure_group_exists(GWorkQueue *, wgroup_id_t, const guint *); /* Fournit le groupe de travail correspondant à un identifiant. */ static GWorkGroup *g_work_queue_find_group_for_id(GWorkQueue *, wgroup_id_t); @@ -386,8 +387,7 @@ static void g_work_group_class_init(GWorkGroupClass *klass) static void g_work_group_init(GWorkGroup *group) { - guint i; /* Boucle de parcours */ - char name[16]; /* Désignation humaine */ + group->works = NULL; g_mutex_init(&group->mutex); g_cond_init(&group->cond); @@ -395,26 +395,8 @@ static void g_work_group_init(GWorkGroup *group) g_atomic_int_set(&group->pending, 0); + group->threads = NULL; group->threads_count = g_get_num_processors(); - - group->threads = (GThread **)calloc(group->threads_count, sizeof(GThread *)); - - for (i = 0; i < group->threads_count; i++) - { - snprintf(name, sizeof(name), "work_group_%u", i); - - group->threads[i] = g_thread_new(name, (GThreadFunc)g_work_group_process, group); - if (!group->threads[i]) - goto gwgi_error; - - } - - gwgi_error: - - group->threads_count = i; - - assert(i > 0); - group->force_exit = false; group->callback = NULL; @@ -498,7 +480,8 @@ static void g_work_group_finalize(GWorkGroup *group) /****************************************************************************** * * -* Paramètres : id = identifiant accordé au nouveau groupe. * +* Paramètres : id = identifiant accordé au nouveau groupe. * +* count = quantité de threads à allouer. * * * * Description : Crée un nouveau thread dédié à un type de travaux donné. * * * @@ -508,14 +491,39 @@ static void g_work_group_finalize(GWorkGroup *group) * * ******************************************************************************/ -static GWorkGroup *g_work_group_new(wgroup_id_t id) +static GWorkGroup *g_work_group_new(wgroup_id_t id, const guint *count) { GWorkGroup *result; /* Traiteur à retourner */ + guint i; /* Boucle de parcours */ + char name[16]; /* Désignation humaine */ result = g_object_new(G_TYPE_WORK_GROUP, NULL); result->id = id; + result->threads_count = g_get_num_processors(); + + if (count != NULL && *count < result->threads_count) + result->threads_count = *count; + + result->threads = (GThread **)calloc(result->threads_count, sizeof(GThread *)); + + for (i = 0; i < result->threads_count; i++) + { + snprintf(name, sizeof(name), "wgrp_%" PRIu64 "-%u", id, i); + + result->threads[i] = g_thread_new(name, (GThreadFunc)g_work_group_process, result); + if (!result->threads[i]) + goto start_error; + + } + + start_error: + + result->threads_count = i; + + assert(i > 0); + return result; } @@ -931,6 +939,7 @@ GWorkQueue *g_work_queue_new(void) * * * Paramètres : queue = gestionnaire de l'ensemble des groupes de travail. * * id = identifiant d'un groupe de travail. * +* count = quantité de threads à allouer. * * * * Description : Donne l'assurance de l'existence d'un groupe de travail. * * * @@ -940,7 +949,7 @@ GWorkQueue *g_work_queue_new(void) * * ******************************************************************************/ -static bool g_work_queue_ensure_group_exists(GWorkQueue *queue, wgroup_id_t id) +static bool g_work_queue_ensure_group_exists(GWorkQueue *queue, wgroup_id_t id, const guint *count) { bool found; /* Bilan des recherches */ size_t i; /* Boucle de parcours */ @@ -962,7 +971,7 @@ static bool g_work_queue_ensure_group_exists(GWorkQueue *queue, wgroup_id_t id) queue->groups = (GWorkGroup **)realloc(queue->groups, queue->groups_count * sizeof(GWorkGroup *)); - group = g_work_group_new(id); + group = g_work_group_new(id, count); queue->groups[queue->groups_count - 1] = group; } @@ -994,7 +1003,41 @@ wgroup_id_t g_work_queue_define_work_group(GWorkQueue *queue) do { result = queue->generator++; - created = g_work_queue_ensure_group_exists(queue, result); + created = g_work_queue_ensure_group_exists(queue, result, NULL); + } + while (!created); + + g_mutex_unlock(&queue->mutex); + + return result; + +} + + +/****************************************************************************** +* * +* Paramètres : queue = gestionnaire de l'ensemble des groupes de travail. * +* count = quantité de threads à allouer. * +* * +* Description : Constitue un nouveau petit groupe de travail. * +* * +* Retour : Nouvel identifiant unique d'un nouveau groupe de travail. * +* * +* Remarques : - * +* * +******************************************************************************/ + +wgroup_id_t g_work_queue_define_tiny_work_group(GWorkQueue *queue, guint count) +{ + wgroup_id_t result; /* Valeur à retourner */ + bool created; /* Bilan d'une tentative */ + + g_mutex_lock(&queue->mutex); + + do + { + result = queue->generator++; + created = g_work_queue_ensure_group_exists(queue, result, &count); } while (!created); diff --git a/src/glibext/delayed.h b/src/glibext/delayed.h index 4669d2d..0fb03bd 100644 --- a/src/glibext/delayed.h +++ b/src/glibext/delayed.h @@ -98,6 +98,9 @@ GWorkQueue *g_work_queue_new(void); /* Constitue un nouveau groupe de travail. */ wgroup_id_t g_work_queue_define_work_group(GWorkQueue *); +/* Constitue un nouveau petit groupe de travail. */ +wgroup_id_t g_work_queue_define_tiny_work_group(GWorkQueue *, guint); + /* Dissout un groupe de travail existant. */ void g_work_queue_delete_work_group(GWorkQueue *, wgroup_id_t); -- cgit v0.11.2-87-g4458