summaryrefslogtreecommitdiff
path: root/src/analysis
diff options
context:
space:
mode:
authorCyrille Bagard <nocbos@gmail.com>2018-12-17 19:00:29 (GMT)
committerCyrille Bagard <nocbos@gmail.com>2018-12-17 19:00:29 (GMT)
commit706710aef28a0af4bb8aa343c2631a2139d00955 (patch)
tree08be7ab2f974bdc3dab6ecde426c069adb4e077b /src/analysis
parent97fa09113c7988e4b4639190ba9bc51f9ced4d33 (diff)
Updated the connection protocol.
Diffstat (limited to 'src/analysis')
-rw-r--r--src/analysis/db/cdb.c111
-rw-r--r--src/analysis/db/client.c51
-rw-r--r--src/analysis/db/collection.c117
-rw-r--r--src/analysis/db/collection.h6
-rw-r--r--src/analysis/db/protocol.h41
5 files changed, 243 insertions, 83 deletions
diff --git a/src/analysis/db/cdb.c b/src/analysis/db/cdb.c
index 829be19..c2855f7 100644
--- a/src/analysis/db/cdb.c
+++ b/src/analysis/db/cdb.c
@@ -922,14 +922,33 @@ static void *g_cdb_archive_process(GCdbArchive *archive)
status = g_db_collection_unpack(collec, &in_pbuf, archive->db);
if (!status) goto gcap_bad_exchange;
- printf("## CDB ## Got something for collection %p...\n", collec);
+ break;
+
+ case DBC_GET_ALL_ITEMS:
+
+ init_packed_buffer(&out_pbuf);
+
+ status = extend_packed_buffer(&out_pbuf, (uint32_t []) { DBC_SET_ALL_ITEMS },
+ sizeof(uint32_t), true);
+ if (!status) goto gcap_bad_reply;
+
+ status = pack_all_collection_updates(archive->collections, &out_pbuf);
+ if (!status) goto gcap_bad_reply;
- //GDbCollection *find_collection_in_list(GList *, uint32_t);
+ status = ssl_send_packed_buffer(&out_pbuf, archive->clients[i].ssl_fd);
+ if (!status) goto gcap_bad_reply;
- //static GGenConfig *find_collection_in_list(GList *list, uint32_t id)
+ exit_packed_buffer(&out_pbuf);
break;
+ case DBC_SET_ALL_ITEMS:
+ asprintf(&msg, _("This command is not available on this side: 0x%08x"), command);
+ LOG_ERROR(LMT_ERROR, msg);
+ free(msg);
+ goto gcap_bad_exchange;
+ break;
+
case DBC_SET_LAST_ACTIVE:
status = update_activity_in_collections(archive->collections, &in_pbuf, archive->db);
@@ -1005,69 +1024,55 @@ static void *g_cdb_archive_process(GCdbArchive *archive)
void g_cdb_archive_add_client(GCdbArchive *archive, SSL *fd, const rle_string *user)
{
- packed_buffer out_pbuf; /* Tampon d'émission */
- bool status; /* Bilan d'un envoi de retour */
- GList *iter; /* Boucle de parcours */
- GDbCollection *collec; /* Collection visée manipulée */
volatile pthread_t *process_id; /* Identifiant de la procédure */
- /* Envoi des mises à jour au nouveau client... */
-
- init_packed_buffer(&out_pbuf);
-
- status = true;
+ /**
+ * La situation est un peu compliquée lors de l'accueil d'un nouveau client :
+ *
+ * - soit on envoie tous les éléments à prendre en compte, mais on doit
+ * bloquer la base de données jusqu'à l'intégration pleine et entière
+ * du client, afin que ce dernier ne loupe pas l'envoi d'un nouvel
+ * élément entre temps.
+ *
+ * - soit on intègre le client et celui ci demande des mises à jour
+ * collection par collection ; c'est également à lui qui revient le rejet
+ * des éléments envoyés en solitaires avant la réception de la base
+ * complète.
+ *
+ * On fait le choix du second scenario ici, du fait de la difficulté
+ * de maîtriser facilement la reconstitution d'une liste de clients dans
+ * g_cdb_archive_process() depuis un autre flot d'exécution.
+ */
+
+ /* Ajout dans la liste officielle */
- for (iter = g_list_first(archive->collections);
- iter != NULL && status;
- iter = g_list_next(iter))
- {
- collec = G_DB_COLLECTION(iter->data);
-
- status = g_db_collection_pack_all_updates(collec, &out_pbuf);
-
- }
+ g_mutex_lock(&archive->clients_access);
- if (status && get_packed_buffer_payload_length(&out_pbuf) > 0)
- status = ssl_send_packed_buffer(&out_pbuf, fd);
+ archive->clients = realloc(archive->clients, ++archive->count * sizeof(cdb_client));
- exit_packed_buffer(&out_pbuf);
+ archive->clients[archive->count - 1].ssl_fd = fd;
+ dup_into_rle_string(&archive->clients[archive->count - 1].user, get_rle_string(user));
- if (!status)
- LOG_ERROR(LMT_ERROR, _("Failed to add a client"));
+ /* Démarrage ou redémarrage du processus d'écoute */
- else
+ if (archive->process == NULL)
{
- /* Ajout dans la liste officielle */
+ archive->process = g_thread_new("cdb_process", (GThreadFunc)g_cdb_archive_process, archive);
- g_mutex_lock(&archive->clients_access);
-
- archive->clients = realloc(archive->clients, ++archive->count * sizeof(cdb_client));
+ /* On attend que le processus parallèle soit prêt */
- archive->clients[archive->count - 1].ssl_fd = fd;
- dup_into_rle_string(&archive->clients[archive->count - 1].user, get_rle_string(user));
+ process_id = &archive->process_id;
- /* Démarrage ou redémarrage du processus d'écoute */
-
- if (archive->process == NULL)
- {
- archive->process = g_thread_new("cdb_process", (GThreadFunc)g_cdb_archive_process, archive);
-
- /* On attend que le processus parallèle soit prêt */
-
- process_id = &archive->process_id;
-
- g_mutex_lock(&archive->id_access);
- while (process_id == 0)
- g_cond_wait(&archive->id_cond, &archive->id_access);
- g_mutex_unlock(&archive->id_access);
-
- }
- else
- pthread_kill(archive->process_id, SIGUSR1);
-
- g_mutex_unlock(&archive->clients_access);
+ g_mutex_lock(&archive->id_access);
+ while (process_id == 0)
+ g_cond_wait(&archive->id_cond, &archive->id_access);
+ g_mutex_unlock(&archive->id_access);
}
+ else
+ pthread_kill(archive->process_id, SIGUSR1);
+
+ g_mutex_unlock(&archive->clients_access);
}
diff --git a/src/analysis/db/client.c b/src/analysis/db/client.c
index 25fe64d..24198f6 100644
--- a/src/analysis/db/client.c
+++ b/src/analysis/db/client.c
@@ -68,6 +68,7 @@ struct _GDbClient
char *desc; /* Description du lien */
GMutex sending_lock; /* Concurrence des envois */
+ bool can_get_updates; /* Réception de maj possibles ?*/
GThread *update; /* Procédure de traitement */
};
@@ -519,6 +520,8 @@ static bool g_db_client_start_common(GDbClient *client, char *desc)
}
+ client->can_get_updates = false;
+
client->update = g_thread_try_new("cdb_client", (GThreadFunc)g_db_client_update, client, NULL);
if (client->update == NULL)
{
@@ -571,16 +574,44 @@ static bool g_db_client_start_common(GDbClient *client, char *desc)
static void *g_db_client_update(GDbClient *client)
{
+ packed_buffer out_pbuf; /* Tampon d'émission */
+ bool status; /* Bilan d'une opération */
struct pollfd fds; /* Surveillance des flux */
packed_buffer in_pbuf; /* Tampon de réception */
int ret; /* Bilan d'un appel */
uint32_t tmp32; /* Valeur sur 32 bits */
- bool status; /* Bilan d'une opération */
uint32_t command; /* Commande de la requête */
DBError error; /* Bilan d'une commande passée */
GDbCollection *collec; /* Collection visée au final */
char *msg; /* Message d'erreur à imprimer */
+ /**
+ * Avant toute chose, on demande un stage d'actualisation !
+ */
+
+ init_packed_buffer(&out_pbuf);
+
+ status = extend_packed_buffer(&out_pbuf, (uint32_t []) { DBC_GET_ALL_ITEMS }, sizeof(uint32_t), true);
+ if (!status)
+ {
+ exit_packed_buffer(&out_pbuf);
+ goto exit;
+ }
+
+ status = ssl_send_packed_buffer(&out_pbuf, client->tls_fd);
+ if (!status)
+ {
+ log_simple_message(LMT_INFO, _("Failed to get all updates"));
+ exit_packed_buffer(&out_pbuf);
+ goto exit;
+ }
+
+ exit_packed_buffer(&out_pbuf);
+
+ /**
+ * Phase d'écoute continue...
+ */
+
fds.fd = client->fd;
fds.events = POLLIN | POLLPRI;
@@ -631,11 +662,25 @@ static void *g_db_client_update(GDbClient *client)
collec = find_collection_in_list(client->collections, tmp32);
if (collec == NULL) goto gdcu_bad_exchange;
- status = g_db_collection_unpack(collec, &in_pbuf, NULL);
+ if (client->can_get_updates)
+ status = g_db_collection_unpack(collec, &in_pbuf, NULL);
+ else
+ status = _g_db_collection_unpack(collec, &in_pbuf, (DBAction []) { 0 }, NULL);
+
if (!status) goto gdcu_bad_exchange;
break;
+ case DBC_GET_ALL_ITEMS:
+ log_variadic_message(LMT_INFO,
+ _("This command is not available on this side: 0x%08x"), command);
+ goto gdcu_bad_exchange;
+ break;
+
+ case DBC_SET_ALL_ITEMS:
+ client->can_get_updates = true;
+ break;
+
}
continue;
@@ -654,6 +699,8 @@ static void *g_db_client_update(GDbClient *client)
}
+ exit:
+
g_db_client_stop(client);
exit_packed_buffer(&in_pbuf);
diff --git a/src/analysis/db/collection.c b/src/analysis/db/collection.c
index adb3ad1..2c6086a 100644
--- a/src/analysis/db/collection.c
+++ b/src/analysis/db/collection.c
@@ -271,9 +271,53 @@ const char *g_db_collection_get_name(const GDbCollection *collec)
+/******************************************************************************
+* *
+* Paramètres : collec = ensemble d'éléments à considérer. *
+* pbuf = paquet de données où venir puiser les infos. *
+* action = commande de la requête. [OUT] *
+* dest = élément de collection ou NULL pour un rejet. [OUT] *
+* *
+* Description : Réceptionne un élément depuis une requête réseau. *
+* *
+* Retour : Bilan de l'exécution de l'opération. *
+* *
+* Remarques : - *
+* *
+******************************************************************************/
+bool _g_db_collection_unpack(GDbCollection *collec, packed_buffer *pbuf, DBAction *action, GDbItem **dest)
+{
+ bool result; /* Bilan à faire remonter */
+ uint32_t tmp32; /* Valeur sur 32 bits */
+ GDbItem *item; /* Définition d'élément visé */
+ result = extract_packed_buffer(pbuf, &tmp32, sizeof(uint32_t), true);
+ if (!result) goto exit;
+
+ *action = tmp32;
+ result = (*action >= 0 && *action < DBA_COUNT);
+ if (!result) goto exit;
+
+ item = g_object_new(collec->type, NULL);
+
+ result = g_db_item_unpack(item, pbuf);
+ if (!result) goto exit;
+
+ if (dest != NULL)
+ *dest = item;
+ else
+ g_object_unref(G_OBJECT(item));
+
+ exit:
+
+ if (!result)
+ g_object_unref(G_OBJECT(item));
+
+ return result;
+
+}
/******************************************************************************
@@ -295,28 +339,13 @@ const char *g_db_collection_get_name(const GDbCollection *collec)
bool g_db_collection_unpack(GDbCollection *collec, packed_buffer *pbuf, sqlite3 *db)
{
bool result; /* Bilan à faire remonter */
- uint32_t tmp32; /* Valeur sur 32 bits */
- bool status; /* Bilan de lecture initiale */
DBAction action; /* Commande de la requête */
GDbItem *item; /* Définition d'élément visé */
GList *found; /* Test de présence existante */
timestamp_t inactive; /* Horodatage de désactivation */
- result = extract_packed_buffer(pbuf, &tmp32, sizeof(uint32_t), true);
- action = tmp32;
-
- if (action < 0 || action >= DBA_COUNT)
- result = false;
-
- if (!result)
- return result;
-
- item = g_object_new(collec->type, NULL);
-
- status = g_db_item_unpack(item, pbuf);
- if (!status) return false;
-
- result = false;
+ result = _g_db_collection_unpack(collec, pbuf, &action, &item);
+ if (!result) return false;
switch (action)
{
@@ -459,7 +488,10 @@ bool g_db_collection_pack_all_updates(GDbCollection *collec, packed_buffer *pbuf
result = true;
- /* TODO : lock ? */
+ /**
+ * La gestion des accès s'effectue depuis le seul appelant : la fonction
+ * g_cdb_archive_add_client().
+ */
for (iter = g_list_first(collec->items);
iter != NULL && result;
@@ -1529,6 +1561,48 @@ void lock_unlock_collections(GList *list, bool write, bool lock)
/******************************************************************************
* *
* Paramètres : list = ensemble de collectons à traiter. *
+* pbuf = paquet de données où venir inscrire des infos. *
+* *
+* Description : Collecte les informations utiles pour un nouvel arrivant. *
+* *
+* Retour : Bilan du déroulement des opérations. *
+* *
+* Remarques : - *
+* *
+******************************************************************************/
+
+bool pack_all_collection_updates(GList *list, packed_buffer *pbuf)
+{
+ bool result; /* Bilan à retourner */
+ GList *iter; /* Boucle de parcours */
+ GDbCollection *collec; /* Collection visée manipulée */
+
+ result = true;
+
+ /**
+ * Cette procédure n'est appelée que depuis g_cdb_archive_process(),
+ * qui bloque son exécution jusqu'à la fin des opérations.
+ *
+ * On a donc l'assurance d'un récupérer tous les éléments d'un coup,
+ * sans activité parallèle.
+ */
+
+ for (iter = g_list_first(list); iter != NULL && result; iter = g_list_next(iter))
+ {
+ collec = G_DB_COLLECTION(iter->data);
+
+ result = g_db_collection_pack_all_updates(collec, pbuf);
+
+ }
+
+ return result;
+
+}
+
+
+/******************************************************************************
+* *
+* Paramètres : list = ensemble de collectons à traiter. *
* pbuf = paquet de données où venir puiser les infos. *
* db = base de données à mettre à jour. *
* *
@@ -1553,7 +1627,12 @@ bool update_activity_in_collections(GList *list, packed_buffer *pbuf, sqlite3 *d
GList *i; /* Boucle de parcours #2 */
GDbItem *item; /* Elément collecté à manipuler*/
- /* TODO : lock ? */
+ /**
+ * Cette procédure n'est appelée que depuis g_cdb_archive_process(),
+ * qui bloque son exécution jusqu'à la fin des opérations.
+ *
+ * On a donc l'assurance d'un traitement global homgène des horodatages.
+ */
status = unpack_timestamp(&timestamp, pbuf);
if (!status) return false;
diff --git a/src/analysis/db/collection.h b/src/analysis/db/collection.h
index 0f0ad34..cbcf42c 100644
--- a/src/analysis/db/collection.h
+++ b/src/analysis/db/collection.h
@@ -71,7 +71,8 @@ uint32_t g_db_collection_get_feature(const GDbCollection *);
const char *g_db_collection_get_name(const GDbCollection *);
-
+/* Réceptionne un élément depuis une requête réseau. */
+bool _g_db_collection_unpack(GDbCollection *, packed_buffer *, DBAction *, GDbItem **);
/* Réceptionne et traite une requête réseau pour collection. */
bool g_db_collection_unpack(GDbCollection *, packed_buffer *, sqlite3 *);
@@ -157,6 +158,9 @@ void lock_unlock_collections(GList *, bool, bool);
#define rlock_collections(lst) lock_unlock_collections(lst, false, true);
#define runlock_collections(lst) lock_unlock_collections(lst, false, false);
+/* Collecte les informations utiles pour un nouvel arrivant. */
+bool pack_all_collection_updates(GList *, packed_buffer *);
+
/* Met à jour les statuts d'activité des éléments. */
bool update_activity_in_collections(GList *, packed_buffer *, sqlite3 *);
diff --git a/src/analysis/db/protocol.h b/src/analysis/db/protocol.h
index 025f92f..311a691 100644
--- a/src/analysis/db/protocol.h
+++ b/src/analysis/db/protocol.h
@@ -117,22 +117,50 @@ typedef enum _DBCommand
DBC_SAVE, /* Enregistrement de l'archive */
DBC_COLLECTION, /* Implication d'une collection*/
+ /**
+ * Gestion de la commande 'DBC_[GS]ET_ALL_ITEMS'.
+ *
+ * Un client qui se connecte à un serveur doit en premier lieu envoyer :
+ *
+ * [ Demande de mise à jour : DBC_GET_ALL_ITEMS ]
+ *
+ * Tant qu'il ne reçoit pas la commande DBC_SET_ALL_ITEMS depuis le
+ * serveur, toutes les actions sur une collection sont à rejeter car elles
+ * lui seront retransmises plus tard.
+ *
+ * De son côté, le serveur répond par une requête :
+ *
+ * [ Notification de maj : DBC_SET_ALL_ITEMS ]
+ *
+ * Dans la foulée, il enverra ensuite les éléments avec des paquets classiques :
+ *
+ * [ Traitement de collection : DBC_COLLECTION ]
+ * [ Action : DBA_ADD_ITEM ]
+ * ...
+ *
+ * Les traitements se réalisent dans :
+ * - g_db_client_update() pour la partie client.
+ * - g_cdb_archive_process() pour la partie serveur.
+ *
+ */
+ DBC_GET_ALL_ITEMS, /* Mise à jour à la connexion */
+ DBC_SET_ALL_ITEMS, /* Mise à jour à la connexion */
/**
* Gestion de la commande 'DBC_SET_LAST_ACTIVE'.
*
* Le client connecté envoie un paquet de la forme suivante :
*
- * [ Statut d'historique : DBC_SET_LAST_ACTIVE ]
- * [ <horodatage du dernier élément actif ]
+ * [ Statut d'historique : DBC_SET_LAST_ACTIVE ]
+ * [ <horodatage du dernier élément actif ]
*
* Le serveur s'exécute et notifie le client d'éventuels changements,
* avec une série de paquets de la forme :
*
- * [ Traitement de collection : DBC_COLLECTION ]
- * [ Action : DBC_SET_LAST_ACTIVE ]
- * [ <élément dont le statut a évolué> ]
+ * [ Traitement de collection : DBC_COLLECTION ]
+ * [ Action : DBC_SET_LAST_ACTIVE ]
+ * [ <élément dont le statut a évolué> ]
*
* Les traitements se réalisent dans :
* - g_db_collection_set_last_active() pour la partie serveur.
@@ -142,9 +170,6 @@ typedef enum _DBCommand
DBC_SET_LAST_ACTIVE, /* Définition du dernier actif */
-
-
-
DBC_COUNT
} DBCommand;