diff options
Diffstat (limited to 'src/analysis/db/cdb.c')
-rw-r--r-- | src/analysis/db/cdb.c | 92 |
1 files changed, 66 insertions, 26 deletions
diff --git a/src/analysis/db/cdb.c b/src/analysis/db/cdb.c index 75a8714..167fa0c 100644 --- a/src/analysis/db/cdb.c +++ b/src/analysis/db/cdb.c @@ -803,28 +803,38 @@ static bool g_cdb_archive_load_collections(GCdbArchive *archive) static void on_collection_changed(GDbCollection *collec, DBAction action, GDbItem *item, GCdbArchive *archive) { + packed_buffer pbuf; /* Tampon d'émission */ size_t i; /* Boucle de parcours */ bool status; /* Bilan d'un envoi de retour */ - g_mutex_lock(&archive->clients_access); + init_packed_buffer(&pbuf); - for (i = 0; i < archive->count; i++) - { - status = g_db_collection_send(collec, archive->clients[i].fd, action, item); + status = true; - if (!status) - { - /* TODO : close() */ - } + g_mutex_lock(&archive->clients_access); - } + for (i = 0; i < archive->count && status; i++) + status = g_db_collection_pack(collec, &pbuf, action, item); g_mutex_unlock(&archive->clients_access); + if (status) + status = send_packed_buffer(&pbuf, archive->clients[i].fd); + + exit_packed_buffer(&pbuf); + + if (!status) + goto occ_error; + + printf("CHANGED for %d clients !!\n", (int)archive->count); + occ_error: + + /* TODO : close() */ + ; } @@ -847,10 +857,12 @@ static void *g_cdb_archive_process(GCdbArchive *archive) nfds_t nfds; /* Quantité de ces flux */ nfds_t i; /* Boucle de parcours */ int ret; /* Bilan d'un appel */ - uint32_t val32; /* Valeur sur 32 bits */ + packed_buffer in_pbuf; /* Tampon de réception */ + uint32_t tmp32; /* Valeur sur 32 bits */ bool status; /* Bilan de lecture initiale */ uint32_t command; /* Commande de la requête */ DBError error; /* Bilan d'une opération */ + packed_buffer out_pbuf; /* Tampon d'émission */ GDbCollection *collec; /* Collection visée au final */ void interrupt_poll_with_sigusr1(int sig) { }; @@ -907,10 +919,13 @@ static void *g_cdb_archive_process(GCdbArchive *archive) /* Données présentes en entrée */ if (fds[i].revents & (POLLIN | POLLPRI)) { - status = safe_recv(fds[i].fd, &val32, sizeof(uint32_t), 0); + status = recv_packed_buffer(&in_pbuf, fds[i].fd); if (!status) goto gcap_bad_exchange; - command = be32toh(val32); + status = extract_packed_buffer(&in_pbuf, &tmp32, sizeof(uint32_t), true); + if (!status) goto gcap_bad_exchange; + + command = tmp32; switch (command) { @@ -918,23 +933,31 @@ static void *g_cdb_archive_process(GCdbArchive *archive) error = g_cdb_archive_write(archive); - if (!safe_send(fds[i].fd, (uint32_t []) { htobe32(DBC_SAVE) }, sizeof(uint32_t), 0)) - goto gcap_bad_exchange; + init_packed_buffer(&out_pbuf); + + status = extend_packed_buffer(&out_pbuf, (uint32_t []) { DBC_SAVE }, + sizeof(uint32_t), true); + if (!status) goto gcap_bad_reply; + + status = extend_packed_buffer(&out_pbuf, (uint32_t []) { error }, sizeof(uint32_t), true); + if (!status) goto gcap_bad_reply; - if (!safe_send(fds[i].fd, (uint32_t []) { htobe32(error) }, sizeof(uint32_t), 0)) - goto gcap_bad_exchange; + status = send_packed_buffer(&out_pbuf, fds[i].fd); + if (!status) goto gcap_bad_reply; + + exit_packed_buffer(&out_pbuf); break; case DBC_COLLECTION: - status = safe_recv(fds[i].fd, &val32, sizeof(uint32_t), 0); + status = extract_packed_buffer(&in_pbuf, &tmp32, sizeof(uint32_t), true); if (!status) goto gcap_bad_exchange; - collec = find_collection_in_list(archive->collections, be32toh(val32)); + collec = find_collection_in_list(archive->collections, tmp32); if (collec == NULL) goto gcap_bad_exchange; - status = g_db_collection_recv(collec, fds[i].fd, archive->db); + status = g_db_collection_unpack(collec, &in_pbuf, archive->db); if (!status) goto gcap_bad_exchange; printf("## CDB ## Got something for collection %p...\n", collec); @@ -947,7 +970,7 @@ static void *g_cdb_archive_process(GCdbArchive *archive) case DBC_SET_LAST_ACTIVE: - status = update_activity_in_collections(archive->collections, fds[i].fd, archive->db); + status = update_activity_in_collections(archive->collections, &in_pbuf, archive->db); if (!status) goto gcap_bad_exchange; break; @@ -959,12 +982,20 @@ static void *g_cdb_archive_process(GCdbArchive *archive) } + exit_packed_buffer(&in_pbuf); + continue; + gcap_bad_reply: + + exit_packed_buffer(&out_pbuf); + gcap_bad_exchange: printf("Bad exchange...\n"); + exit_packed_buffer(&in_pbuf); + /* TODO : close conn */ ; @@ -1018,6 +1049,8 @@ DBError g_cdb_archive_add_client(GCdbArchive *archive, int fd, const rle_string GDbCollection *collec; /* Collection visée manipulée */ volatile pthread_t *process_id; /* Identifiant de la procédure */ + packed_buffer out_pbuf; /* Tampon d'émission */ + bool status; /* Bilan d'un envoi de retour */ @@ -1056,20 +1089,27 @@ DBError g_cdb_archive_add_client(GCdbArchive *archive, int fd, const rle_string /* Envoi des mises à jour au nouveau client... */ + init_packed_buffer(&out_pbuf); + + status = true; + + + /* TODO : lock ? */ + for (iter = g_list_first(archive->collections); - iter != NULL; + iter != NULL && status; iter = g_list_next(iter)) { collec = G_DB_COLLECTION(iter->data); - if (!g_db_collection_send_all_updates(collec, fd)) - /* TODO */; - - + status = g_db_collection_pack_all_updates(collec, &out_pbuf); + } + if (status) + status = send_packed_buffer(&out_pbuf, fd); - } + exit_packed_buffer(&out_pbuf); |