from chrysacase import ChrysalideTestCase
from pychrysalide.analysis.db import certs
from pychrysalide.analysis.db import AdminClient
from pychrysalide.analysis.db import HubServer
import os
import shutil
import tempfile
import threading


class TestDbConnection(ChrysalideTestCase):
    """TestCase for analysis.db."""

    # Upper bound, in seconds, on the wait for the 'existing-binaries-updated'
    # signal; without it a signal that is never emitted blocks the run forever.
    _SIGNAL_TIMEOUT = 30

    @classmethod
    def setUpClass(cls):
        """Create the temporary directory tree used by the server and the client."""

        super(TestDbConnection, cls).setUpClass()

        cls._tmp_path = tempfile.mkdtemp()

        cls._server_path = '%s/.chrysalide/servers/localhost-9999/' % cls._tmp_path
        os.makedirs(cls._server_path)

        cls._server_authorized_path = '%s/authorized/' % cls._server_path
        os.makedirs(cls._server_authorized_path)

        cls._client_path = '%s/.chrysalide/clients/' % cls._tmp_path
        os.makedirs(cls._client_path)

        cls._client_cert_path = '%s/localhost-9999/' % cls._client_path
        os.makedirs(cls._client_cert_path)

        cls.log('Using temporary directory "%s"' % cls._tmp_path)

    @classmethod
    def tearDownClass(cls):
        """Dump the content of the temporary directory, then remove it."""

        super(TestDbConnection, cls).tearDownClass()

        # Debugging aid: recursive listing of everything the test produced.
        os.system('ls -laihR %s' % cls._tmp_path)

        cls.log('Delete directory "%s"' % cls._tmp_path)

        shutil.rmtree(cls._tmp_path)

    @staticmethod
    def _restore_env(name, value):
        """Set environment variable name back to value, or unset it if value is None."""

        if value is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = value

    def testServerListening(self):
        """List binaries available from server."""

        from pychrysalide import core

        #core.set_verbosity(0)

        # Same validity value is passed for the CA and for both signed certificates.
        validity = 3650 * 24 * 60 * 60

        ca_cert = '%s/ca-cert.pem' % self._server_path
        ca_key = '%s/ca-key.pem' % self._server_path

        # Certificate authority, stored in the server directory.
        identity = { 'C': 'FR', 'CN': 'Test authority' }

        ret = certs.build_keys_and_ca(self._server_path, 'ca', validity, identity)
        self.assertTrue(ret)

        # Server keys and signing request, then the certificate signed by the CA.
        identity = { 'C': 'FR', 'CN': 'Test server' }

        ret = certs.build_keys_and_request(self._server_path, 'server', identity)
        self.assertTrue(ret)

        ret = certs.sign_cert('%s/server-csr.pem' % self._server_path,
                              ca_cert, ca_key,
                              '%s/server-cert.pem' % self._server_path,
                              validity)
        self.assertTrue(ret)

        # Client keys and signing request; the signed certificate is written
        # into the per-server client directory.
        identity = { 'C': 'FR', 'CN': 'Test admin' }

        ret = certs.build_keys_and_request(self._client_path, 'client', identity)
        self.assertTrue(ret)

        client_cert = '%s/client-cert.pem' % self._client_cert_path

        ret = certs.sign_cert('%s/client-csr.pem' % self._client_path,
                              ca_cert, ca_key, client_cert, validity)
        self.assertTrue(ret)

        # Give the client a copy of the CA certificate, and register the client
        # certificate in the server's 'authorized' directory.
        shutil.copy(ca_cert, '%s/ca-cert.pem' % self._client_cert_path)

        shutil.copy(client_cert,
                    '%s/client-cert.pem' % self._server_authorized_path)

        # Redirect the configuration lookups to the temporary tree, and put the
        # previous values back when the test ends so that they do not leak
        # into other tests (the tree is deleted in tearDownClass()).
        for name in ('XDG_CONFIG_HOME', 'HOME'):
            self.addCleanup(self._restore_env, name, os.environ.get(name))
            os.environ[name] = self._tmp_path

        server = HubServer('localhost', '9999')

        #print(server)

        # NOTE(review): the value returned by start() is not checked here, as
        # the original test did not check it either -- confirm what it returns
        # before asserting on it.
        server.start()

        admin = AdminClient()

        ret = admin.start('localhost', '9999')
        self.assertTrue(ret)

        def _on_existing_binaries_updated(adm, evt):
            evt.set()

        event = threading.Event()

        admin.connect('existing-binaries-updated', _on_existing_binaries_updated, event)

        ret = admin.request_existing_binaries()
        self.assertTrue(ret)

        # Fail instead of hanging if the signal never comes.
        received = event.wait(self._SIGNAL_TIMEOUT)
        self.assertTrue(received, 'no "existing-binaries-updated" signal received')

        self.assertEqual(len(admin.existing_binaries), 0)