diff options
author | Cyrille Bagard <nocbos@gmail.com> | 2022-02-21 06:54:10 (GMT) |
---|---|---|
committer | Cyrille Bagard <nocbos@gmail.com> | 2022-02-21 06:54:10 (GMT) |
commit | 8bf77ba6e5ef40d8bb936dc952ac2c8cc30aab3e (patch) | |
tree | 934661f2f30e0b8d23793f52084c736052e85f7c /tests/analysis/db | |
parent | 8ce244d136b32b43d8553866747c68b503b2d10a (diff) |
Make the server drive the network exchanges.
Diffstat (limited to 'tests/analysis/db')
-rw-r--r-- | tests/analysis/db/analyst.py | 188 | ||||
-rw-r--r-- | tests/analysis/db/conn.py | 43 |
2 files changed, 219 insertions, 12 deletions
diff --git a/tests/analysis/db/analyst.py b/tests/analysis/db/analyst.py new file mode 100644 index 0000000..7347810 --- /dev/null +++ b/tests/analysis/db/analyst.py @@ -0,0 +1,188 @@ + +from chrysacase import ChrysalideTestCase +import pychrysalide +from pychrysalide.analysis.contents import MemoryContent +from pychrysalide.analysis.db import certs +from pychrysalide.analysis.db import AdminClient, AnalystClient +from pychrysalide.analysis.db import HubServer +import os +import shutil +import sys +import tempfile +import threading + + +class TestDbConnection(ChrysalideTestCase): + """TestCase for analysis.db.""" + + @classmethod + def setUpClass(cls): + + super(TestDbConnection, cls).setUpClass() + + cls.log('Compile binary "strings" if needed...') + + fullname = sys.modules[cls.__module__].__file__ + dirpath = os.path.dirname(fullname) + + cls._bin_path = os.path.realpath(dirpath + '/../../format/elf/') + + os.system('make -C %s strings > /dev/null 2>&1' % cls._bin_path) + + cls._tmp_path = tempfile.mkdtemp() + + cls._server_path = '%s/.chrysalide/servers/localhost-9999/' % cls._tmp_path + os.makedirs(cls._server_path) + + cls._server_authorized_path = '%s/authorized/' % cls._server_path + os.makedirs(cls._server_authorized_path) + + cls._client_path = '%s/.chrysalide/clients/' % cls._tmp_path + os.makedirs(cls._client_path) + + cls._client_cert_path = '%s/localhost-9999/' % cls._client_path + os.makedirs(cls._client_cert_path) + + cls.log('Using temporary directory "%s"' % cls._tmp_path) + + + @classmethod + def tearDownClass(cls): + + super(TestDbConnection, cls).tearDownClass() + + cls.log('Delete built binaries...') + + os.system('make -C %s clean > /dev/null 2>&1' % cls._bin_path) + + ## os.system('ls -laihR %s' % cls._tmp_path) + + cls.log('Delete directory "%s"' % cls._tmp_path) + + shutil.rmtree(cls._tmp_path) + + + def testServerListening(self): + """List binaries available from server.""" + + + from pychrysalide import core + core.set_verbosity(0) + + + + identity = { + + 'C': 'FR', + 'CN': 'Test authority' + + } + + ret = certs.build_keys_and_ca(self._server_path, 'ca', 3650 * 24 * 60 * 60, identity) + self.assertTrue(ret) + + identity = { + + 'C': 'FR', + 'CN': 'Test server' + + } + + ret = certs.build_keys_and_request(self._server_path, 'server', identity); + self.assertTrue(ret) + + + ret = certs.sign_cert('%s/server-csr.pem' % self._server_path, '%s/ca-cert.pem' % self._server_path, \ + '%s/ca-key.pem' % self._server_path, '%s/server-cert.pem' % self._server_path, \ + 3650 * 24 * 60 * 60) + self.assertTrue(ret) + + identity = { + + 'C': 'FR', + 'CN': 'Test admin' + + } + + ret = certs.build_keys_and_request(self._client_path, 'client', identity); + self.assertTrue(ret) + + ret = certs.sign_cert('%s/client-csr.pem' % self._client_path, '%s/ca-cert.pem' % self._server_path, \ + '%s/ca-key.pem' % self._server_path, '%s/client-cert.pem' % self._client_cert_path, \ + 3650 * 24 * 60 * 60) + self.assertTrue(ret) + + shutil.copy('%s/ca-cert.pem' % self._server_path, + '%s/ca-cert.pem' % self._client_cert_path) + + shutil.copy('%s/client-cert.pem' % self._client_cert_path, + '%s/client-cert.pem' % self._server_authorized_path) + + + os.environ['XDG_CONFIG_HOME'] = self._tmp_path + os.environ['HOME'] = self._tmp_path + + server = HubServer('localhost', '9999') + + #print(server) + + ret = server.start() + + #print(ret) + + + + + # admin = AdminClient() + + # ret = admin.start('localhost', '9999') + # self.assertTrue(ret) + + # def _on_existing_binaries_updated(adm, evt): + # evt.set() + + # event = threading.Event() + + # admin.connect('existing-binaries-updated', _on_existing_binaries_updated, event) + + # ret = admin.request_existing_binaries() + # self.assertTrue(ret) + + # event.wait() + + # self.assertEqual(len(admin.existing_binaries), 0) + + + + cnt = MemoryContent(b'A' * 400 * 1024) + + + + analyst = AnalystClient(cnt.checksum, "elf", []) + + + + def _on_server_status_changed(analyst, hint, evt): + print(hint) + evt.set() + + + event = threading.Event() + + analyst.connect('server-status-changed', _on_server_status_changed, event) + + + + ret = analyst.start('localhost', '9999') + self.assertTrue(ret) + + + event.wait() + + event.clear() + + ret = analyst.send_content(cnt) + self.assertTrue(ret) + + + event.wait() diff --git a/tests/analysis/db/conn.py b/tests/analysis/db/conn.py index f388f60..248a036 100644 --- a/tests/analysis/db/conn.py +++ b/tests/analysis/db/conn.py @@ -1,7 +1,8 @@ from chrysacase import ChrysalideTestCase +from pychrysalide.analysis.contents import MemoryContent from pychrysalide.analysis.db import certs -from pychrysalide.analysis.db import AdminClient +from pychrysalide.analysis.db import AdminClient, AnalystClient from pychrysalide.analysis.db import HubServer import os import shutil @@ -51,7 +52,7 @@ class TestDbConnection(ChrysalideTestCase): from pychrysalide import core - #core.set_verbosity(0) + core.set_verbosity(0) @@ -117,21 +118,39 @@ class TestDbConnection(ChrysalideTestCase): - admin = AdminClient() + # admin = AdminClient() - ret = admin.start('localhost', '9999') - self.assertTrue(ret) + # ret = admin.start('localhost', '9999') + # self.assertTrue(ret) + + # def _on_existing_binaries_updated(adm, evt): + # evt.set() + + # event = threading.Event() + + # admin.connect('existing-binaries-updated', _on_existing_binaries_updated, event) + + # ret = admin.request_existing_binaries() + # self.assertTrue(ret) + + # event.wait() - def _on_existing_binaries_updated(adm, evt): - evt.set() + # self.assertEqual(len(admin.existing_binaries), 0) - event = threading.Event() - admin.connect('existing-binaries-updated', _on_existing_binaries_updated, event) - ret = admin.request_existing_binaries() + cnt = MemoryContent(b'A' * 400 * 1024) + + print(cnt) + + print(len(cnt.data)) + + + analyst = AnalystClient(cnt.checksum, []) + + ret = analyst.start('localhost', '9999') self.assertTrue(ret) - event.wait() - self.assertEqual(len(admin.existing_binaries), 0) + ret = analyst.send_content(cnt) + self.assertTrue(ret) |