From 706710aef28a0af4bb8aa343c2631a2139d00955 Mon Sep 17 00:00:00 2001
From: Cyrille Bagard <nocbos@gmail.com>
Date: Mon, 17 Dec 2018 20:00:29 +0100
Subject: Updated the connection protocol.

---
 src/analysis/db/cdb.c        | 111 ++++++++++++++++++++--------------------
 src/analysis/db/client.c     |  51 ++++++++++++++++++-
 src/analysis/db/collection.c | 117 ++++++++++++++++++++++++++++++++++++-------
 src/analysis/db/collection.h |   6 ++-
 src/analysis/db/protocol.h   |  41 ++++++++++++---
 5 files changed, 243 insertions(+), 83 deletions(-)

diff --git a/src/analysis/db/cdb.c b/src/analysis/db/cdb.c
index 829be19..c2855f7 100644
--- a/src/analysis/db/cdb.c
+++ b/src/analysis/db/cdb.c
@@ -922,14 +922,33 @@ static void *g_cdb_archive_process(GCdbArchive *archive)
                         status = g_db_collection_unpack(collec, &in_pbuf, archive->db);
                         if (!status) goto gcap_bad_exchange;
 
-                        printf("## CDB ## Got something for collection %p...\n", collec);
+                        break;
+
+                    case DBC_GET_ALL_ITEMS:
+
+                        init_packed_buffer(&out_pbuf);
+
+                        status = extend_packed_buffer(&out_pbuf, (uint32_t []) { DBC_SET_ALL_ITEMS },
+                                                      sizeof(uint32_t), true);
+                        if (!status) goto gcap_bad_reply;
+
+                        status = pack_all_collection_updates(archive->collections, &out_pbuf);
+                        if (!status) goto gcap_bad_reply;
 
-                        //GDbCollection *find_collection_in_list(GList *, uint32_t);
+                        status = ssl_send_packed_buffer(&out_pbuf, archive->clients[i].ssl_fd);
+                        if (!status) goto gcap_bad_reply;
 
-                        //static GGenConfig *find_collection_in_list(GList *list, uint32_t id)
+                        exit_packed_buffer(&out_pbuf);
 
                         break;
 
+                    case DBC_SET_ALL_ITEMS:
+                        asprintf(&msg, _("This command is not available on this side: 0x%08x"), command);
+                        LOG_ERROR(LMT_ERROR, msg);
+                        free(msg);
+                        goto gcap_bad_exchange;
+                        break;
+
                     case DBC_SET_LAST_ACTIVE:
 
                         status = update_activity_in_collections(archive->collections, &in_pbuf, archive->db);
