summaryrefslogtreecommitdiff
path: root/tests/analysis
diff options
context:
space:
mode:
Diffstat (limited to 'tests/analysis')
-rw-r--r--tests/analysis/db/conn.py125
1 files changed, 125 insertions, 0 deletions
diff --git a/tests/analysis/db/conn.py b/tests/analysis/db/conn.py
new file mode 100644
index 0000000..39c660a
--- /dev/null
+++ b/tests/analysis/db/conn.py
@@ -0,0 +1,125 @@
+
+from chrysacase import ChrysalideTestCase
+from pychrysalide.analysis.db import certs
+from pychrysalide.analysis.db import AdminClient
+from pychrysalide.analysis.db import HubServer
+import os
+import shutil
+import tempfile
+
+
+class TestDbConnection(ChrysalideTestCase):
+ """TestCase for analysis.db."""
+
+ @classmethod
+ def setUpClass(cls):
+
+ super(TestDbConnection, cls).setUpClass()
+
+ cls._tmp_path = tempfile.mkdtemp()
+
+ cls._server_path = '%s/.chrysalide/servers/localhost-9999/' % cls._tmp_path
+ os.makedirs(cls._server_path)
+
+ cls._server_authorized_path = '%s/authorized/' % cls._server_path
+ os.makedirs(cls._server_authorized_path)
+
+ cls._client_path = '%s/.chrysalide/clients/' % cls._tmp_path
+ os.makedirs(cls._client_path)
+
+ cls._client_cert_path = '%s/localhost-9999/' % cls._client_path
+ os.makedirs(cls._client_cert_path)
+
+ cls.log('Using temporary directory "%s"' % cls._tmp_path)
+
+
+ @classmethod
+ def tearDownClass(cls):
+
+ super(TestDbConnection, cls).tearDownClass()
+
+ os.system('ls -laihR %s' % cls._tmp_path)
+
+ cls.log('Delete directory "%s"' % cls._tmp_path)
+
+ shutil.rmtree(cls._tmp_path)
+
+
+ def testServerListening(self):
+ """List binaries available from server."""
+
+
+ from pychrysalide import core
+ #core.set_verbosity(0)
+
+
+
+ identity = {
+
+ 'C': 'FR',
+ 'CN': 'Test authority'
+
+ }
+
+ ret = certs.build_keys_and_ca(self._server_path, 'ca', 3650 * 24 * 60 * 60, identity)
+ self.assertTrue(ret)
+
+ identity = {
+
+ 'C': 'FR',
+ 'CN': 'Test server'
+
+ }
+
+ ret = certs.build_keys_and_request(self._server_path, 'server', identity);
+ self.assertTrue(ret)
+
+
+ ret = certs.sign_cert('%s/server-csr.pem' % self._server_path, '%s/ca-cert.pem' % self._server_path, \
+ '%s/ca-key.pem' % self._server_path, '%s/server-cert.pem' % self._server_path, \
+ 3650 * 24 * 60 * 60)
+ self.assertTrue(ret)
+
+ identity = {
+
+ 'C': 'FR',
+ 'CN': 'Test admin'
+
+ }
+
+ ret = certs.build_keys_and_request(self._client_path, 'client', identity);
+ self.assertTrue(ret)
+
+ ret = certs.sign_cert('%s/client-csr.pem' % self._client_path, '%s/ca-cert.pem' % self._server_path, \
+ '%s/ca-key.pem' % self._server_path, '%s/client-cert.pem' % self._client_cert_path, \
+ 3650 * 24 * 60 * 60)
+ self.assertTrue(ret)
+
+ shutil.copy('%s/ca-cert.pem' % self._server_path,
+ '%s/ca-cert.pem' % self._client_cert_path)
+
+ shutil.copy('%s/client-cert.pem' % self._client_cert_path,
+ '%s/client-cert.pem' % self._server_authorized_path)
+
+
+ os.environ['XDG_CONFIG_HOME'] = self._tmp_path
+ os.environ['HOME'] = self._tmp_path
+
+ server = HubServer('localhost', '9999')
+
+ #print(server)
+
+ ret = server.start()
+
+ #print(ret)
+
+ admin = AdminClient()
+
+ #print(admin)
+
+
+ ret = admin.start('localhost', '9999')
+
+ #print('FINAL::', ret)
+
+ #print(server)