summaryrefslogtreecommitdiff
path: root/src/analysis/db/cdb.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/analysis/db/cdb.c')
-rw-r--r--src/analysis/db/cdb.c92
1 files changed, 66 insertions, 26 deletions
diff --git a/src/analysis/db/cdb.c b/src/analysis/db/cdb.c
index 75a8714..167fa0c 100644
--- a/src/analysis/db/cdb.c
+++ b/src/analysis/db/cdb.c
@@ -803,28 +803,38 @@ static bool g_cdb_archive_load_collections(GCdbArchive *archive)
static void on_collection_changed(GDbCollection *collec, DBAction action, GDbItem *item, GCdbArchive *archive)
{
+ packed_buffer pbuf; /* Tampon d'émission */
size_t i; /* Boucle de parcours */
bool status; /* Bilan d'un envoi de retour */
- g_mutex_lock(&archive->clients_access);
+ init_packed_buffer(&pbuf);
- for (i = 0; i < archive->count; i++)
- {
- status = g_db_collection_send(collec, archive->clients[i].fd, action, item);
+ status = true;
- if (!status)
- {
- /* TODO : close() */
- }
+ g_mutex_lock(&archive->clients_access);
- }
+ for (i = 0; i < archive->count && status; i++)
+ status = g_db_collection_pack(collec, &pbuf, action, item);
g_mutex_unlock(&archive->clients_access);
+ if (status)
+ status = send_packed_buffer(&pbuf, archive->clients[i].fd);
+
+ exit_packed_buffer(&pbuf);
+
+ if (!status)
+ goto occ_error;
+
+
printf("CHANGED for %d clients !!\n", (int)archive->count);
+ occ_error:
+
+ /* TODO : close() */
+ ;
}
@@ -847,10 +857,12 @@ static void *g_cdb_archive_process(GCdbArchive *archive)
nfds_t nfds; /* Quantité de ces flux */
nfds_t i; /* Boucle de parcours */
int ret; /* Bilan d'un appel */
- uint32_t val32; /* Valeur sur 32 bits */
+ packed_buffer in_pbuf; /* Tampon de réception */
+ uint32_t tmp32; /* Valeur sur 32 bits */
bool status; /* Bilan de lecture initiale */
uint32_t command; /* Commande de la requête */
DBError error; /* Bilan d'une opération */
+ packed_buffer out_pbuf; /* Tampon d'émission */
GDbCollection *collec; /* Collection visée au final */
void interrupt_poll_with_sigusr1(int sig) { };
@@ -907,10 +919,13 @@ static void *g_cdb_archive_process(GCdbArchive *archive)
/* Données présentes en entrée */
if (fds[i].revents & (POLLIN | POLLPRI))
{
- status = safe_recv(fds[i].fd, &val32, sizeof(uint32_t), 0);
+ status = recv_packed_buffer(&in_pbuf, fds[i].fd);
if (!status) goto gcap_bad_exchange;
- command = be32toh(val32);
+ status = extract_packed_buffer(&in_pbuf, &tmp32, sizeof(uint32_t), true);
+ if (!status) goto gcap_bad_exchange;
+
+ command = tmp32;
switch (command)
{
@@ -918,23 +933,31 @@ static void *g_cdb_archive_process(GCdbArchive *archive)
error = g_cdb_archive_write(archive);
- if (!safe_send(fds[i].fd, (uint32_t []) { htobe32(DBC_SAVE) }, sizeof(uint32_t), 0))
- goto gcap_bad_exchange;
+ init_packed_buffer(&out_pbuf);
+
+ status = extend_packed_buffer(&out_pbuf, (uint32_t []) { DBC_SAVE },
+ sizeof(uint32_t), true);
+ if (!status) goto gcap_bad_reply;
+
+ status = extend_packed_buffer(&out_pbuf, (uint32_t []) { error }, sizeof(uint32_t), true);
+ if (!status) goto gcap_bad_reply;
- if (!safe_send(fds[i].fd, (uint32_t []) { htobe32(error) }, sizeof(uint32_t), 0))
- goto gcap_bad_exchange;
+ status = send_packed_buffer(&out_pbuf, fds[i].fd);
+ if (!status) goto gcap_bad_reply;
+
+ exit_packed_buffer(&out_pbuf);
break;
case DBC_COLLECTION:
- status = safe_recv(fds[i].fd, &val32, sizeof(uint32_t), 0);
+ status = extract_packed_buffer(&in_pbuf, &tmp32, sizeof(uint32_t), true);
if (!status) goto gcap_bad_exchange;
- collec = find_collection_in_list(archive->collections, be32toh(val32));
+ collec = find_collection_in_list(archive->collections, tmp32);
if (collec == NULL) goto gcap_bad_exchange;
- status = g_db_collection_recv(collec, fds[i].fd, archive->db);
+ status = g_db_collection_unpack(collec, &in_pbuf, archive->db);
if (!status) goto gcap_bad_exchange;
printf("## CDB ## Got something for collection %p...\n", collec);
@@ -947,7 +970,7 @@ static void *g_cdb_archive_process(GCdbArchive *archive)
case DBC_SET_LAST_ACTIVE:
- status = update_activity_in_collections(archive->collections, fds[i].fd, archive->db);
+ status = update_activity_in_collections(archive->collections, &in_pbuf, archive->db);
if (!status) goto gcap_bad_exchange;
break;
@@ -959,12 +982,20 @@ static void *g_cdb_archive_process(GCdbArchive *archive)
}
+ exit_packed_buffer(&in_pbuf);
+
continue;
+ gcap_bad_reply:
+
+ exit_packed_buffer(&out_pbuf);
+
gcap_bad_exchange:
printf("Bad exchange...\n");
+ exit_packed_buffer(&in_pbuf);
+
/* TODO : close conn */
;
@@ -1018,6 +1049,8 @@ DBError g_cdb_archive_add_client(GCdbArchive *archive, int fd, const rle_string
GDbCollection *collec; /* Collection visée manipulée */
volatile pthread_t *process_id; /* Identifiant de la procédure */
+ packed_buffer out_pbuf; /* Tampon d'émission */
+ bool status; /* Bilan d'un envoi de retour */
@@ -1056,20 +1089,27 @@ DBError g_cdb_archive_add_client(GCdbArchive *archive, int fd, const rle_string
/* Envoi des mises à jour au nouveau client... */
+ init_packed_buffer(&out_pbuf);
+
+ status = true;
+
+
+ /* TODO : lock ? */
+
for (iter = g_list_first(archive->collections);
- iter != NULL;
+ iter != NULL && status;
iter = g_list_next(iter))
{
collec = G_DB_COLLECTION(iter->data);
- if (!g_db_collection_send_all_updates(collec, fd))
- /* TODO */;
-
-
+ status = g_db_collection_pack_all_updates(collec, &out_pbuf);
+ }
+ if (status)
+ status = send_packed_buffer(&out_pbuf, fd);
- }
+ exit_packed_buffer(&out_pbuf);