@@ -1005,69 +1024,55 @@ static void *g_cdb_archive_process(GCdbArchive *archive)
 
 void g_cdb_archive_add_client(GCdbArchive *archive, SSL *fd, const rle_string *user)
 {
-    packed_buffer out_pbuf;                 /* Tampon d'émission           */
-    bool status;                            /* Bilan d'un envoi de retour  */
-    GList *iter;                            /* Boucle de parcours          */
-    GDbCollection *collec;                  /* Collection visée manipulée  */
     volatile pthread_t *process_id;         /* Identifiant de la procédure */
 
-    /* Envoi des mises à jour au nouveau client... */
-
-    init_packed_buffer(&out_pbuf);
-
-    status = true;
+    /**
+     * La situation est un peu compliquée lors de l'accueil d'un nouveau client :
+     *
+     *    - soit on envoie tous les éléments à prendre en compte, mais on doit
+     *      bloquer la base de données jusqu'à l'intégration pleine et entière
+     *      du client, afin que ce dernier ne loupe pas l'envoi d'un nouvel
+     *      élément entre temps.
+     *
+     *    - soit on intègre le client et celui ci demande des mises à jour
+     *      collection par collection ; c'est également à lui qui revient le rejet
+     *      des éléments envoyés en solitaires avant la réception de la base
+     *      complète.
+     *
+     * On fait le choix du second scenario ici, du fait de la difficulté
+     * de maîtriser facilement la reconstitution d'une liste de clients dans
+     * g_cdb_archive_process() depuis un autre flot d'exécution.
+     */
+
+    /* Ajout dans la liste officielle */
 
-    for (iter = g_list_first(archive->collections);
-         iter != NULL && status;
-         iter = g_list_next(iter))
-    {
-        collec = G_DB_COLLECTION(iter->data);
-
-        status = g_db_collection_pack_all_updates(collec, &out_pbuf);
-
-    }
+    g_mutex_lock(&archive->clients_access);
 
-    if (status && get_packed_buffer_payload_length(&out_pbuf) > 0)
-        status = ssl_send_packed_buffer(&out_pbuf, fd);
+    archive->clients = realloc(archive->clients, ++archive->count * sizeof(cdb_client));
 
-    exit_packed_buffer(&out_pbuf);
+    archive->clients[archive->count - 1].ssl_fd = fd;
+    dup_into_rle_string(&archive->clients[archive->count - 1].user, get_rle_string(user));
 
-    if (!status)
-        LOG_ERROR(LMT_ERROR, _("Failed to add a client"));
+    /* Démarrage ou redémarrage du processus d'écoute */
 
-    else
+    if (archive->process == NULL)
     {
-        /* Ajout dans la liste officielle */
+        archive->process = g_thread_new("cdb_process", (GThreadFunc)g_cdb_archive_process, archive);
 
-        g_mutex_lock(&archive->clients_access);
-
-        archive->clients = realloc(archive->clients, ++archive->count * sizeof(cdb_client));
+        /* On attend que le processus parallèle soit prêt */
 
-        archive->clients[archive->count - 1].ssl_fd = fd;
-        dup_into_rle_string(&archive->clients[archive->count - 1].user, get_rle_string(user));
+        process_id = &archive->process_id;
 
-        /* Démarrage ou redémarrage du processus d'écoute */
-
-        if (archive->process == NULL)
-        {
-            archive->process = g_thread_new("cdb_process", (GThreadFunc)g_cdb_archive_process, archive);
-
-            /* On attend que le processus parallèle soit prêt */
-
-            process_id = &archive->process_id;
-
-            g_mutex_lock(&archive->id_access);
-            while (process_id == 0)
-                g_cond_wait(&archive->id_cond, &archive->id_access);
-            g_mutex_unlock(&archive->id_access);
-
-        }
-        else
-            pthread_kill(archive->process_id, SIGUSR1);
-
-        g_mutex_unlock(&archive->clients_access);
+        g_mutex_lock(&archive->id_access);
+        while (process_id == 0)
+            g_cond_wait(&archive->id_cond, &archive->id_access);
+        g_mutex_unlock(&archive->id_access);
 
     }
+    else
+        pthread_kill(archive->process_id, SIGUSR1);
+
+    g_mutex_unlock(&archive->clients_access);
 
 }
 
diff --git a/src/analysis/db/client.c b/src/analysis/db/client.c
index 25fe64d..24198f6 100644
--- a/src/analysis/db/client.c
+++ b/src/analysis/db/client.c
@@ -68,6 +68,7 @@ struct _GDbClient
     char *desc;                             /* Description du lien         */
 
     GMutex sending_lock;                    /* Concurrence des envois      */
+    bool can_get_updates;                   /* Réception de maj possibles ?*/
     GThread *update;                        /* Procédure de traitement     */
 
 };
@@ -519,6 +520,8 @@ static bool g_db_client_start_common(GDbClient *client, char *desc)
 
     }
 
