diff options
-rw-r--r-- | src/analysis/db/cdb.c | 111 | ||||
-rw-r--r-- | src/analysis/db/client.c | 51 | ||||
-rw-r--r-- | src/analysis/db/collection.c | 117 | ||||
-rw-r--r-- | src/analysis/db/collection.h | 6 | ||||
-rw-r--r-- | src/analysis/db/protocol.h | 41 |
5 files changed, 243 insertions, 83 deletions
diff --git a/src/analysis/db/cdb.c b/src/analysis/db/cdb.c index 829be19..c2855f7 100644 --- a/src/analysis/db/cdb.c +++ b/src/analysis/db/cdb.c @@ -922,14 +922,33 @@ static void *g_cdb_archive_process(GCdbArchive *archive) status = g_db_collection_unpack(collec, &in_pbuf, archive->db); if (!status) goto gcap_bad_exchange; - printf("## CDB ## Got something for collection %p...\n", collec); + break; + + case DBC_GET_ALL_ITEMS: + + init_packed_buffer(&out_pbuf); + + status = extend_packed_buffer(&out_pbuf, (uint32_t []) { DBC_SET_ALL_ITEMS }, + sizeof(uint32_t), true); + if (!status) goto gcap_bad_reply; + + status = pack_all_collection_updates(archive->collections, &out_pbuf); + if (!status) goto gcap_bad_reply; - //GDbCollection *find_collection_in_list(GList *, uint32_t); + status = ssl_send_packed_buffer(&out_pbuf, archive->clients[i].ssl_fd); + if (!status) goto gcap_bad_reply; - //static GGenConfig *find_collection_in_list(GList *list, uint32_t id) + exit_packed_buffer(&out_pbuf); break; + case DBC_SET_ALL_ITEMS: + asprintf(&msg, _("This command is not available on this side: 0x%08x"), command); + LOG_ERROR(LMT_ERROR, msg); + free(msg); + goto gcap_bad_exchange; + break; + case DBC_SET_LAST_ACTIVE: status = update_activity_in_collections(archive->collections, &in_pbuf, archive->db); @@ -1005,69 +1024,55 @@ static void *g_cdb_archive_process(GCdbArchive *archive) void g_cdb_archive_add_client(GCdbArchive *archive, SSL *fd, const rle_string *user) { - packed_buffer out_pbuf; /* Tampon d'émission */ - bool status; /* Bilan d'un envoi de retour */ - GList *iter; /* Boucle de parcours */ - GDbCollection *collec; /* Collection visée manipulée */ volatile pthread_t *process_id; /* Identifiant de la procédure */ - /* Envoi des mises à jour au nouveau client... */ - - init_packed_buffer(&out_pbuf); - - status = true; + /** + * La situation est un peu compliquée lors de l'accueil d'un nouveau client : + * + * - soit on envoie tous les éléments à prendre en compte, mais on doit + * bloquer la base de données jusqu'à l'intégration pleine et entière + * du client, afin que ce dernier ne loupe pas l'envoi d'un nouvel + * élément entre temps. + * + * - soit on intègre le client et celui ci demande des mises à jour + * collection par collection ; c'est également à lui qui revient le rejet + * des éléments envoyés en solitaires avant la réception de la base + * complète. + * + * On fait le choix du second scenario ici, du fait de la difficulté + * de maîtriser facilement la reconstitution d'une liste de clients dans + * g_cdb_archive_process() depuis un autre flot d'exécution. + */ + + /* Ajout dans la liste officielle */ - for (iter = g_list_first(archive->collections); - iter != NULL && status; - iter = g_list_next(iter)) - { - collec = G_DB_COLLECTION(iter->data); - - status = g_db_collection_pack_all_updates(collec, &out_pbuf); - - } + g_mutex_lock(&archive->clients_access); - if (status && get_packed_buffer_payload_length(&out_pbuf) > 0) - status = ssl_send_packed_buffer(&out_pbuf, fd); + archive->clients = realloc(archive->clients, ++archive->count * sizeof(cdb_client)); - exit_packed_buffer(&out_pbuf); + archive->clients[archive->count - 1].ssl_fd = fd; + dup_into_rle_string(&archive->clients[archive->count - 1].user, get_rle_string(user)); - if (!status) - LOG_ERROR(LMT_ERROR, _("Failed to add a client")); + /* Démarrage ou redémarrage du processus d'écoute */ - else + if (archive->process == NULL) { - /* Ajout dans la liste officielle */ + archive->process = g_thread_new("cdb_process", (GThreadFunc)g_cdb_archive_process, archive); - g_mutex_lock(&archive->clients_access); - - archive->clients = realloc(archive->clients, ++archive->count * sizeof(cdb_client)); + /* On attend que le processus parallèle soit prêt */ - archive->clients[archive->count - 1].ssl_fd = fd; - dup_into_rle_string(&archive->clients[archive->count - 1].user, get_rle_string(user)); + process_id = &archive->process_id; - /* Démarrage ou redémarrage du processus d'écoute */ - - if (archive->process == NULL) - { - archive->process = g_thread_new("cdb_process", (GThreadFunc)g_cdb_archive_process, archive); - - /* On attend que le processus parallèle soit prêt */ - - process_id = &archive->process_id; - - g_mutex_lock(&archive->id_access); - while (process_id == 0) - g_cond_wait(&archive->id_cond, &archive->id_access); - g_mutex_unlock(&archive->id_access); - - } - else - pthread_kill(archive->process_id, SIGUSR1); - - g_mutex_unlock(&archive->clients_access); + g_mutex_lock(&archive->id_access); + while (process_id == 0) + g_cond_wait(&archive->id_cond, &archive->id_access); + g_mutex_unlock(&archive->id_access); } + else + pthread_kill(archive->process_id, SIGUSR1); + + g_mutex_unlock(&archive->clients_access); } diff --git a/src/analysis/db/client.c b/src/analysis/db/client.c index 25fe64d..24198f6 100644 --- a/src/analysis/db/client.c +++ b/src/analysis/db/client.c @@ -68,6 +68,7 @@ struct _GDbClient char *desc; /* Description du lien */ GMutex sending_lock; /* Concurrence des envois */ + bool can_get_updates; /* Réception de maj possibles ?*/ GThread *update; /* Procédure de traitement */ }; @@ -519,6 +520,8 @@ static bool g_db_client_start_common(GDbClient *client, char *desc) } + client->can_get_updates = false; + client->update = g_thread_try_new("cdb_client", (GThreadFunc)g_db_client_update, client, NULL); if (client->update == NULL) { @@ -571,16 +574,44 @@ static bool g_db_client_start_common(GDbClient *client, char *desc) static void *g_db_client_update(GDbClient *client) { + packed_buffer out_pbuf; /* Tampon d'émission */ + bool status; /* Bilan d'une opération */ struct pollfd fds; /* Surveillance des flux */ packed_buffer in_pbuf; /* Tampon de réception */ int ret; /* Bilan d'un appel */ uint32_t tmp32; /* Valeur sur 32 bits */ - bool status; /* Bilan d'une opération */ uint32_t command; /* Commande de la requête */ DBError error; /* Bilan d'une commande passée */ GDbCollection *collec; /* Collection visée au final */ char *msg; /* Message d'erreur à imprimer */ + /** + * Avant toute chose, on demande un stage d'actualisation ! + */ + + init_packed_buffer(&out_pbuf); + + status = extend_packed_buffer(&out_pbuf, (uint32_t []) { DBC_GET_ALL_ITEMS }, sizeof(uint32_t), true); + if (!status) + { + exit_packed_buffer(&out_pbuf); + goto exit; + } + + status = ssl_send_packed_buffer(&out_pbuf, client->tls_fd); + if (!status) + { + log_simple_message(LMT_INFO, _("Failed to get all updates")); + exit_packed_buffer(&out_pbuf); + goto exit; + } + + exit_packed_buffer(&out_pbuf); + + /** + * Phase d'écoute continue... + */ + fds.fd = client->fd; fds.events = POLLIN | POLLPRI; @@ -631,11 +662,25 @@ static void *g_db_client_update(GDbClient *client) collec = find_collection_in_list(client->collections, tmp32); if (collec == NULL) goto gdcu_bad_exchange; - status = g_db_collection_unpack(collec, &in_pbuf, NULL); + if (client->can_get_updates) + status = g_db_collection_unpack(collec, &in_pbuf, NULL); + else + status = _g_db_collection_unpack(collec, &in_pbuf, (DBAction []) { 0 }, NULL); + if (!status) goto gdcu_bad_exchange; break; + case DBC_GET_ALL_ITEMS: + log_variadic_message(LMT_INFO, + _("This command is not available on this side: 0x%08x"), command); + goto gdcu_bad_exchange; + break; + + case DBC_SET_ALL_ITEMS: + client->can_get_updates = true; + break; + } continue; @@ -654,6 +699,8 @@ static void *g_db_client_update(GDbClient *client) } + exit: + g_db_client_stop(client); exit_packed_buffer(&in_pbuf); diff --git a/src/analysis/db/collection.c b/src/analysis/db/collection.c index adb3ad1..2c6086a 100644 --- a/src/analysis/db/collection.c +++ b/src/analysis/db/collection.c @@ -271,9 +271,53 @@ const char *g_db_collection_get_name(const GDbCollection *collec) +/****************************************************************************** +* * +* Paramètres : collec = ensemble d'éléments à considérer. * +* pbuf = paquet de données où venir puiser les infos. * +* action = commande de la requête. [OUT] * +* dest = élément de collection ou NULL pour un rejet. [OUT] * +* * +* Description : Réceptionne un élément depuis une requête réseau. * +* * +* Retour : Bilan de l'exécution de l'opération. * +* * +* Remarques : - * +* * +******************************************************************************/ +bool _g_db_collection_unpack(GDbCollection *collec, packed_buffer *pbuf, DBAction *action, GDbItem **dest) +{ + bool result; /* Bilan à faire remonter */ + uint32_t tmp32; /* Valeur sur 32 bits */ + GDbItem *item; /* Définition d'élément visé */ + result = extract_packed_buffer(pbuf, &tmp32, sizeof(uint32_t), true); + if (!result) goto exit; + + *action = tmp32; + result = (*action >= 0 && *action < DBA_COUNT); + if (!result) goto exit; + + item = g_object_new(collec->type, NULL); + + result = g_db_item_unpack(item, pbuf); + if (!result) goto exit; + + if (dest != NULL) + *dest = item; + else + g_object_unref(G_OBJECT(item)); + + exit: + + if (!result) + g_object_unref(G_OBJECT(item)); + + return result; + +} /****************************************************************************** @@ -295,28 +339,13 @@ const char *g_db_collection_get_name(const GDbCollection *collec) bool g_db_collection_unpack(GDbCollection *collec, packed_buffer *pbuf, sqlite3 *db) { bool result; /* Bilan à faire remonter */ - uint32_t tmp32; /* Valeur sur 32 bits */ - bool status; /* Bilan de lecture initiale */ DBAction action; /* Commande de la requête */ GDbItem *item; /* Définition d'élément visé */ GList *found; /* Test de présence existante */ timestamp_t inactive; /* Horodatage de désactivation */ - result = extract_packed_buffer(pbuf, &tmp32, sizeof(uint32_t), true); - action = tmp32; - - if (action < 0 || action >= DBA_COUNT) - result = false; - - if (!result) - return result; - - item = g_object_new(collec->type, NULL); - - status = g_db_item_unpack(item, pbuf); - if (!status) return false; - - result = false; + result = _g_db_collection_unpack(collec, pbuf, &action, &item); + if (!result) return false; switch (action) { @@ -459,7 +488,10 @@ bool g_db_collection_pack_all_updates(GDbCollection *collec, packed_buffer *pbuf result = true; - /* TODO : lock ? */ + /** + * La gestion des accès s'effectue depuis le seul appelant : la fonction + * g_cdb_archive_add_client(). + */ for (iter = g_list_first(collec->items); iter != NULL && result; @@ -1529,6 +1561,48 @@ void lock_unlock_collections(GList *list, bool write, bool lock) /****************************************************************************** * * * Paramètres : list = ensemble de collectons à traiter. * +* pbuf = paquet de données où venir inscrire des infos. * +* * +* Description : Collecte les informations utiles pour un nouvel arrivant. * +* * +* Retour : Bilan du déroulement des opérations. * +* * +* Remarques : - * +* * +******************************************************************************/ + +bool pack_all_collection_updates(GList *list, packed_buffer *pbuf) +{ + bool result; /* Bilan à retourner */ + GList *iter; /* Boucle de parcours */ + GDbCollection *collec; /* Collection visée manipulée */ + + result = true; + + /** + * Cette procédure n'est appelée que depuis g_cdb_archive_process(), + * qui bloque son exécution jusqu'à la fin des opérations. + * + * On a donc l'assurance d'un récupérer tous les éléments d'un coup, + * sans activité parallèle. + */ + + for (iter = g_list_first(list); iter != NULL && result; iter = g_list_next(iter)) + { + collec = G_DB_COLLECTION(iter->data); + + result = g_db_collection_pack_all_updates(collec, pbuf); + + } + + return result; + +} + + +/****************************************************************************** +* * +* Paramètres : list = ensemble de collectons à traiter. * * pbuf = paquet de données où venir puiser les infos. * * db = base de données à mettre à jour. * * * @@ -1553,7 +1627,12 @@ bool update_activity_in_collections(GList *list, packed_buffer *pbuf, sqlite3 *d GList *i; /* Boucle de parcours #2 */ GDbItem *item; /* Elément collecté à manipuler*/ - /* TODO : lock ? */ + /** + * Cette procédure n'est appelée que depuis g_cdb_archive_process(), + * qui bloque son exécution jusqu'à la fin des opérations. + * + * On a donc l'assurance d'un traitement global homgène des horodatages. + */ status = unpack_timestamp(×tamp, pbuf); if (!status) return false; diff --git a/src/analysis/db/collection.h b/src/analysis/db/collection.h index 0f0ad34..cbcf42c 100644 --- a/src/analysis/db/collection.h +++ b/src/analysis/db/collection.h @@ -71,7 +71,8 @@ uint32_t g_db_collection_get_feature(const GDbCollection *); const char *g_db_collection_get_name(const GDbCollection *); - +/* Réceptionne un élément depuis une requête réseau. */ +bool _g_db_collection_unpack(GDbCollection *, packed_buffer *, DBAction *, GDbItem **); /* Réceptionne et traite une requête réseau pour collection. */ bool g_db_collection_unpack(GDbCollection *, packed_buffer *, sqlite3 *); @@ -157,6 +158,9 @@ void lock_unlock_collections(GList *, bool, bool); #define rlock_collections(lst) lock_unlock_collections(lst, false, true); #define runlock_collections(lst) lock_unlock_collections(lst, false, false); +/* Collecte les informations utiles pour un nouvel arrivant. */ +bool pack_all_collection_updates(GList *, packed_buffer *); + /* Met à jour les statuts d'activité des éléments. */ bool update_activity_in_collections(GList *, packed_buffer *, sqlite3 *); diff --git a/src/analysis/db/protocol.h b/src/analysis/db/protocol.h index 025f92f..311a691 100644 --- a/src/analysis/db/protocol.h +++ b/src/analysis/db/protocol.h @@ -117,22 +117,50 @@ typedef enum _DBCommand DBC_SAVE, /* Enregistrement de l'archive */ DBC_COLLECTION, /* Implication d'une collection*/ + /** + * Gestion de la commande 'DBC_[GS]ET_ALL_ITEMS'. + * + * Un client qui se connecte à un serveur doit en premier lieu envoyer : + * + * [ Demande de mise à jour : DBC_GET_ALL_ITEMS ] + * + * Tant qu'il ne reçoit pas la commande DBC_SET_ALL_ITEMS depuis le + * serveur, toutes les actions sur une collection sont à rejeter car elles + * lui seront retransmises plus tard. + * + * De son côté, le serveur répond par une requête : + * + * [ Notification de maj : DBC_SET_ALL_ITEMS ] + * + * Dans la foulée, il enverra ensuite les éléments avec des paquets classiques : + * + * [ Traitement de collection : DBC_COLLECTION ] + * [ Action : DBA_ADD_ITEM ] + * ... + * + * Les traitements se réalisent dans : + * - g_db_client_update() pour la partie client. + * - g_cdb_archive_process() pour la partie serveur. + * + */ + DBC_GET_ALL_ITEMS, /* Mise à jour à la connexion */ + DBC_SET_ALL_ITEMS, /* Mise à jour à la connexion */ /** * Gestion de la commande 'DBC_SET_LAST_ACTIVE'. * * Le client connecté envoie un paquet de la forme suivante : * - * [ Statut d'historique : DBC_SET_LAST_ACTIVE ] - * [ <horodatage du dernier élément actif ] + * [ Statut d'historique : DBC_SET_LAST_ACTIVE ] + * [ <horodatage du dernier élément actif ] * * Le serveur s'exécute et notifie le client d'éventuels changements, * avec une série de paquets de la forme : * - * [ Traitement de collection : DBC_COLLECTION ] - * [ Action : DBC_SET_LAST_ACTIVE ] - * [ <élément dont le statut a évolué> ] + * [ Traitement de collection : DBC_COLLECTION ] + * [ Action : DBC_SET_LAST_ACTIVE ] + * [ <élément dont le statut a évolué> ] * * Les traitements se réalisent dans : * - g_db_collection_set_last_active() pour la partie serveur. @@ -142,9 +170,6 @@ typedef enum _DBCommand DBC_SET_LAST_ACTIVE, /* Définition du dernier actif */ - - - DBC_COUNT } DBCommand; |