diff options
Diffstat (limited to 'tests/analysis')
-rw-r--r-- | tests/analysis/cattribs.py | 42 | ||||
-rw-r--r-- | tests/analysis/contents/checksum.py | 2 | ||||
-rw-r--r-- | tests/analysis/contents/endian.py | 50 | ||||
-rw-r--r-- | tests/analysis/contents/memory.py | 18 | ||||
-rw-r--r-- | tests/analysis/contents/restricted.py | 84 | ||||
-rw-r--r-- | tests/analysis/db/analyst.py | 188 | ||||
-rw-r--r-- | tests/analysis/db/conn.py | 43 | ||||
-rw-r--r-- | tests/analysis/scan/booleans.py | 98 | ||||
-rw-r--r-- | tests/analysis/scan/common.py | 54 | ||||
-rw-r--r-- | tests/analysis/scan/examples.py | 70 | ||||
-rw-r--r-- | tests/analysis/scan/functions.py | 239 | ||||
-rw-r--r-- | tests/analysis/scan/fuzzing.py | 289 | ||||
-rw-r--r-- | tests/analysis/scan/grammar.py | 484 | ||||
-rw-r--r-- | tests/analysis/scan/matches.py | 64 | ||||
-rw-r--r-- | tests/analysis/scan/pyapi.py | 297 | ||||
-rw-r--r-- | tests/analysis/scan/scanning_hex.py | 716 | ||||
-rw-r--r-- | tests/analysis/scan/scanning_str.py | 194 | ||||
-rw-r--r-- | tests/analysis/scan/sets.py | 118 |
18 files changed, 2947 insertions, 103 deletions
diff --git a/tests/analysis/cattribs.py b/tests/analysis/cattribs.py index 1a7f7da..e388afc 100644 --- a/tests/analysis/cattribs.py +++ b/tests/analysis/cattribs.py @@ -15,10 +15,10 @@ class TestProjectFeatures(ChrysalideTestCase): def testEmptyContentAttributeSet(self): """Check properties of empty content attribute set.""" - attribs = ContentAttributes('') - self.assertIsNotNone(attribs) + attribs, filename = ContentAttributes('') - self.assertIsNone(attribs.filename) + self.assertIsNotNone(attribs) + self.assertIsNone(filename) self.assertEqual(len(attribs.keys), 0) @@ -33,16 +33,16 @@ class TestProjectFeatures(ChrysalideTestCase): 'dddd': '3', } - filename = 'filename' - path = filename + orig_filename = 'filename' + path = orig_filename for k in model.keys(): path += '&%s=%s' % (k, model[k]) - attribs = ContentAttributes(path) - self.assertIsNotNone(attribs) + attribs, filename = ContentAttributes(path) - self.assertEqual(attribs.filename, filename) + self.assertIsNotNone(attribs) + self.assertEqual(orig_filename, filename) kcount = 0 @@ -73,10 +73,10 @@ class TestProjectFeatures(ChrysalideTestCase): for k in model.keys(): path += '&e%s=%s' % (k, model[k]) - attribs = ContentAttributes(path) - self.assertIsNotNone(attribs) + attribs, filename = ContentAttributes(path) - self.assertIsNone(attribs.filename) + self.assertIsNotNone(attribs) + self.assertIsNone(filename) kcount = 0 @@ -92,31 +92,31 @@ class TestProjectFeatures(ChrysalideTestCase): path = '&&' - attribs = ContentAttributes(path) - self.assertIsNotNone(attribs) + attribs, filename = ContentAttributes(path) - self.assertIsNone(attribs.filename) + self.assertIsNotNone(attribs) + self.assertIsNone(filename) self.assertEqual(len(attribs.keys), 0) path = '&&&' - attribs = ContentAttributes(path) - self.assertIsNotNone(attribs) + attribs, filename = ContentAttributes(path) - self.assertIsNone(attribs.filename) + self.assertIsNotNone(attribs) + self.assertIsNone(filename) self.assertEqual(len(attribs.keys), 0) path = 'filename' - attribs = ContentAttributes(path) + attribs, filename = ContentAttributes(path) + self.assertIsNotNone(attribs) + self.assertEqual(filename, path) self.assertEqual(len(attribs.keys), 0) - self.assertEqual(attribs.filename, path) - def testContentAttributesKeyAccess(self): """Test some access keys for content attributes.""" @@ -130,7 +130,7 @@ class TestProjectFeatures(ChrysalideTestCase): for k in model.keys(): path += '&%s=%s' % (k, model[k]) - attribs = ContentAttributes(path) + attribs, _ = ContentAttributes(path) self.assertIsNotNone(attribs) with self.assertRaisesRegex(Exception, 'key must be a string value'): diff --git a/tests/analysis/contents/checksum.py b/tests/analysis/contents/checksum.py index 54e4630..fd0c3ed 100644 --- a/tests/analysis/contents/checksum.py +++ b/tests/analysis/contents/checksum.py @@ -56,7 +56,7 @@ class TestRestrictedContent(ChrysalideTestCase): fcnt = FileContent(self._out.name) self.assertIsNotNone(fcnt) - start = vmpa(4, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(4, vmpa.VmpaSpecialValue.NO_VIRTUAL) covered = mrange(start, 4) # 'BBBB' rcnt = RestrictedContent(fcnt, covered) diff --git a/tests/analysis/contents/endian.py b/tests/analysis/contents/endian.py index 77ed77a..b7c8bec 100644 --- a/tests/analysis/contents/endian.py +++ b/tests/analysis/contents/endian.py @@ -6,7 +6,7 @@ from chrysacase import ChrysalideTestCase -from pychrysalide import arch +from pychrysalide.analysis import BinContent from pychrysalide.analysis.contents import FileContent, RestrictedContent from pychrysalide.arch import vmpa import tempfile @@ -53,38 +53,38 @@ class TestEndianness(ChrysalideTestCase): # 16 bits - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u16(start, arch.SRE_LITTLE_WORD) + val = fcnt.read_u16(start, BinContent.SourceEndian.LITTLE_WORD) self.assertEqual(val, 0x1516) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u16(start, arch.SRE_BIG_WORD) + val = fcnt.read_u16(start, BinContent.SourceEndian.BIG_WORD) self.assertEqual(val, 0x1615) # 32 bits - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u32(start, arch.SRE_LITTLE_WORD) + val = fcnt.read_u32(start, BinContent.SourceEndian.LITTLE_WORD) self.assertEqual(val, 0x17181516) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u32(start, arch.SRE_BIG_WORD) + val = fcnt.read_u32(start, BinContent.SourceEndian.BIG_WORD) self.assertEqual(val, 0x16151817) # 64 bits - start = vmpa(0, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(0, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u64(start, arch.SRE_LITTLE_WORD) + val = fcnt.read_u64(start, BinContent.SourceEndian.LITTLE_WORD) self.assertEqual(val, 0x0708050603040102) - start = vmpa(0, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(0, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u64(start, arch.SRE_BIG_WORD) + val = fcnt.read_u64(start, BinContent.SourceEndian.BIG_WORD) self.assertEqual(val, 0x0201040306050807) @@ -95,36 +95,36 @@ class TestEndianness(ChrysalideTestCase): # 16 bits - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u16(start, arch.SRE_LITTLE) + val = fcnt.read_u16(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x1615) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u16(start, arch.SRE_BIG) + val = fcnt.read_u16(start, BinContent.SourceEndian.BIG) self.assertEqual(val, 0x1516) # 32 bits - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u32(start, arch.SRE_LITTLE) + val = fcnt.read_u32(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x18171615) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u32(start, arch.SRE_BIG) + val = fcnt.read_u32(start, BinContent.SourceEndian.BIG) self.assertEqual(val, 0x15161718) # 64 bits - start = vmpa(0, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(0, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u64(start, arch.SRE_LITTLE) + val = fcnt.read_u64(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x0807060504030201) - start = vmpa(0, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(0, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = fcnt.read_u64(start, arch.SRE_BIG) + val = fcnt.read_u64(start, BinContent.SourceEndian.BIG) self.assertEqual(val, 0x0102030405060708) diff --git a/tests/analysis/contents/memory.py b/tests/analysis/contents/memory.py index 55ce035..f99e607 100644 --- a/tests/analysis/contents/memory.py +++ b/tests/analysis/contents/memory.py @@ -7,7 +7,7 @@ from chrysacase import ChrysalideTestCase -from pychrysalide import arch +from pychrysalide.analysis import BinContent from pychrysalide.analysis.contents import MemoryContent from pychrysalide.arch import vmpa, mrange @@ -29,7 +29,7 @@ class TestMemoryContent(ChrysalideTestCase): cnt = MemoryContent(data) - start = vmpa(4, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(4, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = cnt.read_u8(start) self.assertEqual(val, 0x05) @@ -37,14 +37,14 @@ class TestMemoryContent(ChrysalideTestCase): val = cnt.read_u8(start) self.assertEqual(val, 0x06) - start = vmpa(14, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(14, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = cnt.read_u16(start, arch.SRE_LITTLE) + val = cnt.read_u16(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x1817) - start = vmpa(10, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(10, vmpa.VmpaSpecialValue.NO_VIRTUAL) - val = cnt.read_u32(start, arch.SRE_LITTLE) + val = cnt.read_u32(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x16150013) @@ -57,10 +57,10 @@ class TestMemoryContent(ChrysalideTestCase): with self.assertRaisesRegex(Exception, 'Invalid read access.'): - start = vmpa(1, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(1, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = cnt.read_u8(start) with self.assertRaisesRegex(Exception, 'Invalid read access.'): - start = vmpa(0, vmpa.VMPA_NO_VIRTUAL) - val = cnt.read_u16(start, arch.SRE_LITTLE) + start = vmpa(0, vmpa.VmpaSpecialValue.NO_VIRTUAL) + val = cnt.read_u16(start, BinContent.SourceEndian.LITTLE) diff --git a/tests/analysis/contents/restricted.py b/tests/analysis/contents/restricted.py index 08aa968..023e600 100644 --- a/tests/analysis/contents/restricted.py +++ b/tests/analysis/contents/restricted.py @@ -7,7 +7,7 @@ from chrysacase import ChrysalideTestCase -from pychrysalide import arch +from pychrysalide.analysis import BinContent from pychrysalide.analysis.contents import FileContent, RestrictedContent from pychrysalide.arch import vmpa, mrange import tempfile @@ -52,7 +52,7 @@ class TestRestrictedContent(ChrysalideTestCase): fcnt = FileContent(self._out.name) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) covered = mrange(start, 12) # 0x15 ... 0x28 rcnt = RestrictedContent(fcnt, covered) @@ -64,10 +64,10 @@ class TestRestrictedContent(ChrysalideTestCase): val = rcnt.read_u8(start) self.assertEqual(val, 0x16) - val = rcnt.read_u16(start, arch.SRE_LITTLE) + val = rcnt.read_u16(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x1817) - val = rcnt.read_u32(start, arch.SRE_LITTLE) + val = rcnt.read_u32(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x24232221) @@ -76,7 +76,7 @@ class TestRestrictedContent(ChrysalideTestCase): fcnt = FileContent(self._out.name) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) covered = mrange(start, 12) # 0x15 ... 0x28 rcnt = RestrictedContent(fcnt, covered) @@ -100,42 +100,42 @@ class TestRestrictedContent(ChrysalideTestCase): fcnt = FileContent(self._out.name) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) covered = mrange(start, 12) # 0x15 ... 0x28 rcnt = RestrictedContent(fcnt, covered) self.assertIsNotNone(rcnt) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = rcnt.read_u8(start) self.assertEqual(val, 0x15) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) - val = rcnt.read_u16(start, arch.SRE_LITTLE) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) + val = rcnt.read_u16(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x1615) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) - val = rcnt.read_u32(start, arch.SRE_LITTLE) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) + val = rcnt.read_u32(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x18171615) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) - val = rcnt.read_u64(start, arch.SRE_LITTLE) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) + val = rcnt.read_u64(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x2423222118171615) - start = vmpa(23, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(23, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = rcnt.read_u8(start) self.assertEqual(val, 0x28) - start = vmpa(22, vmpa.VMPA_NO_VIRTUAL) - val = rcnt.read_u16(start, arch.SRE_LITTLE) + start = vmpa(22, vmpa.VmpaSpecialValue.NO_VIRTUAL) + val = rcnt.read_u16(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x2827) - start = vmpa(20, vmpa.VMPA_NO_VIRTUAL) - val = rcnt.read_u32(start, arch.SRE_LITTLE) + start = vmpa(20, vmpa.VmpaSpecialValue.NO_VIRTUAL) + val = rcnt.read_u32(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x28272625) - start = vmpa(16, vmpa.VMPA_NO_VIRTUAL) - val = rcnt.read_u64(start, arch.SRE_LITTLE) + start = vmpa(16, vmpa.VmpaSpecialValue.NO_VIRTUAL) + val = rcnt.read_u64(start, BinContent.SourceEndian.LITTLE) self.assertEqual(val, 0x2827262524232221) @@ -144,41 +144,41 @@ class TestRestrictedContent(ChrysalideTestCase): fcnt = FileContent(self._out.name) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) covered = mrange(start, 12) # 0x15 ... 0x28 rcnt = RestrictedContent(fcnt, covered) self.assertIsNotNone(rcnt) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = rcnt.read_raw(start, 1) self.assertEqual(val, b'\x15') - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = rcnt.read_raw(start, 2) self.assertEqual(val, b'\x15\x16') - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = rcnt.read_raw(start, 4) self.assertEqual(val, b'\x15\x16\x17\x18') - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = rcnt.read_raw(start, 8) self.assertEqual(val, b'\x15\x16\x17\x18\x21\x22\x23\x24') - start = vmpa(23, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(23, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = rcnt.read_raw(start, 1) self.assertEqual(val, b'\x28') - start = vmpa(22, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(22, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = rcnt.read_raw(start, 2) self.assertEqual(val, b'\x27\x28') - start = vmpa(20, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(20, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = rcnt.read_raw(start, 4) self.assertEqual(val, b'\x25\x26\x27\x28') - start = vmpa(16, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(16, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = rcnt.read_raw(start, 8) self.assertEqual(val, b'\x21\x22\x23\x24\x25\x26\x27\x28') @@ -188,7 +188,7 @@ class TestRestrictedContent(ChrysalideTestCase): fcnt = FileContent(self._out.name) - start = vmpa(12, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) covered = mrange(start, 12) # 0x15 ... 0x28 rcnt = RestrictedContent(fcnt, covered) @@ -196,15 +196,29 @@ class TestRestrictedContent(ChrysalideTestCase): with self.assertRaisesRegex(Exception, 'Invalid read access.'): - start = vmpa(1, vmpa.VMPA_NO_VIRTUAL) + start = vmpa(1, vmpa.VmpaSpecialValue.NO_VIRTUAL) val = rcnt.read_u8(start) with self.assertRaisesRegex(Exception, 'Invalid read access.'): - start = vmpa(11, vmpa.VMPA_NO_VIRTUAL) - val = rcnt.read_u16(start, arch.SRE_LITTLE) + start = vmpa(11, vmpa.VmpaSpecialValue.NO_VIRTUAL) + val = rcnt.read_u16(start, BinContent.SourceEndian.LITTLE) with self.assertRaisesRegex(Exception, 'Invalid read access.'): - start = vmpa(23, vmpa.VMPA_NO_VIRTUAL) - val = rcnt.read_u16(start, arch.SRE_LITTLE) + start = vmpa(23, vmpa.VmpaSpecialValue.NO_VIRTUAL) + val = rcnt.read_u16(start, BinContent.SourceEndian.LITTLE) + + + def testDescription(self): + """Ensure restriction range is described.""" + + fcnt = FileContent(self._out.name) + + start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL) + covered = mrange(start, 1) + + rcnt = RestrictedContent(fcnt, covered) + self.assertIsNotNone(rcnt) + + self.assertTrue(rcnt.describe().endswith(' [0xc:0xd]')) diff --git a/tests/analysis/db/analyst.py b/tests/analysis/db/analyst.py new file mode 100644 index 0000000..7347810 --- /dev/null +++ b/tests/analysis/db/analyst.py @@ -0,0 +1,188 @@ + +from chrysacase import ChrysalideTestCase +import pychrysalide +from pychrysalide.analysis.contents import MemoryContent +from pychrysalide.analysis.db import certs +from pychrysalide.analysis.db import AdminClient, AnalystClient +from pychrysalide.analysis.db import HubServer +import os +import shutil +import sys +import tempfile +import threading + + +class TestDbConnection(ChrysalideTestCase): + """TestCase for analysis.db.""" + + @classmethod + def setUpClass(cls): + + super(TestDbConnection, cls).setUpClass() + + cls.log('Compile binary "strings" if needed...') + + fullname = sys.modules[cls.__module__].__file__ + dirpath = os.path.dirname(fullname) + + cls._bin_path = os.path.realpath(dirpath + '/../../format/elf/') + + os.system('make -C %s strings > /dev/null 2>&1' % cls._bin_path) + + cls._tmp_path = tempfile.mkdtemp() + + cls._server_path = '%s/.chrysalide/servers/localhost-9999/' % cls._tmp_path + os.makedirs(cls._server_path) + + cls._server_authorized_path = '%s/authorized/' % cls._server_path + os.makedirs(cls._server_authorized_path) + + cls._client_path = '%s/.chrysalide/clients/' % cls._tmp_path + os.makedirs(cls._client_path) + + cls._client_cert_path = '%s/localhost-9999/' % cls._client_path + os.makedirs(cls._client_cert_path) + + cls.log('Using temporary directory "%s"' % cls._tmp_path) + + + @classmethod + def tearDownClass(cls): + + super(TestDbConnection, cls).tearDownClass() + + cls.log('Delete built binaries...') + + os.system('make -C %s clean > /dev/null 2>&1' % cls._bin_path) + + ## os.system('ls -laihR %s' % cls._tmp_path) + + cls.log('Delete directory "%s"' % cls._tmp_path) + + shutil.rmtree(cls._tmp_path) + + + def testServerListening(self): + """List binaries available from server.""" + + + from pychrysalide import core + core.set_verbosity(0) + + + + identity = { + + 'C': 'FR', + 'CN': 'Test authority' + + } + + ret = certs.build_keys_and_ca(self._server_path, 'ca', 3650 * 24 * 60 * 60, identity) + self.assertTrue(ret) + + identity = { + + 'C': 'FR', + 'CN': 'Test server' + + } + + ret = certs.build_keys_and_request(self._server_path, 'server', identity); + self.assertTrue(ret) + + + ret = certs.sign_cert('%s/server-csr.pem' % self._server_path, '%s/ca-cert.pem' % self._server_path, \ + '%s/ca-key.pem' % self._server_path, '%s/server-cert.pem' % self._server_path, \ + 3650 * 24 * 60 * 60) + self.assertTrue(ret) + + identity = { + + 'C': 'FR', + 'CN': 'Test admin' + + } + + ret = certs.build_keys_and_request(self._client_path, 'client', identity); + self.assertTrue(ret) + + ret = certs.sign_cert('%s/client-csr.pem' % self._client_path, '%s/ca-cert.pem' % self._server_path, \ + '%s/ca-key.pem' % self._server_path, '%s/client-cert.pem' % self._client_cert_path, \ + 3650 * 24 * 60 * 60) + self.assertTrue(ret) + + shutil.copy('%s/ca-cert.pem' % self._server_path, + '%s/ca-cert.pem' % self._client_cert_path) + + shutil.copy('%s/client-cert.pem' % self._client_cert_path, + '%s/client-cert.pem' % self._server_authorized_path) + + + os.environ['XDG_CONFIG_HOME'] = self._tmp_path + os.environ['HOME'] = self._tmp_path + + server = HubServer('localhost', '9999') + + #print(server) + + ret = server.start() + + #print(ret) + + + + + # admin = AdminClient() + + # ret = admin.start('localhost', '9999') + # self.assertTrue(ret) + + # def _on_existing_binaries_updated(adm, evt): + # evt.set() + + # event = threading.Event() + + # admin.connect('existing-binaries-updated', _on_existing_binaries_updated, event) + + # ret = admin.request_existing_binaries() + # self.assertTrue(ret) + + # event.wait() + + # self.assertEqual(len(admin.existing_binaries), 0) + + + + cnt = MemoryContent(b'A' * 400 * 1024) + + + + analyst = AnalystClient(cnt.checksum, "elf", []) + + + + def _on_server_status_changed(analyst, hint, evt): + print(hint) + evt.set() + + + event = threading.Event() + + analyst.connect('server-status-changed', _on_server_status_changed, event) + + + + ret = analyst.start('localhost', '9999') + self.assertTrue(ret) + + + event.wait() + + event.clear() + + ret = analyst.send_content(cnt) + self.assertTrue(ret) + + + event.wait() diff --git a/tests/analysis/db/conn.py b/tests/analysis/db/conn.py index f388f60..248a036 100644 --- a/tests/analysis/db/conn.py +++ b/tests/analysis/db/conn.py @@ -1,7 +1,8 @@ from chrysacase import ChrysalideTestCase +from pychrysalide.analysis.contents import MemoryContent from pychrysalide.analysis.db import certs -from pychrysalide.analysis.db import AdminClient +from pychrysalide.analysis.db import AdminClient, AnalystClient from pychrysalide.analysis.db import HubServer import os import shutil @@ -51,7 +52,7 @@ class TestDbConnection(ChrysalideTestCase): from pychrysalide import core - #core.set_verbosity(0) + core.set_verbosity(0) @@ -117,21 +118,39 @@ class TestDbConnection(ChrysalideTestCase): - admin = AdminClient() + # admin = AdminClient() - ret = admin.start('localhost', '9999') - self.assertTrue(ret) + # ret = admin.start('localhost', '9999') + # self.assertTrue(ret) + + # def _on_existing_binaries_updated(adm, evt): + # evt.set() + + # event = threading.Event() + + # admin.connect('existing-binaries-updated', _on_existing_binaries_updated, event) + + # ret = admin.request_existing_binaries() + # self.assertTrue(ret) + + # event.wait() - def _on_existing_binaries_updated(adm, evt): - evt.set() + # self.assertEqual(len(admin.existing_binaries), 0) - event = threading.Event() - admin.connect('existing-binaries-updated', _on_existing_binaries_updated, event) - ret = admin.request_existing_binaries() + cnt = MemoryContent(b'A' * 400 * 1024) + + print(cnt) + + print(len(cnt.data)) + + + analyst = AnalystClient(cnt.checksum, []) + + ret = analyst.start('localhost', '9999') self.assertTrue(ret) - event.wait() - self.assertEqual(len(admin.existing_binaries), 0) + ret = analyst.send_content(cnt) + self.assertTrue(ret) diff --git a/tests/analysis/scan/booleans.py b/tests/analysis/scan/booleans.py new file mode 100644 index 0000000..aa3c1a3 --- /dev/null +++ b/tests/analysis/scan/booleans.py @@ -0,0 +1,98 @@ + +from common import RostTestClass + + +class TestRostBooleans(RostTestClass): + """TestCases for booleans and ROST.""" + + def testFinalCondition(self): + """Validate the final condition.""" + + rule = ''' +rule test { + + condition: + false + +} +''' + + self.check_rule_failure(rule) + + + rule = ''' +rule test { + + condition: + true + +} +''' + + self.check_rule_success(rule) + + + def testBasicBooleanOperations(self): + """Evaluate basic boolean operations.""" + + rule = ''' +rule test { + + condition: + true and false + +} +''' + + self.check_rule_failure(rule) + + + rule = ''' +rule test { + + condition: + true or false + +} +''' + + self.check_rule_success(rule) + + + def testImplicitCast(self): + """Imply implicit casts to booleans.""" + + rule = ''' +rule test { + + condition: + true and 0 + +} +''' + + self.check_rule_failure(rule) + + + rule = ''' +rule test { + + condition: + 1 or false + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + 1 or () + +} +''' + + self.check_rule_success(rule) diff --git a/tests/analysis/scan/common.py b/tests/analysis/scan/common.py new file mode 100644 index 0000000..507b7e2 --- /dev/null +++ b/tests/analysis/scan/common.py @@ -0,0 +1,54 @@ + +from chrysacase import ChrysalideTestCase +from pychrysalide.analysis.contents import MemoryContent +from pychrysalide.analysis.scan import ContentScanner +from pychrysalide.analysis.scan import ScanOptions +from pychrysalide.analysis.scan.patterns.backends import AcismBackend + + +class RostTestClass(ChrysalideTestCase): + """TestCase for analysis.scan.ScanExpression.""" + + @classmethod + def setUpClass(cls): + + super(RostTestClass, cls).setUpClass() + + cls._options = ScanOptions() + cls._options.backend_for_data = AcismBackend + + cls._empty_content = MemoryContent(b'') + + + def _validate_rule_result(self, rule, content, expected): + """Check for scan success or failure.""" + + scanner = ContentScanner(rule) + ctx = scanner.analyze(self._options, content) + + self.assertIsNotNone(ctx) + + if expected: + self.assertTrue(ctx.has_match_for_rule('test')) + else: + self.assertFalse(ctx.has_match_for_rule('test')) + + return scanner, ctx + + + def check_rule_success(self, rule, content = None): + """Check for scan success.""" + + if content is None: + content = self._empty_content + + self._validate_rule_result(rule, content, True) + + + def check_rule_failure(self, rule, content = None): + """Check for scan failure.""" + + if content is None: + content = self._empty_content + + self._validate_rule_result(rule, content, False) diff --git a/tests/analysis/scan/examples.py b/tests/analysis/scan/examples.py new file mode 100644 index 0000000..74b5094 --- /dev/null +++ b/tests/analysis/scan/examples.py @@ -0,0 +1,70 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostExamples(RostTestClass): + """TestCases for the examples provides in the ROST documentation.""" + + def testComments(self): + """Ensure comments do not bother rule definitions.""" + + rule = ''' +/* + Multi-line header... +*/ + +rule test { // comment + + /* + * Some context + */ + + condition: /* List of condition(s) */ + true // Dummy condition + +} +''' + + self.check_rule_success(rule) + + + def testArithmeticPrecedence(self): + """Take care of arithmetic operators precedence.""" + + rule = ''' +rule test { // MulFirst + + condition: + 1 + 4 * (3 + 2) == 21 + and + (1 + 4) * (3 + 2) == 25 + +} +''' + + self.check_rule_success(rule) + + + def testUintCast(self): + """Process nested integer values from binary content.""" + + cnt = MemoryContent(b'\x4d\x5a\x00\x00' + b'\x50\x45\x00\x00' + 52 * b'\x00' + b'\x04\x00\x00\x00') + + rule = ''' +rule test { // IsPE + + condition: + + // MZ signature at offset 0 and ... + + uint16(0) == 0x5a4d and + + // ... PE signature at offset stored in the MZ header at offset 0x3c + + uint32(uint32(0x3c)) == 0x00004550 + +} +''' + + self.check_rule_success(rule, cnt) diff --git a/tests/analysis/scan/functions.py b/tests/analysis/scan/functions.py new file mode 100644 index 0000000..6aca957 --- /dev/null +++ b/tests/analysis/scan/functions.py @@ -0,0 +1,239 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostFunctions(RostTestClass): + """TestCases for the core functions of ROST.""" + + # Core + # ==== + + def testSetCounter(self): + """Count quantities and set sizes.""" + + rule = ''' +rule test { + + condition: + count("ABC") == 3 + and count("AB", "C") == count("ABC") + +} +''' + + self.check_rule_success(rule) + + + cnt = MemoryContent(b'\x01\x02\x02\x03\x03\x03') + + rule = ''' +rule test { + + bytes: + $int_01 = "\x01" + $int_02 = "\x02" + $int_3 = "\x03" + + condition: + count($int_0*, $int_3) == #int_* + +} +''' + + self.check_rule_success(rule, cnt) + + + def testDatasize(self): + """Handle the size of the provided data.""" + + cnt = MemoryContent(b'\x01\x02\x03\x04') + + cases = [ + 'datasize == 4', + 'uint16(0) == 0x201 and uint16(datasize - 2) == 0x0403', + ] + + for c in cases: + + rule = ''' +rule test { + + condition: + %s + +} +''' % c + + self.check_rule_success(rule, cnt) + + + def testMaxCommon(self): + """Count the largest quantity of same items in a set.""" + + cnt = MemoryContent(b'') + + cases = [ + [ '1', 1 ], + [ '1, 2, 3', 1 ], + [ '1, 2, 1, 3, 1', 3 ], + [ '1, "a", 2, 3, "a"', 2 ], + ] + + for c, q in cases: + + rule = ''' +rule test { + + condition: + maxcommon(%s) == %u + +} +''' % (c, q) + + self.check_rule_success(rule, cnt) + + + # Modules + # ======= + + def testConsole(self): + """Ensure logging always returns true.""" + + rule = ''' +rule test { + + condition: + console.log() + +} +''' + + self.check_rule_success(rule) + + + def testMagic(self): + """Scan text content with the Magic module.""" + + cnt = MemoryContent(b'aaaa') + + cases = [ + [ 'type', 'ASCII text, with no line terminators' ], + [ 'mime_encoding', 'us-ascii' ], + [ 'mime_type', 'text/plain' ], + ] + + for target, expected in cases: + + rule = ''' +rule test { + + condition: + magic.%s() == "%s" + +} +''' % (target, expected) + + self.check_rule_success(rule, cnt) + + + def testMathOperations(self): + """Perform math operations with core functions.""" + + rule = ''' +rule test { + + condition: + math.to_string(123) == "123" + and math.to_string(291, 16) == "0x123" + and math.to_string(-83, 8) == "-0123" + and math.to_string(123, 2) == "0b1111011" + +} +''' + + self.check_rule_success(rule) + + + def testStringOperations(self): + """Perform string operations with core functions.""" + + rule = ''' +rule test { + + condition: + string.lower("ABCd") == "abcd" and string.lower("123abc") == "123abc" + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + string.upper("abcD") == "ABCD" and string.upper("123ABC") == "123ABC" + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + string.to_int("123") == 123 + and string.to_int("123", 16) == 291 + and string.to_int("0x123") == 291 + and string.to_int("-0123") == -83 + +} +''' + + self.check_rule_success(rule) + + + rule = r''' +rule test { + + condition: + "A\x00B\x00C\x00D\x00" endswith string.wide("CD") + and "A\x00B\x00C\x00D\x00" contains string.wide("BC") + +} +''' + + self.check_rule_success(rule) + + + def testTime(self): + """Check current time.""" + + # Cf. https://www.epochconverter.com/ + + rule = ''' +rule test { + + condition: + time.make(2023, 8, 5, 22, 8, 41) == 0x64cec869 + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + time.now() >= 0x64cec874 and time.now() <= time.now() + +} +''' + + self.check_rule_success(rule) diff --git a/tests/analysis/scan/fuzzing.py b/tests/analysis/scan/fuzzing.py new file mode 100644 index 0000000..1b9b25b --- /dev/null +++ b/tests/analysis/scan/fuzzing.py @@ -0,0 +1,289 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent +from pychrysalide.analysis.scan import ContentScanner +from pychrysalide.analysis.scan import ScanOptions +from pychrysalide.analysis.scan.patterns.backends import AcismBackend +from pychrysalide.analysis.scan.patterns.backends import BitapBackend + + +class TestRostFuzzingFixes(RostTestClass): + """TestCases to remember all the fixes for crashes identified by fuzzing.""" + + def testEmptyPatternListWithContent(self): + """Check no backend is run if there is no pattern to look for.""" + + content = MemoryContent(b'\n') + + rule = ''' +''' + + backends = [ + AcismBackend, # This one was segfaulting + BitapBackend, + ] + + for b in backends: + + options = ScanOptions() + options.backend_for_data = b + + scanner = ContentScanner(rule) + ctx = scanner.analyze(options, content) + + self.assertIsNotNone(ctx) + + + def testMandatoryCondition(self): + """Ensure a condition section exists in a rule.""" + + rule = ''' +rule test { + +} +''' + + with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'): + + scanner = ContentScanner(rule) + + + def testNonExistingPattern(self): + """Avoid to count the matches of a non-existing pattern.""" + + rule = ''' +rule test { + + condition: + #badid + +} +''' + + with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'): + + scanner = ContentScanner(rule) + + + def testNamespacesWithoutReductionCode(self): + """Clean the code for ROST namespaces.""" + + rule = ''' +rule test { + + condition: + console + +} +''' + + self.check_rule_failure(rule) + + + def testCallOnNonCallable(self): + """Reject calls on non callable expressions softly.""" + + rule = ''' +rule test { + + condition: + console.log().log() + +} +''' + + self.check_rule_failure(rule) + + + def testSelfReferencingRule(self): + """Reject any rule referencing itself as match condition.""" + + rule = ''' +rule test { + + condition: + test + +} +''' + + self.check_rule_failure(rule) + + + def testSelfReferencingRule(self): + """Expect only one argument for the not operator, even in debug mode.""" + + rule = ''' +rule test { + + condition: + not(0) + +} +''' + + self.check_rule_success(rule) + + + def testNoCommon(self): + """Handle the case where no common item is found from an empty set.""" + + rule = ''' +rule test { + + bytes: + $a = "a" + + condition: + maxcommon($a) == 0 + +} +''' + + self.check_rule_success(rule) + + + def testAAsAcharacter(self): + """Consider the 'a' character as a valid lowercase character.""" + + rule = ''' +rule test { + + bytes: + $a = "0000a0I0" nocase + + condition: + $a + +} +''' + + self.check_rule_failure(rule) + + + def testAAsAcharacter(self): + """Do not expect initialized trackers when there is no real defined search pattern.""" + + rule = ''' +rule test { + + bytes: + $a = {[0]} + + condition: + $a + +} +''' + + with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'): + + scanner = ContentScanner(rule) + + + def testAllocations(self): + """Handle big alloctions for strings in conditions with regular expressions.""" + + rule = ''' +rule test { + + condition: + "%s" == "%s" + +} +''' % ("0" * (256 * 2 + 8), "0" * (256 * 2 + 8)) + + self.check_rule_success(rule) + + + def testFileFinalAccess(self): + """Ensure patterns found at the edges of scanned content do not crash the scanner.""" + + cnt = MemoryContent(bytes([ 0 for i in range(16) ])) + + rule = ''' +rule test { + + bytes: + $a = { 00 00 00 00 00 00 00 00 } + + condition: + $a + +} +''' + + self.check_rule_success(rule, cnt) + + + def testValidHexRangeMerge(self): + """Merge valid hexadecimal ranges.""" + + rule = ''' +rule test { + + bytes: + $a = { [0] ?? } + + condition: + $a + +} +''' + + with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'): + + scanner = ContentScanner(rule) + + + rule = ''' +rule test { + + bytes: + $a = { [2] ?? } + + condition: + $a + +} +''' + + self.check_rule_failure(rule) + + + def testSmallBase64(self): + """Handle small base64 encodings which may produce few patterns.""" + + rule = ''' +rule test { + + bytes: + $a = "0" base64 + + condition: + $a + +} +''' + + self.check_rule_failure(rule) + + + def testCountIndex(self): + """Ban pattern count indexes from the grammer.""" + + rule = ''' +rule test { + + bytes: + $a = "1" + + condition: + #*[0] + +} +''' + + with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'): + + scanner = ContentScanner(rule) diff --git a/tests/analysis/scan/grammar.py b/tests/analysis/scan/grammar.py new file mode 100644 index 0000000..14f67fa --- /dev/null +++ b/tests/analysis/scan/grammar.py @@ -0,0 +1,484 @@ + +import json + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostGrammar(RostTestClass): + """TestCases for the ROST grammar.""" + + def testRelationalExpressions(self): + """Build expressions with relational comparisons.""" + + cases = [ + + # Regular + [ '-1', '<=', '2', True ], + [ '-1', '<=', '2', True ], + [ '"aaa"', '==', '"aaa"', True ], + [ '"aaa"', '<', '"aaaa"', True ], + [ '""', '<', '"aaaa"', True ], + + # Cast + [ 'false', '==', '0', True ], + [ 'false', '==', '1', False ], + [ 'true', '!=', '0', True ], + [ '1', '==', 'true', True ], + [ 'false', '==', '()', True ], + [ 'true', '==', '(0,)', True ], + + ] + + for op1, kwd, op2, expected in cases: + + rule = ''' +rule test { + + condition: + %s %s %s + +} +''' % (op1, kwd, op2) + + if expected: + self.check_rule_success(rule) + else: + self.check_rule_failure(rule) + + + def testLogicalOperations(self): + """Evaluate some logical operations.""" + + cases = [ + [ 'true and false', False ], + [ 'false or false', False ], + [ 'true and true or false', True ], + [ 'false or true and false', False ], + [ '1 or false', True ], + ] + + for cond, expected in cases: + + rule = ''' +rule test { + + condition: + %s + +} +''' % (cond) + + if expected: + self.check_rule_success(rule) + else: + self.check_rule_failure(rule) + + + def testArithmeticOperations(self): + """Evaluate some arithmetic operations.""" + + cases = [ + + # Clever + '1 + 2 == 3', + '10 + -3 == 7', + '-3 + 10 == 7', + '-10 - 1 < 0', + '-10 - 1 == -11', + '(-10 - 1) == -11', + '(-1 - -10) == 9', + '-2 * -3 == 6', + '-2 * 3 == -6', + + # Legacy + '1 + 4 * 3 + 2 == 15', + '(1 + 4) * 3 + 2 == 17', + '1 + 4 * (3 + 2) == 21', + '(1 + 4) * (3 + 2) == 25', + + ] + + for c in cases: + + rule = ''' +rule test { + + condition: + %s + +} +''' % (c) + + self.check_rule_success(rule) + + + def testBasicStringsOperations(self): + """Build expressions with basic strings operations.""" + + cases = [ + + # Clever + [ '123---456', 'contains', '---', True ], + [ '123---456', 'contains', 'xxx', False ], + [ '---123---456', 'startswith', '---', True ], + [ '---123---456', 'startswith', 'xxx', False ], + [ '123---456---', 'endswith', '---', True ], + [ '123---456---', 'endswith', 'xxx', False ], + [ 'AAA---BBB', 'icontains', 'aaa', True ], + [ 'AAA---BBB', 'icontains', 'xxx', False ], + [ 'AAA---BBB', 'istartswith', 'aAa', True ], + [ 'AAA---BBB', 'istartswith', 'xxx', False ], + [ 'AAA---BBB', 'iendswith', 'bBb', True ], + [ 'AAA---BBB', 'iendswith', 'xxx', False ], + [ 'AzertY', 'iequals', 'AZERTY', True ], + [ 'AzertY', 'iequals', 'AZERTY-', False ], + + # Legacy + [ '123\t456', 'contains', '\t', True ], + [ '123-456', 'startswith', '1', True ], + [ '123-456', 'startswith', '1234', False ], + [ '123-456', 'endswith', '6', True ], + [ '123-456', 'endswith', '3456', False ], + + ] + + for op1, kwd, op2, expected in cases: + + rule = ''' +rule test { + + condition: + "%s" %s "%s" + +} +''' % (op1, kwd, op2) + + if expected: + self.check_rule_success(rule) + else: + self.check_rule_failure(rule) + + + def testSizeUnits(self): + """Evaluate size units.""" + + cases = [ + '1KB == 1024', + '2MB == 2 * 1024 * 1024', + '4Kb == (4 * 1024)', + '1KB <= 1024 and 1024 < 1MB', + ] + + for c in cases: + + rule = ''' +rule test { + + condition: + %s + +} +''' % (c) + + self.check_rule_success(rule) + + + def testPrivateRules(self): + """Ensure private rules remain silent.""" + + for private in [ True, False ]: + for state in [ True, False ]: + + rule = ''' +%srule silent { + + condition: + %s + +} + +rule test { + + condition: + silent + +} +''' % ('private ' if private else '', 'true' if state else 'false') + + scanner, ctx = self._validate_rule_result(rule, self._empty_content, state) + + data = scanner.convert_to_json(ctx) + jdata = json.loads(data) + + # Exemple : + # + # [{'bytes_patterns': [], 'matched': True, 'name': 'test'}, + # {'bytes_patterns': [], 'matched': True, 'name': 'silent'}] + + found = len([ j['name'] for j in jdata if j['name'] == 'silent' ]) > 0 + + self.assertTrue(private ^ found) + + + def testGlobalRules(self): + """Take global rules into account.""" + + for glob_state in [ True, False ]: + for state in [ True, False ]: + + rule = ''' +%srule silent { + + condition: + %s + +} + +rule test { + + condition: + true + +} +''' % ('global ' if glob_state else '', 'true' if state else 'false') + + expected = not(glob_state) or state + + if expected: + self.check_rule_success(rule) + else: + self.check_rule_failure(rule) + + + def testMatchCount(self): + """Ensure match count provides expected values.""" + + cnt = MemoryContent(b'\x01\x02\x02\x03\x03\x03') + + rule = ''' +rule test { + + bytes: + $int_01 = "\x01" + $int_02 = "\x02" + $int_03 = "\x03" + + condition: + #int_01 == count($int_01) and #int_01 == 1 + and #int_02 == count($int_02) and #int_02 == 2 + and #int_03 == count($int_03) and #int_03 == 3 + and #int_0* == count($int_0*) and #int_0* == 6 + +} +''' + + self.check_rule_success(rule, cnt) + + + def testBackingUpHandlers(self): + """Ensure handlers for backing up removals do not limit the grammar.""" + + cnt = MemoryContent(b'AB12') + + # Uncompleted token in rule definition: '?? ?? ' + + rule = ''' +rule test { + + bytes: + $a = { ?? ?? } + + condition: + #a == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?? ' + + rule = ''' +rule test { + + bytes: + $a = { ?? 4? } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?? ?' + + rule = ''' +rule test { + + bytes: + $a = { ?? ?2 } + + condition: + #a == 2 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?? ' + + rule = ''' +rule test { + + bytes: + $a = { ?? 42 } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + # Uncompleted token in rule definition: '?1 ?' + + rule = ''' +rule test { + + bytes: + $a = { ?1 ?? } + + condition: + #a == 2 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?1 4? ' + + rule = ''' +rule test { + + bytes: + $a = { ?1 4? } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?1 ?2 ' + + rule = ''' +rule test { + + bytes: + $a = { ?1 ?2 } + + condition: + #a == 2 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?1 4' + + rule = ''' +rule test { + + bytes: + $a = { ?1 42 } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + # Uncompleted token in rule definition: '41 ' + + rule = ''' +rule test { + + bytes: + $a = { 41 ?? } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '41 4' + + rule = ''' +rule test { + + bytes: + $a = { 41 4? } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '41 ' + + rule = ''' +rule test { + + bytes: + $a = { 41 ?2 } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '41 42 ' + + rule = ''' +rule test { + + bytes: + $a = { 41 42 } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + + +# TODO : test <haystack> matches <regex> + + + diff --git a/tests/analysis/scan/matches.py b/tests/analysis/scan/matches.py new file mode 100644 index 0000000..efcae4f --- /dev/null +++ b/tests/analysis/scan/matches.py @@ -0,0 +1,64 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostMatchs(RostTestClass): + """TestCases for the ROST pattern matching engine.""" + + def testCountMatches(self): + """Count matched patterns.""" + + cnt = MemoryContent(b'aaa aaa bbb aaa') + + rule = ''' +rule test { + + bytes: + $a = "aaa" + $b = "bbb" + + condition: + #a == 3 and #b < 2 + +} +''' + + self.check_rule_success(rule, cnt) + + + def testCountSameMatches(self): + """Count matches of similar patterns.""" + + cnt = MemoryContent(b'ABCDabcdABCDabcd') + + rule = ''' +rule test { + + bytes: + $a = "\x61\x62\x63\x64" + $b = "\x61\x62\x63\x64" + + condition: + #a == 2 and #b == 2 + +} +''' + + self.check_rule_success(rule, cnt) + + + rule = ''' +rule test { + + bytes: + $a = "\x61\x62\x63\x64" + $b = "\x61\x62\x63" + + condition: + #a == 2 and #b == 2 + +} +''' + + self.check_rule_success(rule, cnt) diff --git a/tests/analysis/scan/pyapi.py b/tests/analysis/scan/pyapi.py new file mode 100644 index 0000000..7a697b3 --- /dev/null +++ b/tests/analysis/scan/pyapi.py @@ -0,0 +1,297 @@ + +import binascii +import struct + +from chrysacase import ChrysalideTestCase +from gi._constants import TYPE_INVALID +from pychrysalide.analysis.scan import ScanExpression +from pychrysalide.analysis.scan import ScanOptions +from pychrysalide.analysis.scan import find_token_modifiers_for_name +from pychrysalide.glibext import ComparableItem + + +class TestRostPythonAPI(ChrysalideTestCase): + """TestCase for the ROST Python API.""" + + def testEmptyOptions(self): + """Check default scan options.""" + + ops = ScanOptions() + + self.assertEqual(ops.backend_for_data, TYPE_INVALID) + + + def testDirectInstancesOfExpression(self): + """Reject direct instances of ROST expressions.""" + + with self.assertRaisesRegex(RuntimeError, 'pychrysalide.analysis.scan.ScanExpression is an abstract class'): + + e = ScanExpression() + + + def testBooleanComparison(self): + """Compare custom scan expressions.""" + + class StrLenExpr(ScanExpression): + + def __init__(self, value): + super().__init__(ScanExpression.ScanReductionState.REDUCED) + self._value = value + + def _cmp_rich(self, other, op): + + if op == ComparableItem.RichCmpOperation.EQ: + return len(self._value) == len(other._value) + + + e0 = StrLenExpr('00000000000') + + e1 = StrLenExpr('00000000000') + + e2 = StrLenExpr('000000000000000000000000000') + + self.assertTrue(e0 == e1) + + # !? + # Python teste e0 != e1 (non implémenté), puis e1 != e0 (pareil) et en déduit une différence ! + # self.assertFalse(e0 != e1) + + self.assertFalse(e0 == e2) + + # TypeError: '<' not supported between instances of 'StrLenExpr' and 'StrLenExpr' + with self.assertRaisesRegex(TypeError, '\'<\' not supported between instances'): + self.assertTrue(e0 < e1) + + + def testBytePatternModifiers(self): + """Validate the bytes produced by modifiers.""" + + mod = find_token_modifiers_for_name('plain') + self.assertIsNotNone(mod) + + source = b'ABC' + transformed = mod.transform(source) + + self.assertEqual(source, transformed[0]) + + + mod = find_token_modifiers_for_name('hex') + self.assertIsNotNone(mod) + + source = b'ABC' + transformed = mod.transform(source) + + self.assertEqual(binascii.hexlify(source), transformed[0]) + + + mod = find_token_modifiers_for_name('rev') + self.assertIsNotNone(mod) + + source = b'ABC' + transformed = mod.transform(source) + + self.assertEqual(source[::-1], transformed[0]) + + + mod = find_token_modifiers_for_name('lower') + self.assertIsNotNone(mod) + + source = b'AbC' + transformed = mod.transform(source) + + self.assertEqual(source.lower(), transformed[0]) + + + mod = find_token_modifiers_for_name('upper') + self.assertIsNotNone(mod) + + source = b'AbC' + transformed = mod.transform(source) + + self.assertEqual(source.upper(), transformed[0]) + + + mod = find_token_modifiers_for_name('wide') + self.assertIsNotNone(mod) + + source = b'ABC' + transformed = mod.transform(source) + + self.assertEqual(source.decode('ascii'), transformed[0].decode('utf-16-le')) + + + mod = find_token_modifiers_for_name('base64') + self.assertIsNotNone(mod) + + source = b'ABC' + transformed = mod.transform(source) + + self.assertEqual(len(transformed), 3) + self.assertEqual(transformed[0], b'QUJD') + self.assertEqual(transformed[1], b'FCQ') + self.assertEqual(transformed[2], b'BQk') + + + def testClassicalAPIHashing(self): + """Reproduce classical API Hashing results.""" + + def b2i(t): + return struct.unpack('<I', t)[0] + + + # Example: + # - PlugX (2020) - https://vms.drweb.fr/virus/?i=21512304 + + mod = find_token_modifiers_for_name('crc32') + self.assertIsNotNone(mod) + + source = b'GetCurrentProcess\x00' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x3690e66) + + + # Example: + # - GuLoader (2020) - https://www.crowdstrike.com/blog/guloader-malware-analysis/ + + mod = find_token_modifiers_for_name('djb2') + self.assertIsNotNone(mod) + + source = b'GetProcAddress' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0xcf31bb1f) + + + def testCustomAPIHashing(self): + """Reproduce custom API Hashing results.""" + + def b2i(t): + return struct.unpack('<I', t)[0] + + + # Example: + # Underminer Exploit Kit (2019) - https://jsac.jpcert.or.jp/archive/2019/pdf/JSAC2019_1_koike-nakajima_jp.pdf + + mod = find_token_modifiers_for_name('add1505-shl5') + self.assertIsNotNone(mod) + + source = b'LoadLibraryA' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x5fbff0fb) + + + # Example: + # Enigma Stealer (2023) https://www.trendmicro.com/es_mx/research/23/b/enigma-stealer-targets-cryptocurrency-industry-with-fake-jobs.html + + mod = find_token_modifiers_for_name('enigma-murmur') + self.assertIsNotNone(mod) + + source = b'CreateMutexW' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0xfd43765a) + + + # Examples: + # - ShadowHammer (2019) - https://blog.f-secure.com/analysis-shadowhammer-asus-attack-first-stage-payload/ + # - ShadowHammer (2019) - https://securelist.com/operation-shadowhammer-a-high-profile-supply-chain-attack/90380/ + + mod = find_token_modifiers_for_name('imul21-add') + self.assertIsNotNone(mod) + + source = b'VirtualAlloc' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0xdf894b12) + + + # Examples: + # - Bottle Exploit Kit (2019) - https://nao-sec.org/2019/12/say-hello-to-bottle-exploit-kit.html + # - ShadowHammer (2019) - https://securelist.com/operation-shadowhammer-a-high-profile-supply-chain-attack/90380/ + + mod = find_token_modifiers_for_name('imul83-add') + self.assertIsNotNone(mod) + + source = b'GetProcAddress' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x9ab9b854) + + + # Examples: + # - ?? (2021) - https://www.threatspike.com/blogs/reflective-dll-injection + # - Mustang Panda (2022) - https://blog.talosintelligence.com/mustang-panda-targets-europe/ + + mod = find_token_modifiers_for_name('ror13') + self.assertIsNotNone(mod) + + source = b'GetProcAddress' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x7c0dfcaa) + + source = b'VirtualAlloc' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x91afca54) + + + # Example: + # - Energetic Bear (2019) - https://insights.sei.cmu.edu/blog/api-hashing-tool-imagine-that/ + + mod = find_token_modifiers_for_name('sll1-add-hash32') + self.assertIsNotNone(mod) + + source = b'LoadLibraryA' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x000d5786) + + + # Example: + # - SideWinder/WarHawk (2022) - https://www.zscaler.com/blogs/security-research/warhawk-new-backdoor-arsenal-sidewinder-apt-group + + mod = find_token_modifiers_for_name('sub42') + self.assertIsNotNone(mod) + + source = b'LoadLibraryA' + transformed = mod.transform(source) + + self.assertEqual(transformed[0], b'\x8e\xb1\xa3\xa6\x8e\xab\xa4\xb4\xa3\xb4\xbb\x83') + + + # Example: + # - TrickBot (2021) - https://medium.com/walmartglobaltech/trickbot-crews-new-cobaltstrike-loader-32c72b78e81c + + mod = find_token_modifiers_for_name('sub-index1') + self.assertIsNotNone(mod) + + source = b'raw.githubusercontent.com' + transformed = mod.transform(source) + + self.assertEqual(transformed[0], b'\x73\x63\x7a\x32\x6c\x6f\x7b\x70\x7e\x6c\x80\x7f\x72\x80\x72\x7f\x7f\x86\x78\x82\x89\x44\x7a\x87\x86') + + + def testBytePatternModifiersAPI(self): + """Validate the API for pattern modifiers.""" + + mod = find_token_modifiers_for_name('plain') + self.assertIsNotNone(mod) + + source = [ b'ABC', b'01234' ] + transformed = mod.transform(source) + + self.assertEqual(len(source), len(transformed)) + self.assertEqual(source[0], transformed[0]) + self.assertEqual(source[1], transformed[1]) + + + mod = find_token_modifiers_for_name('xor') + self.assertIsNotNone(mod) + + source = [ b'ABC' ] + transformed = mod.transform(source, 0x20) + + self.assertEqual(transformed[0], b'abc') diff --git a/tests/analysis/scan/scanning_hex.py b/tests/analysis/scan/scanning_hex.py new file mode 100644 index 0000000..4b0fda4 --- /dev/null +++ b/tests/analysis/scan/scanning_hex.py @@ -0,0 +1,716 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostScanningBinary(RostTestClass): + """TestCases for the bytes section syntax (binary).""" + + def testLonelyPatterns(self): + """Evaluate the most simple patterns.""" + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { 41 } + + condition: + #a == 1 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { 62 } + + condition: + #a == 1 and @a[0] == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { 66 } + + condition: + #a == 1 and @a[0] == 5 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ?1 } + + condition: + #a == 1 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ?2 } + + condition: + #a == 1 and @a[0] == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ?6 } + + condition: + #a == 1 and @a[0] == 5 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def ___testLonelyPatternsNot(self): + """Evaluate the most simple patterns (not version).""" + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~41 } + + condition: + #a == 5 and @a[0] == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~62 } + + condition: + #a == 5 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~66 } + + condition: + #a == 5 and @a[4] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~?1 } + + condition: + #a == 5 and @a[0] == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~?2 } + + condition: + #a == 5 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~?6 } + + condition: + #a == 5 and @a[4] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testSimpleHexPattern(self): + """Test a simple hex pattern.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 41 62 63 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 2d 41 62 63 } + + condition: + #a == 1 and @a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testSimpleMaskedHexPattern(self): + """Test a simple masked hex pattern.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?1 6? ?3 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testHexPatternWithPlainAndMasked(self): + """Test hex patterns with plain and masked bytes.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 41 6? ?3 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 4? 62 ?3 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 4? ?2 63 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 4? ?2 ?3 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 2d 4? ?2 63 } + + condition: + #a == 1 and @a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 2d 4? 62 ?3 2d } + + condition: + #a == 1 and @a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 2? 41 6? 63 ?d } + + condition: + #a == 1 and @a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testHexPatternWithPlainAndHoles(self): + """Test hex patterns with plain bytes and holes.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 33 ?? 41 ?? 63 ?? 34 } + + condition: + #a == 1 and @a[0] == 2 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? 33 ?? 41 ?? 63 ?? 34 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? 33 [1-5] 63 ?? 34 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { [3-4] 41 ?? 63 ?? 34 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? 33 ?? 41 ?? 63 [3-] } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testHexPatternWithMaskedAndHoles(self): + """Test hex patterns with masked bytes and holes.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?3 ?? 4? ?? 6? ?? ?4 } + + condition: + #a == 1 and @a[0] == 2 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? ?3 ?? 4? ?? 6? ?? ?4 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? ?3 [1-5] ?3 ?? ?4 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { [3-4] ?1 ?? ?3 ?? ?4 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? 3? ?? 4? ?? 6? [3-] } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testPipedPlainHexPatterns(self): + """Look for several patterns at once with piped definition.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 41 62 ( 63 | 64 | 65 ) } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ( 41 | f2 | f3 ) 62 63 } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 41 ( 61 | 62 | 63 ) 63 } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ( 41 62 63 | 42 62 63 | 43 62 63 ) } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testPipedMaskedHexPatterns(self): + """Look for several patterns at once with piped and masked definition.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 4? 6? ( ?3 | ?4 | ?5 ) } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ( ?1 | ?2 | ?3 ) 6? 6? } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 4? ( ?1 | ?2 | ?3 ) 6? } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testDuplicatedResultsFiltering(self): + """Filter duplicated results.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ( 4? ?2 ?3 | 4? 6? 6? | ?3 6? ?3 ) } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) diff --git a/tests/analysis/scan/scanning_str.py b/tests/analysis/scan/scanning_str.py new file mode 100644 index 0000000..75427a7 --- /dev/null +++ b/tests/analysis/scan/scanning_str.py @@ -0,0 +1,194 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostScanningStrings(RostTestClass): + """TestCases for the bytes section syntax (strings).""" + + def testSimpleStringPattern(self): + """Test a simple string pattern.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = "Abc" + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testEscapedStringPattern(self): + """Test escaped string patterns.""" + + cnt = MemoryContent(b'\a\b\t\n\v\f\r' + bytes([ 0x1b ]) + b'"\\\xff') + + rule = r''' +rule test { + + bytes: + $a = "\a\b\t\n\v\f\r\e\"\\\xff" + + condition: + #a == 1 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'\a\b\t\n--123--\v\f\r' + bytes([ 0x1b ]) + b'"\\\xff') + + rule = r''' +rule test { + + bytes: + $a = "\a\b\t\n--123--\v\f\r\e\"\\\xff" + + condition: + #a == 1 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testStringModifiers(self): + """Check string modifiers output.""" + + cnt = MemoryContent(b'--414243--') + + rule = ''' +rule test { + + bytes: + $a = "ABC" hex + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'--ABC--') + + rule = ''' +rule test { + + bytes: + $a = "ABC" plain + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'--CBA--') + + rule = ''' +rule test { + + bytes: + $a = "ABC" rev + + condition: + #a == 1 + +} +''' + + + def testStringPatternFullword(self): + """Test a fullword string pattern.""" + + cnt = MemoryContent(b'ABCDEF 123 ') + + rule = ''' +rule test { + + bytes: + $a = "DEF" fullword + $b = "123" fullword + + condition: + #a == 0 and #b == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'DEF 123 ') + + rule = ''' +rule test { + + bytes: + $a = "DEF" fullword + $b = "123" fullword + + condition: + #a == 1 and #b == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'\tDEF 123 ') + + rule = ''' +rule test { + + bytes: + $a = "DEF" fullword + $b = "123" fullword + + condition: + #a == 1 and #b == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testStringPatternCase(self): + """Test a string pattern with case care.""" + + cnt = MemoryContent(b'abc123-Abc123Def456GHI...z0z1z2z3z4z5z6z7z8z9') + + rule = ''' +rule test { + + bytes: + $a = "Abc" nocase + $b = "ABC123DEF456GHI" nocase + $z = "z0z1z2z3z4z5z6z7z8z9" nocase + + condition: + #a == 2 and #b == 1 and #z == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) diff --git a/tests/analysis/scan/sets.py b/tests/analysis/scan/sets.py new file mode 100644 index 0000000..1d10fbf --- /dev/null +++ b/tests/analysis/scan/sets.py @@ -0,0 +1,118 @@ + +from common import RostTestClass + + +class TestRostSets(RostTestClass): + """TestCases for sets support in ROST.""" + + def testSetsAsBooleans(self): + """Convert sets to boolean.""" + + rule = ''' +rule test { + + condition: + () + +} +''' + + self.check_rule_failure(rule) + + + rule = ''' +rule test { + + condition: + (1, ) + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + ("aaa", true, 123) + +} +''' + + self.check_rule_success(rule) + + + def testStringAsArray(self): + """Handle strings as arrays.""" + + rule = ''' +rule test { + + condition: + count("aaa") + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + count("aaa") == 3 + +} +''' + + self.check_rule_success(rule) + + + def testSetsIntersections(self): + """Perform sets intersections.""" + + rule = ''' +rule test { + + condition: + ("aaa", "bbb") in ("AAA", "BBB", "aaa") + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + ("aaa", "bbb") in ("123", ) + +} +''' + + self.check_rule_failure(rule) + + + + + + + + + + + + + + # TODO : + + # test : intersection(a, a) == a + + # test : "123" in "0123456789" + # test : "123" in "012987" + |