+    client->can_get_updates = false;
+
     client->update = g_thread_try_new("cdb_client", (GThreadFunc)g_db_client_update, client, NULL);
     if (client->update == NULL)
     {
@@ -571,16 +574,44 @@ static bool g_db_client_start_common(GDbClient *client, char *desc)
 
 static void *g_db_client_update(GDbClient *client)
 {
+    packed_buffer out_pbuf;                 /* Tampon d'émission           */
+    bool status;                            /* Bilan d'une opération       */
     struct pollfd fds;                      /* Surveillance des flux       */
     packed_buffer in_pbuf;                  /* Tampon de réception         */
     int ret;                                /* Bilan d'un appel            */
     uint32_t tmp32;                         /* Valeur sur 32 bits          */
-    bool status;                            /* Bilan d'une opération       */
     uint32_t command;                       /* Commande de la requête      */
     DBError error;                          /* Bilan d'une commande passée */
     GDbCollection *collec;                  /* Collection visée au final   */
     char *msg;                              /* Message d'erreur à imprimer */
 
+    /**
+     * Avant toute chose, on demande un stage d'actualisation !
+     */
+
+    init_packed_buffer(&out_pbuf);
+
+    status = extend_packed_buffer(&out_pbuf, (uint32_t []) { DBC_GET_ALL_ITEMS }, sizeof(uint32_t), true);
+    if (!status)
+    {
+        exit_packed_buffer(&out_pbuf);
+        goto exit;
+    }
+
+    status = ssl_send_packed_buffer(&out_pbuf, client->tls_fd);
+    if (!status)
+    {
+        log_simple_message(LMT_INFO, _("Failed to get all updates"));
+        exit_packed_buffer(&out_pbuf);
+        goto exit;
+    }
+
+    exit_packed_buffer(&out_pbuf);
+
+    /**
+     * Phase d'écoute continue...
+     */
+
     fds.fd = client->fd;
     fds.events = POLLIN | POLLPRI;
 
@@ -631,11 +662,25 @@ static void *g_db_client_update(GDbClient *client)
                     collec = find_collection_in_list(client->collections, tmp32);
                     if (collec == NULL) goto gdcu_bad_exchange;
 
-                    status = g_db_collection_unpack(collec, &in_pbuf, NULL);
+                    if (client->can_get_updates)
+                        status = g_db_collection_unpack(collec, &in_pbuf, NULL);
+                    else
+                        status = _g_db_collection_unpack(collec, &in_pbuf, (DBAction []) { 0 }, NULL);
+
                     if (!status) goto gdcu_bad_exchange;
 
                     break;
 
+                case DBC_GET_ALL_ITEMS:
+                    log_variadic_message(LMT_INFO,
+                                         _("This command is not available on this side: 0x%08x"), command);
+                    goto gdcu_bad_exchange;
+                    break;
+
+                case DBC_SET_ALL_ITEMS:
+                    client->can_get_updates = true;
+                    break;
+
             }
 
             continue;
@@ -654,6 +699,8 @@ static void *g_db_client_update(GDbClient *client)
 
     }
 
+ exit:
+
     g_db_client_stop(client);
 
     exit_packed_buffer(&in_pbuf);
diff --git a/src/analysis/db/collection.c b/src/analysis/db/collection.c
index adb3ad1..2c6086a 100644
--- a/src/analysis/db/collection.c
+++ b/src/analysis/db/collection.c
@@ -271,9 +271,53 @@ const char *g_db_collection_get_name(const GDbCollection *collec)
 
 
 
+/******************************************************************************
+*                                                                             *
+*  Paramètres  : collec = ensemble d'éléments à considérer.                   *
+*                pbuf   = paquet de données où venir puiser les infos.        *
+*                action = commande de la requête. [OUT]                       *
+*                dest   = élément de collection ou NULL pour un rejet. [OUT]  *
+*                                                                             *
+*  Description : Réceptionne un élément depuis une requête réseau.            *
+*                                                                             *
+*  Retour      : Bilan de l'exécution de l'opération.                         *
+*                                                                             *
+*  Remarques   : -                                                            *
+*                                                                             *
+******************************************************************************/
 
+bool _g_db_collection_unpack(GDbCollection *collec, packed_buffer *pbuf, DBAction *action, GDbItem **dest)
+{
+    bool result;                            /* Bilan à faire remonter      */
+    uint32_t tmp32;                         /* Valeur sur 32 bits          */
+    GDbItem *item;                          /* Définition d'élément visé   */
 
+    result = extract_packed_buffer(pbuf, &tmp32, sizeof(uint32_t), true);
+    if (!result) goto exit;
+
+    *action = tmp32;
 
+    result = (*action >= 0 && *action < DBA_COUNT);
+    if (!result) goto exit;
+
+    item = g_object_new(collec->type, NULL);
+
+    result = g_db_item_unpack(item, pbuf);
+    if (!result) goto exit;
+
+    if (dest != NULL)
+        *dest = item;
+    else
+        g_object_unref(G_OBJECT(item));
+
+ exit:
+
+    if (!result)
+        g_object_unref(G_OBJECT(item));
+
+    return result;
+
+}
 
 
 /******************************************************************************
@@ -295,28 +339,13 @@ const char *g_db_collection_get_name(const GDbCollection *collec)
 bool g_db_collection_unpack(GDbCollection *collec, packed_buffer *pbuf, sqlite3 *db)
 {
     bool result;                            /* Bilan à faire remonter      */
-    uint32_t tmp32;                         /* Valeur sur 32 bits          */
-    bool status;                            /* Bilan de lecture initiale   */
     DBAction action;                        /* Commande de la requête      */
     GDbItem *item;                          /* Définition d'élément visé   */
     GList *found;                           /* Test de présence existante  */
     timestamp_t inactive;                   /* Horodatage de désactivation */
 
-    result = extract_packed_buffer(pbuf, &tmp32, sizeof(uint32_t), true);
-    action = tmp32;
-
-    if (action < 0 || action >= DBA_COUNT)
-        result = false;
-
-    if (!result)
-        return result;
-
-    item = g_object_new(collec->type, NULL);
-
-    status = g_db_item_unpack(item, pbuf);
-    if (!status) return false;
-
-    result = false;
+    result = _g_db_collection_unpack(collec, pbuf, &action, &item);
+    if (!result) return false;
 
     switch (action)
     {
@@ -459,7 +488,10 @@ bool g_db_collection_pack_all_updates(GDbCollection *collec, packed_buffer *pbuf
 
     result = true;
 
-    /* TODO : lock ? */
+    /**
+     * La gestion des accès s'effectue depuis le seul appelant : la fonction
+     * g_cdb_archive_add_client().
+    */
 
     for (iter = g_list_first(collec->items);
          iter != NULL && result;
@@ -1529,6 +1561,48 @@ void lock_unlock_collections(GList *list, bool write, bool lock)
 /******************************************************************************
 *                                                                             *
 *  Paramètres  : list = ensemble de collectons à traiter.                     *
+*                pbuf = paquet de données où venir inscrire des infos.        *
+*                                                                             *
+*  Description : Collecte les informations utiles pour un nouvel arrivant.    *
+*                                                                             *
+*  Retour      : Bilan du déroulement des opérations.                         *
+*                                                                             *
+*  Remarques   : -                                                            *
+*                                                                             *
+******************************************************************************/
+
+bool pack_all_collection_updates(GList *list, packed_buffer *pbuf)
+{
+    bool result;                            /* Bilan à retourner           */
+    GList *iter;                            /* Boucle de parcours          */
+    GDbCollection *collec;                  /* Collection visée manipulée  */
+
+    result = true;
+
+    /**
+     * Cette procédure n'est appelée que depuis g_cdb_archive_process(),
+     * qui bloque son exécution jusqu'à la fin des opérations.
+     *
+     * On a donc l'assurance d'un récupérer tous les éléments d'un coup,
+     * sans activité parallèle.
+     */
+
+    for (iter = g_list_first(list); iter != NULL && result; iter = g_list_next(iter))
+    {
+        collec = G_DB_COLLECTION(iter->data);
+
+        result = g_db_collection_pack_all_updates(collec, pbuf);
+
+    }
+
+    return result;
+
+}
+
+
+/******************************************************************************
+*                                                                             *
+*  Paramètres  : list = ensemble de collectons à traiter.                     *
 *                pbuf = paquet de données où venir puiser les infos.          *
 *                db   = base de données à mettre à jour.                      *
 *                                                                             *
@@ -1553,7 +1627,12 @@ bool update_activity_in_collections(GList *list, packed_buffer *pbuf, sqlite3 *d
     GList *i;                               /* Boucle de parcours #2       */
     GDbItem *item;                          /* Elément collecté à manipuler*/
 
-    /* TODO : lock ? */
+    /**
+     * Cette procédure n'est appelée que depuis g_cdb_archive_process(),
+     * qui bloque son exécution jusqu'à la fin des opérations.
+     *
+     * On a donc l'assurance d'un traitement global homgène des horodatages.
+     */
 
     status = unpack_timestamp(&timestamp, pbuf);
     if (!status) return false;
diff --git a/src/analysis/db/collection.h b/src/analysis/db/collection.h
index 0f0ad34..cbcf42c 100644
--- a/src/analysis/db/collection.h
+++ b/src/analysis/db/collection.h
@@ -71,7 +71,8 @@ uint32_t g_db_collection_get_feature(const GDbCollection *);
 const char *g_db_collection_get_name(const GDbCollection *);
 
 
-
+/* Réceptionne un élément depuis une requête réseau. */
+bool _g_db_collection_unpack(GDbCollection *, packed_buffer *, DBAction *, GDbItem **);
 
 /* Réceptionne et traite une requête réseau pour collection. */
 bool g_db_collection_unpack(GDbCollection *, packed_buffer *, sqlite3 *);
@@ -157,6 +158,9 @@ void lock_unlock_collections(GList *, bool, bool);
 #define rlock_collections(lst) lock_unlock_collections(lst, false, true);
 #define runlock_collections(lst) lock_unlock_collections(lst, false, false);
 
+/* Collecte les informations utiles pour un nouvel arrivant. */
+bool pack_all_collection_updates(GList *, packed_buffer *);
+
 /* Met à jour les statuts d'activité des éléments. */
 bool update_activity_in_collections(GList *, packed_buffer *, sqlite3 *);
 
diff --git a/src/analysis/db/protocol.h b/src/analysis/db/protocol.h
index 025f92f..311a691 100644
--- a/src/analysis/db/protocol.h
+++ b/src/analysis/db/protocol.h
@@ -117,22 +117,50 @@ typedef enum _DBCommand
     DBC_SAVE,                               /* Enregistrement de l'archive */
     DBC_COLLECTION,                         /* Implication d'une collection*/
 
+    /**
+     * Gestion de la commande 'DBC_[GS]ET_ALL_ITEMS'.
+     *
+     * Un client qui se connecte à un serveur doit en premier lieu envoyer :
+     *
+     *    [ Demande de mise à jour : DBC_GET_ALL_ITEMS  ]
+     *
+     * Tant qu'il ne reçoit pas la commande DBC_SET_ALL_ITEMS depuis le
+     * serveur, toutes les actions sur une collection sont à rejeter car elles
+     * lui seront retransmises plus tard.
+     *
+     * De son côté, le serveur répond par une requête :
+     *
+     *    [ Notification de maj : DBC_SET_ALL_ITEMS     ]
+     *
+     * Dans la foulée, il enverra ensuite les éléments avec des paquets classiques :
+     *
+     *    [ Traitement de collection : DBC_COLLECTION   ]
+     *    [ Action : DBA_ADD_ITEM                       ]
+     *    ...
+     *
+     * Les traitements se réalisent dans :
+     *  - g_db_client_update() pour la partie client.
+     *  - g_cdb_archive_process() pour la partie serveur.
+     *
+     */
 
+    DBC_GET_ALL_ITEMS,                      /* Mise à jour à la connexion  */
+    DBC_SET_ALL_ITEMS,                      /* Mise à jour à la connexion  */
 
     /**
      * Gestion de la commande 'DBC_SET_LAST_ACTIVE'.
      *
      * Le client connecté envoie un paquet de la forme suivante :
      *
-     *    [ Statut d'historique : DBC_SET_LAST_ACTIVE ]
-     *    [ <horodatage du dernier élément actif      ]
+     *    [ Statut d'historique : DBC_SET_LAST_ACTIVE   ]
+     *    [ <horodatage du dernier élément actif        ]
      *
      * Le serveur s'exécute et notifie le client d'éventuels changements,
      * avec une série de paquets de la forme :
      *
-     *    [ Traitement de collection : DBC_COLLECTION ]
-     *    [ Action : DBC_SET_LAST_ACTIVE              ]
-     *    [ <élément dont le statut a évolué>         ]
+     *    [ Traitement de collection : DBC_COLLECTION   ]
+     *    [ Action : DBC_SET_LAST_ACTIVE                ]
+     *    [ <élément dont le statut a évolué>           ]
      *
      * Les traitements se réalisent dans :
      *  - g_db_collection_set_last_active() pour la partie serveur.
@@ -142,9 +170,6 @@ typedef enum _DBCommand
 
     DBC_SET_LAST_ACTIVE,                    /* Définition du dernier actif */
 
-
-
-
     DBC_COUNT
 
 } DBCommand;
-- 
cgit v0.11.2-87-g4458