summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/analysis/cattribs.py42
-rw-r--r--tests/analysis/contents/checksum.py2
-rw-r--r--tests/analysis/contents/endian.py50
-rw-r--r--tests/analysis/contents/memory.py18
-rw-r--r--tests/analysis/contents/restricted.py84
-rw-r--r--tests/analysis/db/analyst.py188
-rw-r--r--tests/analysis/db/conn.py43
-rw-r--r--tests/analysis/scan/booleans.py98
-rw-r--r--tests/analysis/scan/common.py54
-rw-r--r--tests/analysis/scan/examples.py70
-rw-r--r--tests/analysis/scan/functions.py239
-rw-r--r--tests/analysis/scan/fuzzing.py289
-rw-r--r--tests/analysis/scan/grammar.py484
-rw-r--r--tests/analysis/scan/matches.py64
-rw-r--r--tests/analysis/scan/pyapi.py297
-rw-r--r--tests/analysis/scan/scanning_hex.py716
-rw-r--r--tests/analysis/scan/scanning_str.py194
-rw-r--r--tests/analysis/scan/sets.py118
-rw-r--r--tests/common/bitfield.py174
-rw-r--r--tests/common/itoa.py28
-rw-r--r--tests/plugins/encodings/all.py23
-rw-r--r--tests/plugins/kaitai/__init__.py0
-rw-r--r--tests/plugins/kaitai/language.py2474
-rw-r--r--tests/plugins/kaitai/rost.py170
-rw-r--r--tests/plugins/yaml.py175
-rw-r--r--tests/plugins/yamlrdr.py277
26 files changed, 5991 insertions, 380 deletions
diff --git a/tests/analysis/cattribs.py b/tests/analysis/cattribs.py
index 1a7f7da..e388afc 100644
--- a/tests/analysis/cattribs.py
+++ b/tests/analysis/cattribs.py
@@ -15,10 +15,10 @@ class TestProjectFeatures(ChrysalideTestCase):
def testEmptyContentAttributeSet(self):
"""Check properties of empty content attribute set."""
- attribs = ContentAttributes('')
- self.assertIsNotNone(attribs)
+ attribs, filename = ContentAttributes('')
- self.assertIsNone(attribs.filename)
+ self.assertIsNotNone(attribs)
+ self.assertIsNone(filename)
self.assertEqual(len(attribs.keys), 0)
@@ -33,16 +33,16 @@ class TestProjectFeatures(ChrysalideTestCase):
'dddd': '3',
}
- filename = 'filename'
- path = filename
+ orig_filename = 'filename'
+ path = orig_filename
for k in model.keys():
path += '&%s=%s' % (k, model[k])
- attribs = ContentAttributes(path)
- self.assertIsNotNone(attribs)
+ attribs, filename = ContentAttributes(path)
- self.assertEqual(attribs.filename, filename)
+ self.assertIsNotNone(attribs)
+ self.assertEqual(orig_filename, filename)
kcount = 0
@@ -73,10 +73,10 @@ class TestProjectFeatures(ChrysalideTestCase):
for k in model.keys():
path += '&e%s=%s' % (k, model[k])
- attribs = ContentAttributes(path)
- self.assertIsNotNone(attribs)
+ attribs, filename = ContentAttributes(path)
- self.assertIsNone(attribs.filename)
+ self.assertIsNotNone(attribs)
+ self.assertIsNone(filename)
kcount = 0
@@ -92,31 +92,31 @@ class TestProjectFeatures(ChrysalideTestCase):
path = '&&'
- attribs = ContentAttributes(path)
- self.assertIsNotNone(attribs)
+ attribs, filename = ContentAttributes(path)
- self.assertIsNone(attribs.filename)
+ self.assertIsNotNone(attribs)
+ self.assertIsNone(filename)
self.assertEqual(len(attribs.keys), 0)
path = '&&&'
- attribs = ContentAttributes(path)
- self.assertIsNotNone(attribs)
+ attribs, filename = ContentAttributes(path)
- self.assertIsNone(attribs.filename)
+ self.assertIsNotNone(attribs)
+ self.assertIsNone(filename)
self.assertEqual(len(attribs.keys), 0)
path = 'filename'
- attribs = ContentAttributes(path)
+ attribs, filename = ContentAttributes(path)
+
self.assertIsNotNone(attribs)
+ self.assertEqual(filename, path)
self.assertEqual(len(attribs.keys), 0)
- self.assertEqual(attribs.filename, path)
-
def testContentAttributesKeyAccess(self):
"""Test some access keys for content attributes."""
@@ -130,7 +130,7 @@ class TestProjectFeatures(ChrysalideTestCase):
for k in model.keys():
path += '&%s=%s' % (k, model[k])
- attribs = ContentAttributes(path)
+ attribs, _ = ContentAttributes(path)
self.assertIsNotNone(attribs)
with self.assertRaisesRegex(Exception, 'key must be a string value'):
diff --git a/tests/analysis/contents/checksum.py b/tests/analysis/contents/checksum.py
index 54e4630..fd0c3ed 100644
--- a/tests/analysis/contents/checksum.py
+++ b/tests/analysis/contents/checksum.py
@@ -56,7 +56,7 @@ class TestRestrictedContent(ChrysalideTestCase):
fcnt = FileContent(self._out.name)
self.assertIsNotNone(fcnt)
- start = vmpa(4, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(4, vmpa.VmpaSpecialValue.NO_VIRTUAL)
covered = mrange(start, 4) # 'BBBB'
rcnt = RestrictedContent(fcnt, covered)
diff --git a/tests/analysis/contents/endian.py b/tests/analysis/contents/endian.py
index 77ed77a..b7c8bec 100644
--- a/tests/analysis/contents/endian.py
+++ b/tests/analysis/contents/endian.py
@@ -6,7 +6,7 @@
from chrysacase import ChrysalideTestCase
-from pychrysalide import arch
+from pychrysalide.analysis import BinContent
from pychrysalide.analysis.contents import FileContent, RestrictedContent
from pychrysalide.arch import vmpa
import tempfile
@@ -53,38 +53,38 @@ class TestEndianness(ChrysalideTestCase):
# 16 bits
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u16(start, arch.SRE_LITTLE_WORD)
+ val = fcnt.read_u16(start, BinContent.SourceEndian.LITTLE_WORD)
self.assertEqual(val, 0x1516)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u16(start, arch.SRE_BIG_WORD)
+ val = fcnt.read_u16(start, BinContent.SourceEndian.BIG_WORD)
self.assertEqual(val, 0x1615)
# 32 bits
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u32(start, arch.SRE_LITTLE_WORD)
+ val = fcnt.read_u32(start, BinContent.SourceEndian.LITTLE_WORD)
self.assertEqual(val, 0x17181516)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u32(start, arch.SRE_BIG_WORD)
+ val = fcnt.read_u32(start, BinContent.SourceEndian.BIG_WORD)
self.assertEqual(val, 0x16151817)
# 64 bits
- start = vmpa(0, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(0, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u64(start, arch.SRE_LITTLE_WORD)
+ val = fcnt.read_u64(start, BinContent.SourceEndian.LITTLE_WORD)
self.assertEqual(val, 0x0708050603040102)
- start = vmpa(0, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(0, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u64(start, arch.SRE_BIG_WORD)
+ val = fcnt.read_u64(start, BinContent.SourceEndian.BIG_WORD)
self.assertEqual(val, 0x0201040306050807)
@@ -95,36 +95,36 @@ class TestEndianness(ChrysalideTestCase):
# 16 bits
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u16(start, arch.SRE_LITTLE)
+ val = fcnt.read_u16(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x1615)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u16(start, arch.SRE_BIG)
+ val = fcnt.read_u16(start, BinContent.SourceEndian.BIG)
self.assertEqual(val, 0x1516)
# 32 bits
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u32(start, arch.SRE_LITTLE)
+ val = fcnt.read_u32(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x18171615)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u32(start, arch.SRE_BIG)
+ val = fcnt.read_u32(start, BinContent.SourceEndian.BIG)
self.assertEqual(val, 0x15161718)
# 64 bits
- start = vmpa(0, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(0, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u64(start, arch.SRE_LITTLE)
+ val = fcnt.read_u64(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x0807060504030201)
- start = vmpa(0, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(0, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = fcnt.read_u64(start, arch.SRE_BIG)
+ val = fcnt.read_u64(start, BinContent.SourceEndian.BIG)
self.assertEqual(val, 0x0102030405060708)
diff --git a/tests/analysis/contents/memory.py b/tests/analysis/contents/memory.py
index 55ce035..f99e607 100644
--- a/tests/analysis/contents/memory.py
+++ b/tests/analysis/contents/memory.py
@@ -7,7 +7,7 @@
from chrysacase import ChrysalideTestCase
-from pychrysalide import arch
+from pychrysalide.analysis import BinContent
from pychrysalide.analysis.contents import MemoryContent
from pychrysalide.arch import vmpa, mrange
@@ -29,7 +29,7 @@ class TestMemoryContent(ChrysalideTestCase):
cnt = MemoryContent(data)
- start = vmpa(4, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(4, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = cnt.read_u8(start)
self.assertEqual(val, 0x05)
@@ -37,14 +37,14 @@ class TestMemoryContent(ChrysalideTestCase):
val = cnt.read_u8(start)
self.assertEqual(val, 0x06)
- start = vmpa(14, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(14, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = cnt.read_u16(start, arch.SRE_LITTLE)
+ val = cnt.read_u16(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x1817)
- start = vmpa(10, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(10, vmpa.VmpaSpecialValue.NO_VIRTUAL)
- val = cnt.read_u32(start, arch.SRE_LITTLE)
+ val = cnt.read_u32(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x16150013)
@@ -57,10 +57,10 @@ class TestMemoryContent(ChrysalideTestCase):
with self.assertRaisesRegex(Exception, 'Invalid read access.'):
- start = vmpa(1, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(1, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = cnt.read_u8(start)
with self.assertRaisesRegex(Exception, 'Invalid read access.'):
- start = vmpa(0, vmpa.VMPA_NO_VIRTUAL)
- val = cnt.read_u16(start, arch.SRE_LITTLE)
+ start = vmpa(0, vmpa.VmpaSpecialValue.NO_VIRTUAL)
+ val = cnt.read_u16(start, BinContent.SourceEndian.LITTLE)
diff --git a/tests/analysis/contents/restricted.py b/tests/analysis/contents/restricted.py
index 08aa968..023e600 100644
--- a/tests/analysis/contents/restricted.py
+++ b/tests/analysis/contents/restricted.py
@@ -7,7 +7,7 @@
from chrysacase import ChrysalideTestCase
-from pychrysalide import arch
+from pychrysalide.analysis import BinContent
from pychrysalide.analysis.contents import FileContent, RestrictedContent
from pychrysalide.arch import vmpa, mrange
import tempfile
@@ -52,7 +52,7 @@ class TestRestrictedContent(ChrysalideTestCase):
fcnt = FileContent(self._out.name)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
covered = mrange(start, 12) # 0x15 ... 0x28
rcnt = RestrictedContent(fcnt, covered)
@@ -64,10 +64,10 @@ class TestRestrictedContent(ChrysalideTestCase):
val = rcnt.read_u8(start)
self.assertEqual(val, 0x16)
- val = rcnt.read_u16(start, arch.SRE_LITTLE)
+ val = rcnt.read_u16(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x1817)
- val = rcnt.read_u32(start, arch.SRE_LITTLE)
+ val = rcnt.read_u32(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x24232221)
@@ -76,7 +76,7 @@ class TestRestrictedContent(ChrysalideTestCase):
fcnt = FileContent(self._out.name)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
covered = mrange(start, 12) # 0x15 ... 0x28
rcnt = RestrictedContent(fcnt, covered)
@@ -100,42 +100,42 @@ class TestRestrictedContent(ChrysalideTestCase):
fcnt = FileContent(self._out.name)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
covered = mrange(start, 12) # 0x15 ... 0x28
rcnt = RestrictedContent(fcnt, covered)
self.assertIsNotNone(rcnt)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = rcnt.read_u8(start)
self.assertEqual(val, 0x15)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
- val = rcnt.read_u16(start, arch.SRE_LITTLE)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
+ val = rcnt.read_u16(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x1615)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
- val = rcnt.read_u32(start, arch.SRE_LITTLE)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
+ val = rcnt.read_u32(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x18171615)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
- val = rcnt.read_u64(start, arch.SRE_LITTLE)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
+ val = rcnt.read_u64(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x2423222118171615)
- start = vmpa(23, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(23, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = rcnt.read_u8(start)
self.assertEqual(val, 0x28)
- start = vmpa(22, vmpa.VMPA_NO_VIRTUAL)
- val = rcnt.read_u16(start, arch.SRE_LITTLE)
+ start = vmpa(22, vmpa.VmpaSpecialValue.NO_VIRTUAL)
+ val = rcnt.read_u16(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x2827)
- start = vmpa(20, vmpa.VMPA_NO_VIRTUAL)
- val = rcnt.read_u32(start, arch.SRE_LITTLE)
+ start = vmpa(20, vmpa.VmpaSpecialValue.NO_VIRTUAL)
+ val = rcnt.read_u32(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x28272625)
- start = vmpa(16, vmpa.VMPA_NO_VIRTUAL)
- val = rcnt.read_u64(start, arch.SRE_LITTLE)
+ start = vmpa(16, vmpa.VmpaSpecialValue.NO_VIRTUAL)
+ val = rcnt.read_u64(start, BinContent.SourceEndian.LITTLE)
self.assertEqual(val, 0x2827262524232221)
@@ -144,41 +144,41 @@ class TestRestrictedContent(ChrysalideTestCase):
fcnt = FileContent(self._out.name)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
covered = mrange(start, 12) # 0x15 ... 0x28
rcnt = RestrictedContent(fcnt, covered)
self.assertIsNotNone(rcnt)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = rcnt.read_raw(start, 1)
self.assertEqual(val, b'\x15')
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = rcnt.read_raw(start, 2)
self.assertEqual(val, b'\x15\x16')
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = rcnt.read_raw(start, 4)
self.assertEqual(val, b'\x15\x16\x17\x18')
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = rcnt.read_raw(start, 8)
self.assertEqual(val, b'\x15\x16\x17\x18\x21\x22\x23\x24')
- start = vmpa(23, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(23, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = rcnt.read_raw(start, 1)
self.assertEqual(val, b'\x28')
- start = vmpa(22, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(22, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = rcnt.read_raw(start, 2)
self.assertEqual(val, b'\x27\x28')
- start = vmpa(20, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(20, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = rcnt.read_raw(start, 4)
self.assertEqual(val, b'\x25\x26\x27\x28')
- start = vmpa(16, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(16, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = rcnt.read_raw(start, 8)
self.assertEqual(val, b'\x21\x22\x23\x24\x25\x26\x27\x28')
@@ -188,7 +188,7 @@ class TestRestrictedContent(ChrysalideTestCase):
fcnt = FileContent(self._out.name)
- start = vmpa(12, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
covered = mrange(start, 12) # 0x15 ... 0x28
rcnt = RestrictedContent(fcnt, covered)
@@ -196,15 +196,29 @@ class TestRestrictedContent(ChrysalideTestCase):
with self.assertRaisesRegex(Exception, 'Invalid read access.'):
- start = vmpa(1, vmpa.VMPA_NO_VIRTUAL)
+ start = vmpa(1, vmpa.VmpaSpecialValue.NO_VIRTUAL)
val = rcnt.read_u8(start)
with self.assertRaisesRegex(Exception, 'Invalid read access.'):
- start = vmpa(11, vmpa.VMPA_NO_VIRTUAL)
- val = rcnt.read_u16(start, arch.SRE_LITTLE)
+ start = vmpa(11, vmpa.VmpaSpecialValue.NO_VIRTUAL)
+ val = rcnt.read_u16(start, BinContent.SourceEndian.LITTLE)
with self.assertRaisesRegex(Exception, 'Invalid read access.'):
- start = vmpa(23, vmpa.VMPA_NO_VIRTUAL)
- val = rcnt.read_u16(start, arch.SRE_LITTLE)
+ start = vmpa(23, vmpa.VmpaSpecialValue.NO_VIRTUAL)
+ val = rcnt.read_u16(start, BinContent.SourceEndian.LITTLE)
+
+
+ def testDescription(self):
+ """Ensure restriction range is described."""
+
+ fcnt = FileContent(self._out.name)
+
+ start = vmpa(12, vmpa.VmpaSpecialValue.NO_VIRTUAL)
+ covered = mrange(start, 1)
+
+ rcnt = RestrictedContent(fcnt, covered)
+ self.assertIsNotNone(rcnt)
+
+ self.assertTrue(rcnt.describe().endswith(' [0xc:0xd]'))
diff --git a/tests/analysis/db/analyst.py b/tests/analysis/db/analyst.py
new file mode 100644
index 0000000..7347810
--- /dev/null
+++ b/tests/analysis/db/analyst.py
@@ -0,0 +1,188 @@
+
+from chrysacase import ChrysalideTestCase
+import pychrysalide
+from pychrysalide.analysis.contents import MemoryContent
+from pychrysalide.analysis.db import certs
+from pychrysalide.analysis.db import AdminClient, AnalystClient
+from pychrysalide.analysis.db import HubServer
+import os
+import shutil
+import sys
+import tempfile
+import threading
+
+
+class TestDbConnection(ChrysalideTestCase):
+ """TestCase for analysis.db."""
+
+ @classmethod
+ def setUpClass(cls):
+
+ super(TestDbConnection, cls).setUpClass()
+
+ cls.log('Compile binary "strings" if needed...')
+
+ fullname = sys.modules[cls.__module__].__file__
+ dirpath = os.path.dirname(fullname)
+
+ cls._bin_path = os.path.realpath(dirpath + '/../../format/elf/')
+
+ os.system('make -C %s strings > /dev/null 2>&1' % cls._bin_path)
+
+ cls._tmp_path = tempfile.mkdtemp()
+
+ cls._server_path = '%s/.chrysalide/servers/localhost-9999/' % cls._tmp_path
+ os.makedirs(cls._server_path)
+
+ cls._server_authorized_path = '%s/authorized/' % cls._server_path
+ os.makedirs(cls._server_authorized_path)
+
+ cls._client_path = '%s/.chrysalide/clients/' % cls._tmp_path
+ os.makedirs(cls._client_path)
+
+ cls._client_cert_path = '%s/localhost-9999/' % cls._client_path
+ os.makedirs(cls._client_cert_path)
+
+ cls.log('Using temporary directory "%s"' % cls._tmp_path)
+
+
+ @classmethod
+ def tearDownClass(cls):
+
+ super(TestDbConnection, cls).tearDownClass()
+
+ cls.log('Delete built binaries...')
+
+ os.system('make -C %s clean > /dev/null 2>&1' % cls._bin_path)
+
+ ## os.system('ls -laihR %s' % cls._tmp_path)
+
+ cls.log('Delete directory "%s"' % cls._tmp_path)
+
+ shutil.rmtree(cls._tmp_path)
+
+
+ def testServerListening(self):
+ """List binaries available from server."""
+
+
+ from pychrysalide import core
+ core.set_verbosity(0)
+
+
+
+ identity = {
+
+ 'C': 'FR',
+ 'CN': 'Test authority'
+
+ }
+
+ ret = certs.build_keys_and_ca(self._server_path, 'ca', 3650 * 24 * 60 * 60, identity)
+ self.assertTrue(ret)
+
+ identity = {
+
+ 'C': 'FR',
+ 'CN': 'Test server'
+
+ }
+
+ ret = certs.build_keys_and_request(self._server_path, 'server', identity);
+ self.assertTrue(ret)
+
+
+ ret = certs.sign_cert('%s/server-csr.pem' % self._server_path, '%s/ca-cert.pem' % self._server_path, \
+ '%s/ca-key.pem' % self._server_path, '%s/server-cert.pem' % self._server_path, \
+ 3650 * 24 * 60 * 60)
+ self.assertTrue(ret)
+
+ identity = {
+
+ 'C': 'FR',
+ 'CN': 'Test admin'
+
+ }
+
+ ret = certs.build_keys_and_request(self._client_path, 'client', identity);
+ self.assertTrue(ret)
+
+ ret = certs.sign_cert('%s/client-csr.pem' % self._client_path, '%s/ca-cert.pem' % self._server_path, \
+ '%s/ca-key.pem' % self._server_path, '%s/client-cert.pem' % self._client_cert_path, \
+ 3650 * 24 * 60 * 60)
+ self.assertTrue(ret)
+
+ shutil.copy('%s/ca-cert.pem' % self._server_path,
+ '%s/ca-cert.pem' % self._client_cert_path)
+
+ shutil.copy('%s/client-cert.pem' % self._client_cert_path,
+ '%s/client-cert.pem' % self._server_authorized_path)
+
+
+ os.environ['XDG_CONFIG_HOME'] = self._tmp_path
+ os.environ['HOME'] = self._tmp_path
+
+ server = HubServer('localhost', '9999')
+
+ #print(server)
+
+ ret = server.start()
+
+ #print(ret)
+
+
+
+
+ # admin = AdminClient()
+
+ # ret = admin.start('localhost', '9999')
+ # self.assertTrue(ret)
+
+ # def _on_existing_binaries_updated(adm, evt):
+ # evt.set()
+
+ # event = threading.Event()
+
+ # admin.connect('existing-binaries-updated', _on_existing_binaries_updated, event)
+
+ # ret = admin.request_existing_binaries()
+ # self.assertTrue(ret)
+
+ # event.wait()
+
+ # self.assertEqual(len(admin.existing_binaries), 0)
+
+
+
+ cnt = MemoryContent(b'A' * 400 * 1024)
+
+
+
+ analyst = AnalystClient(cnt.checksum, "elf", [])
+
+
+
+ def _on_server_status_changed(analyst, hint, evt):
+ print(hint)
+ evt.set()
+
+
+ event = threading.Event()
+
+ analyst.connect('server-status-changed', _on_server_status_changed, event)
+
+
+
+ ret = analyst.start('localhost', '9999')
+ self.assertTrue(ret)
+
+
+ event.wait()
+
+ event.clear()
+
+ ret = analyst.send_content(cnt)
+ self.assertTrue(ret)
+
+
+ event.wait()
diff --git a/tests/analysis/db/conn.py b/tests/analysis/db/conn.py
index f388f60..248a036 100644
--- a/tests/analysis/db/conn.py
+++ b/tests/analysis/db/conn.py
@@ -1,7 +1,8 @@
from chrysacase import ChrysalideTestCase
+from pychrysalide.analysis.contents import MemoryContent
from pychrysalide.analysis.db import certs
-from pychrysalide.analysis.db import AdminClient
+from pychrysalide.analysis.db import AdminClient, AnalystClient
from pychrysalide.analysis.db import HubServer
import os
import shutil
@@ -51,7 +52,7 @@ class TestDbConnection(ChrysalideTestCase):
from pychrysalide import core
- #core.set_verbosity(0)
+ core.set_verbosity(0)
@@ -117,21 +118,39 @@ class TestDbConnection(ChrysalideTestCase):
- admin = AdminClient()
+ # admin = AdminClient()
- ret = admin.start('localhost', '9999')
- self.assertTrue(ret)
+ # ret = admin.start('localhost', '9999')
+ # self.assertTrue(ret)
+
+ # def _on_existing_binaries_updated(adm, evt):
+ # evt.set()
+
+ # event = threading.Event()
+
+ # admin.connect('existing-binaries-updated', _on_existing_binaries_updated, event)
+
+ # ret = admin.request_existing_binaries()
+ # self.assertTrue(ret)
+
+ # event.wait()
- def _on_existing_binaries_updated(adm, evt):
- evt.set()
+ # self.assertEqual(len(admin.existing_binaries), 0)
- event = threading.Event()
- admin.connect('existing-binaries-updated', _on_existing_binaries_updated, event)
- ret = admin.request_existing_binaries()
+ cnt = MemoryContent(b'A' * 400 * 1024)
+
+ print(cnt)
+
+ print(len(cnt.data))
+
+
+ analyst = AnalystClient(cnt.checksum, [])
+
+ ret = analyst.start('localhost', '9999')
self.assertTrue(ret)
- event.wait()
- self.assertEqual(len(admin.existing_binaries), 0)
+ ret = analyst.send_content(cnt)
+ self.assertTrue(ret)
diff --git a/tests/analysis/scan/booleans.py b/tests/analysis/scan/booleans.py
new file mode 100644
index 0000000..aa3c1a3
--- /dev/null
+++ b/tests/analysis/scan/booleans.py
@@ -0,0 +1,98 @@
+
+from common import RostTestClass
+
+
+class TestRostBooleans(RostTestClass):
+ """TestCases for booleans and ROST."""
+
+ def testFinalCondition(self):
+ """Validate the final condition."""
+
+ rule = '''
+rule test {
+
+ condition:
+ false
+
+}
+'''
+
+ self.check_rule_failure(rule)
+
+
+ rule = '''
+rule test {
+
+ condition:
+ true
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ def testBasicBooleanOperations(self):
+ """Evaluate basic boolean operations."""
+
+ rule = '''
+rule test {
+
+ condition:
+ true and false
+
+}
+'''
+
+ self.check_rule_failure(rule)
+
+
+ rule = '''
+rule test {
+
+ condition:
+ true or false
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ def testImplicitCast(self):
+ """Imply implicit casts to booleans."""
+
+ rule = '''
+rule test {
+
+ condition:
+ true and 0
+
+}
+'''
+
+ self.check_rule_failure(rule)
+
+
+ rule = '''
+rule test {
+
+ condition:
+ 1 or false
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ rule = '''
+rule test {
+
+ condition:
+ 1 or ()
+
+}
+'''
+
+ self.check_rule_success(rule)
diff --git a/tests/analysis/scan/common.py b/tests/analysis/scan/common.py
new file mode 100644
index 0000000..507b7e2
--- /dev/null
+++ b/tests/analysis/scan/common.py
@@ -0,0 +1,54 @@
+
+from chrysacase import ChrysalideTestCase
+from pychrysalide.analysis.contents import MemoryContent
+from pychrysalide.analysis.scan import ContentScanner
+from pychrysalide.analysis.scan import ScanOptions
+from pychrysalide.analysis.scan.patterns.backends import AcismBackend
+
+
+class RostTestClass(ChrysalideTestCase):
+ """TestCase for analysis.scan.ScanExpression."""
+
+ @classmethod
+ def setUpClass(cls):
+
+ super(RostTestClass, cls).setUpClass()
+
+ cls._options = ScanOptions()
+ cls._options.backend_for_data = AcismBackend
+
+ cls._empty_content = MemoryContent(b'')
+
+
+ def _validate_rule_result(self, rule, content, expected):
+ """Check for scan success or failure."""
+
+ scanner = ContentScanner(rule)
+ ctx = scanner.analyze(self._options, content)
+
+ self.assertIsNotNone(ctx)
+
+ if expected:
+ self.assertTrue(ctx.has_match_for_rule('test'))
+ else:
+ self.assertFalse(ctx.has_match_for_rule('test'))
+
+ return scanner, ctx
+
+
+ def check_rule_success(self, rule, content = None):
+ """Check for scan success."""
+
+ if content is None:
+ content = self._empty_content
+
+ self._validate_rule_result(rule, content, True)
+
+
+ def check_rule_failure(self, rule, content = None):
+ """Check for scan failure."""
+
+ if content is None:
+ content = self._empty_content
+
+ self._validate_rule_result(rule, content, False)
diff --git a/tests/analysis/scan/examples.py b/tests/analysis/scan/examples.py
new file mode 100644
index 0000000..74b5094
--- /dev/null
+++ b/tests/analysis/scan/examples.py
@@ -0,0 +1,70 @@
+
+from common import RostTestClass
+from pychrysalide.analysis.contents import MemoryContent
+
+
+class TestRostExamples(RostTestClass):
+ """TestCases for the examples provides in the ROST documentation."""
+
+ def testComments(self):
+ """Ensure comments do not bother rule definitions."""
+
+ rule = '''
+/*
+ Multi-line header...
+*/
+
+rule test { // comment
+
+ /*
+ * Some context
+ */
+
+ condition: /* List of condition(s) */
+ true // Dummy condition
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ def testArithmeticPrecedence(self):
+ """Take care of arithmetic operators precedence."""
+
+ rule = '''
+rule test { // MulFirst
+
+ condition:
+ 1 + 4 * (3 + 2) == 21
+ and
+ (1 + 4) * (3 + 2) == 25
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ def testUintCast(self):
+ """Process nested integer values from binary content."""
+
+ cnt = MemoryContent(b'\x4d\x5a\x00\x00' + b'\x50\x45\x00\x00' + 52 * b'\x00' + b'\x04\x00\x00\x00')
+
+ rule = '''
+rule test { // IsPE
+
+ condition:
+
+ // MZ signature at offset 0 and ...
+
+ uint16(0) == 0x5a4d and
+
+ // ... PE signature at offset stored in the MZ header at offset 0x3c
+
+ uint32(uint32(0x3c)) == 0x00004550
+
+}
+'''
+
+ self.check_rule_success(rule, cnt)
diff --git a/tests/analysis/scan/functions.py b/tests/analysis/scan/functions.py
new file mode 100644
index 0000000..6aca957
--- /dev/null
+++ b/tests/analysis/scan/functions.py
@@ -0,0 +1,239 @@
+
+from common import RostTestClass
+from pychrysalide.analysis.contents import MemoryContent
+
+
+class TestRostFunctions(RostTestClass):
+ """TestCases for the core functions of ROST."""
+
+ # Core
+ # ====
+
+ def testSetCounter(self):
+ """Count quantities and set sizes."""
+
+ rule = '''
+rule test {
+
+ condition:
+ count("ABC") == 3
+ and count("AB", "C") == count("ABC")
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ cnt = MemoryContent(b'\x01\x02\x02\x03\x03\x03')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $int_01 = "\x01"
+ $int_02 = "\x02"
+ $int_3 = "\x03"
+
+ condition:
+ count($int_0*, $int_3) == #int_*
+
+}
+'''
+
+ self.check_rule_success(rule, cnt)
+
+
+ def testDatasize(self):
+ """Handle the size of the provided data."""
+
+ cnt = MemoryContent(b'\x01\x02\x03\x04')
+
+ cases = [
+ 'datasize == 4',
+ 'uint16(0) == 0x201 and uint16(datasize - 2) == 0x0403',
+ ]
+
+ for c in cases:
+
+ rule = '''
+rule test {
+
+ condition:
+ %s
+
+}
+''' % c
+
+ self.check_rule_success(rule, cnt)
+
+
+ def testMaxCommon(self):
+ """Count the largest quantity of same items in a set."""
+
+ cnt = MemoryContent(b'')
+
+ cases = [
+ [ '1', 1 ],
+ [ '1, 2, 3', 1 ],
+ [ '1, 2, 1, 3, 1', 3 ],
+ [ '1, "a", 2, 3, "a"', 2 ],
+ ]
+
+ for c, q in cases:
+
+ rule = '''
+rule test {
+
+ condition:
+ maxcommon(%s) == %u
+
+}
+''' % (c, q)
+
+ self.check_rule_success(rule, cnt)
+
+
+ # Modules
+ # =======
+
+ def testConsole(self):
+ """Ensure logging always returns true."""
+
+ rule = '''
+rule test {
+
+ condition:
+ console.log()
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ def testMagic(self):
+ """Scan text content with the Magic module."""
+
+ cnt = MemoryContent(b'aaaa')
+
+ cases = [
+ [ 'type', 'ASCII text, with no line terminators' ],
+ [ 'mime_encoding', 'us-ascii' ],
+ [ 'mime_type', 'text/plain' ],
+ ]
+
+ for target, expected in cases:
+
+ rule = '''
+rule test {
+
+ condition:
+ magic.%s() == "%s"
+
+}
+''' % (target, expected)
+
+ self.check_rule_success(rule, cnt)
+
+
+ def testMathOperations(self):
+ """Perform math operations with core functions."""
+
+ rule = '''
+rule test {
+
+ condition:
+ math.to_string(123) == "123"
+ and math.to_string(291, 16) == "0x123"
+ and math.to_string(-83, 8) == "-0123"
+ and math.to_string(123, 2) == "0b1111011"
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ def testStringOperations(self):
+ """Perform string operations with core functions."""
+
+ rule = '''
+rule test {
+
+ condition:
+ string.lower("ABCd") == "abcd" and string.lower("123abc") == "123abc"
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ rule = '''
+rule test {
+
+ condition:
+ string.upper("abcD") == "ABCD" and string.upper("123ABC") == "123ABC"
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ rule = '''
+rule test {
+
+ condition:
+ string.to_int("123") == 123
+ and string.to_int("123", 16) == 291
+ and string.to_int("0x123") == 291
+ and string.to_int("-0123") == -83
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ rule = r'''
+rule test {
+
+ condition:
+ "A\x00B\x00C\x00D\x00" endswith string.wide("CD")
+ and "A\x00B\x00C\x00D\x00" contains string.wide("BC")
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ def testTime(self):
+ """Check current time."""
+
+ # Cf. https://www.epochconverter.com/
+
+ rule = '''
+rule test {
+
+ condition:
+ time.make(2023, 8, 5, 22, 8, 41) == 0x64cec869
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ rule = '''
+rule test {
+
+ condition:
+ time.now() >= 0x64cec874 and time.now() <= time.now()
+
+}
+'''
+
+ self.check_rule_success(rule)
diff --git a/tests/analysis/scan/fuzzing.py b/tests/analysis/scan/fuzzing.py
new file mode 100644
index 0000000..1b9b25b
--- /dev/null
+++ b/tests/analysis/scan/fuzzing.py
@@ -0,0 +1,289 @@
+
+from common import RostTestClass
+from pychrysalide.analysis.contents import MemoryContent
+from pychrysalide.analysis.scan import ContentScanner
+from pychrysalide.analysis.scan import ScanOptions
+from pychrysalide.analysis.scan.patterns.backends import AcismBackend
+from pychrysalide.analysis.scan.patterns.backends import BitapBackend
+
+
+class TestRostFuzzingFixes(RostTestClass):
+ """TestCases to remember all the fixes for crashes identified by fuzzing."""
+
+ def testEmptyPatternListWithContent(self):
+ """Check no backend is run if there is no pattern to look for."""
+
+ content = MemoryContent(b'\n')
+
+ rule = '''
+'''
+
+ backends = [
+ AcismBackend, # This one was segfaulting
+ BitapBackend,
+ ]
+
+ for b in backends:
+
+ options = ScanOptions()
+ options.backend_for_data = b
+
+ scanner = ContentScanner(rule)
+ ctx = scanner.analyze(options, content)
+
+ self.assertIsNotNone(ctx)
+
+
+ def testMandatoryCondition(self):
+ """Ensure a condition section exists in a rule."""
+
+ rule = '''
+rule test {
+
+}
+'''
+
+ with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'):
+
+ scanner = ContentScanner(rule)
+
+
+ def testNonExistingPattern(self):
+ """Avoid to count the matches of a non-existing pattern."""
+
+ rule = '''
+rule test {
+
+ condition:
+ #badid
+
+}
+'''
+
+ with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'):
+
+ scanner = ContentScanner(rule)
+
+
+ def testNamespacesWithoutReductionCode(self):
+ """Clean the code for ROST namespaces."""
+
+ rule = '''
+rule test {
+
+ condition:
+ console
+
+}
+'''
+
+ self.check_rule_failure(rule)
+
+
+ def testCallOnNonCallable(self):
+ """Reject calls on non callable expressions softly."""
+
+ rule = '''
+rule test {
+
+ condition:
+ console.log().log()
+
+}
+'''
+
+ self.check_rule_failure(rule)
+
+
+ def testSelfReferencingRule(self):
+ """Reject any rule referencing itself as match condition."""
+
+ rule = '''
+rule test {
+
+ condition:
+ test
+
+}
+'''
+
+ self.check_rule_failure(rule)
+
+
+ def testSelfReferencingRule(self):
+ """Expect only one argument for the not operator, even in debug mode."""
+
+ rule = '''
+rule test {
+
+ condition:
+ not(0)
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ def testNoCommon(self):
+ """Handle the case where no common item is found from an empty set."""
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "a"
+
+ condition:
+ maxcommon($a) == 0
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ def testAAsAcharacter(self):
+ """Consider the 'a' character as a valid lowercase character."""
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "0000a0I0" nocase
+
+ condition:
+ $a
+
+}
+'''
+
+ self.check_rule_failure(rule)
+
+
+ def testAAsAcharacter(self):
+ """Do not expect initialized trackers when there is no real defined search pattern."""
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = {[0]}
+
+ condition:
+ $a
+
+}
+'''
+
+ with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'):
+
+ scanner = ContentScanner(rule)
+
+
+ def testAllocations(self):
+ """Handle big alloctions for strings in conditions with regular expressions."""
+
+ rule = '''
+rule test {
+
+ condition:
+ "%s" == "%s"
+
+}
+''' % ("0" * (256 * 2 + 8), "0" * (256 * 2 + 8))
+
+ self.check_rule_success(rule)
+
+
+ def testFileFinalAccess(self):
+ """Ensure patterns found at the edges of scanned content do not crash the scanner."""
+
+ cnt = MemoryContent(bytes([ 0 for i in range(16) ]))
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 00 00 00 00 00 00 00 00 }
+
+ condition:
+ $a
+
+}
+'''
+
+ self.check_rule_success(rule, cnt)
+
+
+ def testValidHexRangeMerge(self):
+ """Merge valid hexadecimal ranges."""
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { [0] ?? }
+
+ condition:
+ $a
+
+}
+'''
+
+ with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'):
+
+ scanner = ContentScanner(rule)
+
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { [2] ?? }
+
+ condition:
+ $a
+
+}
+'''
+
+ self.check_rule_failure(rule)
+
+
+ def testSmallBase64(self):
+ """Handle small base64 encodings which may produce few patterns."""
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "0" base64
+
+ condition:
+ $a
+
+}
+'''
+
+ self.check_rule_failure(rule)
+
+
+ def testCountIndex(self):
+ """Ban pattern count indexes from the grammer."""
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "1"
+
+ condition:
+ #*[0]
+
+}
+'''
+
+ with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'):
+
+ scanner = ContentScanner(rule)
diff --git a/tests/analysis/scan/grammar.py b/tests/analysis/scan/grammar.py
new file mode 100644
index 0000000..14f67fa
--- /dev/null
+++ b/tests/analysis/scan/grammar.py
@@ -0,0 +1,484 @@
+
+import json
+
+from common import RostTestClass
+from pychrysalide.analysis.contents import MemoryContent
+
+
+class TestRostGrammar(RostTestClass):
+ """TestCases for the ROST grammar."""
+
+ def testRelationalExpressions(self):
+ """Build expressions with relational comparisons."""
+
+ cases = [
+
+ # Regular
+ [ '-1', '<=', '2', True ],
+ [ '-1', '<=', '2', True ],
+ [ '"aaa"', '==', '"aaa"', True ],
+ [ '"aaa"', '<', '"aaaa"', True ],
+ [ '""', '<', '"aaaa"', True ],
+
+ # Cast
+ [ 'false', '==', '0', True ],
+ [ 'false', '==', '1', False ],
+ [ 'true', '!=', '0', True ],
+ [ '1', '==', 'true', True ],
+ [ 'false', '==', '()', True ],
+ [ 'true', '==', '(0,)', True ],
+
+ ]
+
+ for op1, kwd, op2, expected in cases:
+
+ rule = '''
+rule test {
+
+ condition:
+ %s %s %s
+
+}
+''' % (op1, kwd, op2)
+
+ if expected:
+ self.check_rule_success(rule)
+ else:
+ self.check_rule_failure(rule)
+
+
+ def testLogicalOperations(self):
+ """Evaluate some logical operations."""
+
+ cases = [
+ [ 'true and false', False ],
+ [ 'false or false', False ],
+ [ 'true and true or false', True ],
+ [ 'false or true and false', False ],
+ [ '1 or false', True ],
+ ]
+
+ for cond, expected in cases:
+
+ rule = '''
+rule test {
+
+ condition:
+ %s
+
+}
+''' % (cond)
+
+ if expected:
+ self.check_rule_success(rule)
+ else:
+ self.check_rule_failure(rule)
+
+
+ def testArithmeticOperations(self):
+ """Evaluate some arithmetic operations."""
+
+ cases = [
+
+ # Clever
+ '1 + 2 == 3',
+ '10 + -3 == 7',
+ '-3 + 10 == 7',
+ '-10 - 1 < 0',
+ '-10 - 1 == -11',
+ '(-10 - 1) == -11',
+ '(-1 - -10) == 9',
+ '-2 * -3 == 6',
+ '-2 * 3 == -6',
+
+ # Legacy
+ '1 + 4 * 3 + 2 == 15',
+ '(1 + 4) * 3 + 2 == 17',
+ '1 + 4 * (3 + 2) == 21',
+ '(1 + 4) * (3 + 2) == 25',
+
+ ]
+
+ for c in cases:
+
+ rule = '''
+rule test {
+
+ condition:
+ %s
+
+}
+''' % (c)
+
+ self.check_rule_success(rule)
+
+
+ def testBasicStringsOperations(self):
+ """Build expressions with basic strings operations."""
+
+ cases = [
+
+ # Clever
+ [ '123---456', 'contains', '---', True ],
+ [ '123---456', 'contains', 'xxx', False ],
+ [ '---123---456', 'startswith', '---', True ],
+ [ '---123---456', 'startswith', 'xxx', False ],
+ [ '123---456---', 'endswith', '---', True ],
+ [ '123---456---', 'endswith', 'xxx', False ],
+ [ 'AAA---BBB', 'icontains', 'aaa', True ],
+ [ 'AAA---BBB', 'icontains', 'xxx', False ],
+ [ 'AAA---BBB', 'istartswith', 'aAa', True ],
+ [ 'AAA---BBB', 'istartswith', 'xxx', False ],
+ [ 'AAA---BBB', 'iendswith', 'bBb', True ],
+ [ 'AAA---BBB', 'iendswith', 'xxx', False ],
+ [ 'AzertY', 'iequals', 'AZERTY', True ],
+ [ 'AzertY', 'iequals', 'AZERTY-', False ],
+
+ # Legacy
+ [ '123\t456', 'contains', '\t', True ],
+ [ '123-456', 'startswith', '1', True ],
+ [ '123-456', 'startswith', '1234', False ],
+ [ '123-456', 'endswith', '6', True ],
+ [ '123-456', 'endswith', '3456', False ],
+
+ ]
+
+ for op1, kwd, op2, expected in cases:
+
+ rule = '''
+rule test {
+
+ condition:
+ "%s" %s "%s"
+
+}
+''' % (op1, kwd, op2)
+
+ if expected:
+ self.check_rule_success(rule)
+ else:
+ self.check_rule_failure(rule)
+
+
+ def testSizeUnits(self):
+ """Evaluate size units."""
+
+ cases = [
+ '1KB == 1024',
+ '2MB == 2 * 1024 * 1024',
+ '4Kb == (4 * 1024)',
+ '1KB <= 1024 and 1024 < 1MB',
+ ]
+
+ for c in cases:
+
+ rule = '''
+rule test {
+
+ condition:
+ %s
+
+}
+''' % (c)
+
+ self.check_rule_success(rule)
+
+
+ def testPrivateRules(self):
+ """Ensure private rules remain silent."""
+
+ for private in [ True, False ]:
+ for state in [ True, False ]:
+
+ rule = '''
+%srule silent {
+
+ condition:
+ %s
+
+}
+
+rule test {
+
+ condition:
+ silent
+
+}
+''' % ('private ' if private else '', 'true' if state else 'false')
+
+ scanner, ctx = self._validate_rule_result(rule, self._empty_content, state)
+
+ data = scanner.convert_to_json(ctx)
+ jdata = json.loads(data)
+
+ # Exemple :
+ #
+ # [{'bytes_patterns': [], 'matched': True, 'name': 'test'},
+ # {'bytes_patterns': [], 'matched': True, 'name': 'silent'}]
+
+ found = len([ j['name'] for j in jdata if j['name'] == 'silent' ]) > 0
+
+ self.assertTrue(private ^ found)
+
+
+ def testGlobalRules(self):
+ """Take global rules into account."""
+
+ for glob_state in [ True, False ]:
+ for state in [ True, False ]:
+
+ rule = '''
+%srule silent {
+
+ condition:
+ %s
+
+}
+
+rule test {
+
+ condition:
+ true
+
+}
+''' % ('global ' if glob_state else '', 'true' if state else 'false')
+
+ expected = not(glob_state) or state
+
+ if expected:
+ self.check_rule_success(rule)
+ else:
+ self.check_rule_failure(rule)
+
+
+ def testMatchCount(self):
+ """Ensure match count provides expected values."""
+
+ cnt = MemoryContent(b'\x01\x02\x02\x03\x03\x03')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $int_01 = "\x01"
+ $int_02 = "\x02"
+ $int_03 = "\x03"
+
+ condition:
+ #int_01 == count($int_01) and #int_01 == 1
+ and #int_02 == count($int_02) and #int_02 == 2
+ and #int_03 == count($int_03) and #int_03 == 3
+ and #int_0* == count($int_0*) and #int_0* == 6
+
+}
+'''
+
+ self.check_rule_success(rule, cnt)
+
+
+ def testBackingUpHandlers(self):
+ """Ensure handlers for backing up removals do not limit the grammar."""
+
+ cnt = MemoryContent(b'AB12')
+
+ # Uncompleted token in rule definition: '?? ?? '
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?? ?? }
+
+ condition:
+ #a == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+ # Uncompleted token in rule definition: '?? '
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?? 4? }
+
+ condition:
+ #a == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+ # Uncompleted token in rule definition: '?? ?'
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?? ?2 }
+
+ condition:
+ #a == 2
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+ # Uncompleted token in rule definition: '?? '
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?? 42 }
+
+ condition:
+ #a == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ # Uncompleted token in rule definition: '?1 ?'
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?1 ?? }
+
+ condition:
+ #a == 2
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+ # Uncompleted token in rule definition: '?1 4? '
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?1 4? }
+
+ condition:
+ #a == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+ # Uncompleted token in rule definition: '?1 ?2 '
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?1 ?2 }
+
+ condition:
+ #a == 2
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+ # Uncompleted token in rule definition: '?1 4'
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?1 42 }
+
+ condition:
+ #a == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ # Uncompleted token in rule definition: '41 '
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 41 ?? }
+
+ condition:
+ #a == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+ # Uncompleted token in rule definition: '41 4'
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 41 4? }
+
+ condition:
+ #a == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+ # Uncompleted token in rule definition: '41 '
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 41 ?2 }
+
+ condition:
+ #a == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+ # Uncompleted token in rule definition: '41 42 '
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 41 42 }
+
+ condition:
+ #a == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+
+
+# TODO : test <haystack> matches <regex>
+
+
+
diff --git a/tests/analysis/scan/matches.py b/tests/analysis/scan/matches.py
new file mode 100644
index 0000000..efcae4f
--- /dev/null
+++ b/tests/analysis/scan/matches.py
@@ -0,0 +1,64 @@
+
+from common import RostTestClass
+from pychrysalide.analysis.contents import MemoryContent
+
+
+class TestRostMatchs(RostTestClass):
+ """TestCases for the ROST pattern matching engine."""
+
+ def testCountMatches(self):
+ """Count matched patterns."""
+
+ cnt = MemoryContent(b'aaa aaa bbb aaa')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "aaa"
+ $b = "bbb"
+
+ condition:
+ #a == 3 and #b < 2
+
+}
+'''
+
+ self.check_rule_success(rule, cnt)
+
+
+ def testCountSameMatches(self):
+ """Count matches of similar patterns."""
+
+ cnt = MemoryContent(b'ABCDabcdABCDabcd')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "\x61\x62\x63\x64"
+ $b = "\x61\x62\x63\x64"
+
+ condition:
+ #a == 2 and #b == 2
+
+}
+'''
+
+ self.check_rule_success(rule, cnt)
+
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "\x61\x62\x63\x64"
+ $b = "\x61\x62\x63"
+
+ condition:
+ #a == 2 and #b == 2
+
+}
+'''
+
+ self.check_rule_success(rule, cnt)
diff --git a/tests/analysis/scan/pyapi.py b/tests/analysis/scan/pyapi.py
new file mode 100644
index 0000000..7a697b3
--- /dev/null
+++ b/tests/analysis/scan/pyapi.py
@@ -0,0 +1,297 @@
+
+import binascii
+import struct
+
+from chrysacase import ChrysalideTestCase
+from gi._constants import TYPE_INVALID
+from pychrysalide.analysis.scan import ScanExpression
+from pychrysalide.analysis.scan import ScanOptions
+from pychrysalide.analysis.scan import find_token_modifiers_for_name
+from pychrysalide.glibext import ComparableItem
+
+
+class TestRostPythonAPI(ChrysalideTestCase):
+ """TestCase for the ROST Python API."""
+
+ def testEmptyOptions(self):
+ """Check default scan options."""
+
+ ops = ScanOptions()
+
+ self.assertEqual(ops.backend_for_data, TYPE_INVALID)
+
+
+ def testDirectInstancesOfExpression(self):
+ """Reject direct instances of ROST expressions."""
+
+ with self.assertRaisesRegex(RuntimeError, 'pychrysalide.analysis.scan.ScanExpression is an abstract class'):
+
+ e = ScanExpression()
+
+
+ def testBooleanComparison(self):
+ """Compare custom scan expressions."""
+
+ class StrLenExpr(ScanExpression):
+
+ def __init__(self, value):
+ super().__init__(ScanExpression.ScanReductionState.REDUCED)
+ self._value = value
+
+ def _cmp_rich(self, other, op):
+
+ if op == ComparableItem.RichCmpOperation.EQ:
+ return len(self._value) == len(other._value)
+
+
+ e0 = StrLenExpr('00000000000')
+
+ e1 = StrLenExpr('00000000000')
+
+ e2 = StrLenExpr('000000000000000000000000000')
+
+ self.assertTrue(e0 == e1)
+
+ # !?
+ # Python teste e0 != e1 (non implémenté), puis e1 != e0 (pareil) et en déduit une différence !
+ # self.assertFalse(e0 != e1)
+
+ self.assertFalse(e0 == e2)
+
+ # TypeError: '<' not supported between instances of 'StrLenExpr' and 'StrLenExpr'
+ with self.assertRaisesRegex(TypeError, '\'<\' not supported between instances'):
+ self.assertTrue(e0 < e1)
+
+
+ def testBytePatternModifiers(self):
+ """Validate the bytes produced by modifiers."""
+
+ mod = find_token_modifiers_for_name('plain')
+ self.assertIsNotNone(mod)
+
+ source = b'ABC'
+ transformed = mod.transform(source)
+
+ self.assertEqual(source, transformed[0])
+
+
+ mod = find_token_modifiers_for_name('hex')
+ self.assertIsNotNone(mod)
+
+ source = b'ABC'
+ transformed = mod.transform(source)
+
+ self.assertEqual(binascii.hexlify(source), transformed[0])
+
+
+ mod = find_token_modifiers_for_name('rev')
+ self.assertIsNotNone(mod)
+
+ source = b'ABC'
+ transformed = mod.transform(source)
+
+ self.assertEqual(source[::-1], transformed[0])
+
+
+ mod = find_token_modifiers_for_name('lower')
+ self.assertIsNotNone(mod)
+
+ source = b'AbC'
+ transformed = mod.transform(source)
+
+ self.assertEqual(source.lower(), transformed[0])
+
+
+ mod = find_token_modifiers_for_name('upper')
+ self.assertIsNotNone(mod)
+
+ source = b'AbC'
+ transformed = mod.transform(source)
+
+ self.assertEqual(source.upper(), transformed[0])
+
+
+ mod = find_token_modifiers_for_name('wide')
+ self.assertIsNotNone(mod)
+
+ source = b'ABC'
+ transformed = mod.transform(source)
+
+ self.assertEqual(source.decode('ascii'), transformed[0].decode('utf-16-le'))
+
+
+ mod = find_token_modifiers_for_name('base64')
+ self.assertIsNotNone(mod)
+
+ source = b'ABC'
+ transformed = mod.transform(source)
+
+ self.assertEqual(len(transformed), 3)
+ self.assertEqual(transformed[0], b'QUJD')
+ self.assertEqual(transformed[1], b'FCQ')
+ self.assertEqual(transformed[2], b'BQk')
+
+
+ def testClassicalAPIHashing(self):
+ """Reproduce classical API Hashing results."""
+
+ def b2i(t):
+ return struct.unpack('<I', t)[0]
+
+
+ # Example:
+ # - PlugX (2020) - https://vms.drweb.fr/virus/?i=21512304
+
+ mod = find_token_modifiers_for_name('crc32')
+ self.assertIsNotNone(mod)
+
+ source = b'GetCurrentProcess\x00'
+ transformed = mod.transform(source)
+
+ self.assertEqual(b2i(transformed[0]), 0x3690e66)
+
+
+ # Example:
+ # - GuLoader (2020) - https://www.crowdstrike.com/blog/guloader-malware-analysis/
+
+ mod = find_token_modifiers_for_name('djb2')
+ self.assertIsNotNone(mod)
+
+ source = b'GetProcAddress'
+ transformed = mod.transform(source)
+
+ self.assertEqual(b2i(transformed[0]), 0xcf31bb1f)
+
+
+ def testCustomAPIHashing(self):
+ """Reproduce custom API Hashing results."""
+
+ def b2i(t):
+ return struct.unpack('<I', t)[0]
+
+
+ # Example:
+ # Underminer Exploit Kit (2019) - https://jsac.jpcert.or.jp/archive/2019/pdf/JSAC2019_1_koike-nakajima_jp.pdf
+
+ mod = find_token_modifiers_for_name('add1505-shl5')
+ self.assertIsNotNone(mod)
+
+ source = b'LoadLibraryA'
+ transformed = mod.transform(source)
+
+ self.assertEqual(b2i(transformed[0]), 0x5fbff0fb)
+
+
+ # Example:
+ # Enigma Stealer (2023) https://www.trendmicro.com/es_mx/research/23/b/enigma-stealer-targets-cryptocurrency-industry-with-fake-jobs.html
+
+ mod = find_token_modifiers_for_name('enigma-murmur')
+ self.assertIsNotNone(mod)
+
+ source = b'CreateMutexW'
+ transformed = mod.transform(source)
+
+ self.assertEqual(b2i(transformed[0]), 0xfd43765a)
+
+
+ # Examples:
+ # - ShadowHammer (2019) - https://blog.f-secure.com/analysis-shadowhammer-asus-attack-first-stage-payload/
+ # - ShadowHammer (2019) - https://securelist.com/operation-shadowhammer-a-high-profile-supply-chain-attack/90380/
+
+ mod = find_token_modifiers_for_name('imul21-add')
+ self.assertIsNotNone(mod)
+
+ source = b'VirtualAlloc'
+ transformed = mod.transform(source)
+
+ self.assertEqual(b2i(transformed[0]), 0xdf894b12)
+
+
+ # Examples:
+ # - Bottle Exploit Kit (2019) - https://nao-sec.org/2019/12/say-hello-to-bottle-exploit-kit.html
+ # - ShadowHammer (2019) - https://securelist.com/operation-shadowhammer-a-high-profile-supply-chain-attack/90380/
+
+ mod = find_token_modifiers_for_name('imul83-add')
+ self.assertIsNotNone(mod)
+
+ source = b'GetProcAddress'
+ transformed = mod.transform(source)
+
+ self.assertEqual(b2i(transformed[0]), 0x9ab9b854)
+
+
+ # Examples:
+ # - ?? (2021) - https://www.threatspike.com/blogs/reflective-dll-injection
+ # - Mustang Panda (2022) - https://blog.talosintelligence.com/mustang-panda-targets-europe/
+
+ mod = find_token_modifiers_for_name('ror13')
+ self.assertIsNotNone(mod)
+
+ source = b'GetProcAddress'
+ transformed = mod.transform(source)
+
+ self.assertEqual(b2i(transformed[0]), 0x7c0dfcaa)
+
+ source = b'VirtualAlloc'
+ transformed = mod.transform(source)
+
+ self.assertEqual(b2i(transformed[0]), 0x91afca54)
+
+
+ # Example:
+ # - Energetic Bear (2019) - https://insights.sei.cmu.edu/blog/api-hashing-tool-imagine-that/
+
+ mod = find_token_modifiers_for_name('sll1-add-hash32')
+ self.assertIsNotNone(mod)
+
+ source = b'LoadLibraryA'
+ transformed = mod.transform(source)
+
+ self.assertEqual(b2i(transformed[0]), 0x000d5786)
+
+
+ # Example:
+ # - SideWinder/WarHawk (2022) - https://www.zscaler.com/blogs/security-research/warhawk-new-backdoor-arsenal-sidewinder-apt-group
+
+ mod = find_token_modifiers_for_name('sub42')
+ self.assertIsNotNone(mod)
+
+ source = b'LoadLibraryA'
+ transformed = mod.transform(source)
+
+ self.assertEqual(transformed[0], b'\x8e\xb1\xa3\xa6\x8e\xab\xa4\xb4\xa3\xb4\xbb\x83')
+
+
+ # Example:
+ # - TrickBot (2021) - https://medium.com/walmartglobaltech/trickbot-crews-new-cobaltstrike-loader-32c72b78e81c
+
+ mod = find_token_modifiers_for_name('sub-index1')
+ self.assertIsNotNone(mod)
+
+ source = b'raw.githubusercontent.com'
+ transformed = mod.transform(source)
+
+ self.assertEqual(transformed[0], b'\x73\x63\x7a\x32\x6c\x6f\x7b\x70\x7e\x6c\x80\x7f\x72\x80\x72\x7f\x7f\x86\x78\x82\x89\x44\x7a\x87\x86')
+
+
+ def testBytePatternModifiersAPI(self):
+ """Validate the API for pattern modifiers."""
+
+ mod = find_token_modifiers_for_name('plain')
+ self.assertIsNotNone(mod)
+
+ source = [ b'ABC', b'01234' ]
+ transformed = mod.transform(source)
+
+ self.assertEqual(len(source), len(transformed))
+ self.assertEqual(source[0], transformed[0])
+ self.assertEqual(source[1], transformed[1])
+
+
+ mod = find_token_modifiers_for_name('xor')
+ self.assertIsNotNone(mod)
+
+ source = [ b'ABC' ]
+ transformed = mod.transform(source, 0x20)
+
+ self.assertEqual(transformed[0], b'abc')
diff --git a/tests/analysis/scan/scanning_hex.py b/tests/analysis/scan/scanning_hex.py
new file mode 100644
index 0000000..4b0fda4
--- /dev/null
+++ b/tests/analysis/scan/scanning_hex.py
@@ -0,0 +1,716 @@
+
+from common import RostTestClass
+from pychrysalide.analysis.contents import MemoryContent
+
+
+class TestRostScanningBinary(RostTestClass):
+ """TestCases for the bytes section syntax (binary)."""
+
+ def testLonelyPatterns(self):
+ """Evaluate the most simple patterns."""
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 41 }
+
+ condition:
+ #a == 1 and @a[0] == 0
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 62 }
+
+ condition:
+ #a == 1 and @a[0] == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 66 }
+
+ condition:
+ #a == 1 and @a[0] == 5
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?1 }
+
+ condition:
+ #a == 1 and @a[0] == 0
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?2 }
+
+ condition:
+ #a == 1 and @a[0] == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?6 }
+
+ condition:
+ #a == 1 and @a[0] == 5
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def ___testLonelyPatternsNot(self):
+ """Evaluate the most simple patterns (not version)."""
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ~41 }
+
+ condition:
+ #a == 5 and @a[0] == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ~62 }
+
+ condition:
+ #a == 5 and @a[0] == 0
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ~66 }
+
+ condition:
+ #a == 5 and @a[4] == 4
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ~?1 }
+
+ condition:
+ #a == 5 and @a[0] == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ~?2 }
+
+ condition:
+ #a == 5 and @a[0] == 0
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'Abcdef')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ~?6 }
+
+ condition:
+ #a == 5 and @a[4] == 4
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def testSimpleHexPattern(self):
+ """Test a simple hex pattern."""
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 41 62 63 }
+
+ condition:
+ #a == 1 and @a[0] == 4
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 2d 41 62 63 }
+
+ condition:
+ #a == 1 and @a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def testSimpleMaskedHexPattern(self):
+ """Test a simple masked hex pattern."""
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?1 6? ?3 }
+
+ condition:
+ #a == 1 and @a[0] == 4
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def testHexPatternWithPlainAndMasked(self):
+ """Test hex patterns with plain and masked bytes."""
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 41 6? ?3 }
+
+ condition:
+ #a == 1 and @a[0] == 4
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 4? 62 ?3 }
+
+ condition:
+ #a == 1 and @a[0] == 4
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 4? ?2 63 }
+
+ condition:
+ #a == 1 and @a[0] == 4
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 4? ?2 ?3 }
+
+ condition:
+ #a == 1 and @a[0] == 4
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 2d 4? ?2 63 }
+
+ condition:
+ #a == 1 and @a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 2d 4? 62 ?3 2d }
+
+ condition:
+ #a == 1 and @a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 2? 41 6? 63 ?d }
+
+ condition:
+ #a == 1 and @a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def testHexPatternWithPlainAndHoles(self):
+ """Test hex patterns with plain bytes and holes."""
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 33 ?? 41 ?? 63 ?? 34 }
+
+ condition:
+ #a == 1 and @a[0] == 2
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?? 33 ?? 41 ?? 63 ?? 34 ?? }
+
+ condition:
+ #a == 1 and @a[0] == 1 and !a[0] == 9
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?? 33 [1-5] 63 ?? 34 ?? }
+
+ condition:
+ #a == 1 and @a[0] == 1 and !a[0] == 9
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { [3-4] 41 ?? 63 ?? 34 ?? }
+
+ condition:
+ #a == 1 and @a[0] == 1 and !a[0] == 9
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?? 33 ?? 41 ?? 63 [3-] }
+
+ condition:
+ #a == 1 and @a[0] == 1 and !a[0] == 9
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def testHexPatternWithMaskedAndHoles(self):
+ """Test hex patterns with masked bytes and holes."""
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?3 ?? 4? ?? 6? ?? ?4 }
+
+ condition:
+ #a == 1 and @a[0] == 2
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?? ?3 ?? 4? ?? 6? ?? ?4 ?? }
+
+ condition:
+ #a == 1 and @a[0] == 1 and !a[0] == 9
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?? ?3 [1-5] ?3 ?? ?4 ?? }
+
+ condition:
+ #a == 1 and @a[0] == 1 and !a[0] == 9
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { [3-4] ?1 ?? ?3 ?? ?4 ?? }
+
+ condition:
+ #a == 1 and @a[0] == 1 and !a[0] == 9
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ?? 3? ?? 4? ?? 6? [3-] }
+
+ condition:
+ #a == 1 and @a[0] == 1 and !a[0] == 9
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def testPipedPlainHexPatterns(self):
+ """Look for several patterns at once with piped definition."""
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 41 62 ( 63 | 64 | 65 ) }
+
+ condition:
+ #a == 1 and @a[0] == 4 and !a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ( 41 | f2 | f3 ) 62 63 }
+
+ condition:
+ #a == 1 and @a[0] == 4 and !a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 41 ( 61 | 62 | 63 ) 63 }
+
+ condition:
+ #a == 1 and @a[0] == 4 and !a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ( 41 62 63 | 42 62 63 | 43 62 63 ) }
+
+ condition:
+ #a == 1 and @a[0] == 4 and !a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def testPipedMaskedHexPatterns(self):
+ """Look for several patterns at once with piped and masked definition."""
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 4? 6? ( ?3 | ?4 | ?5 ) }
+
+ condition:
+ #a == 1 and @a[0] == 4 and !a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ( ?1 | ?2 | ?3 ) 6? 6? }
+
+ condition:
+ #a == 1 and @a[0] == 4 and !a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { 4? ( ?1 | ?2 | ?3 ) 6? }
+
+ condition:
+ #a == 1 and @a[0] == 4 and !a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def testDuplicatedResultsFiltering(self):
+ """Filter duplicated results."""
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = { ( 4? ?2 ?3 | 4? 6? 6? | ?3 6? ?3 ) }
+
+ condition:
+ #a == 1 and @a[0] == 4 and !a[0] == 3
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
diff --git a/tests/analysis/scan/scanning_str.py b/tests/analysis/scan/scanning_str.py
new file mode 100644
index 0000000..75427a7
--- /dev/null
+++ b/tests/analysis/scan/scanning_str.py
@@ -0,0 +1,194 @@
+
+from common import RostTestClass
+from pychrysalide.analysis.contents import MemoryContent
+
+
+class TestRostScanningStrings(RostTestClass):
+ """TestCases for the bytes section syntax (strings)."""
+
+ def testSimpleStringPattern(self):
+ """Test a simple string pattern."""
+
+ cnt = MemoryContent(b'123-Abc-456')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "Abc"
+
+ condition:
+ #a == 1 and @a[0] == 4
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def testEscapedStringPattern(self):
+ """Test escaped string patterns."""
+
+ cnt = MemoryContent(b'\a\b\t\n\v\f\r' + bytes([ 0x1b ]) + b'"\\\xff')
+
+ rule = r'''
+rule test {
+
+ bytes:
+ $a = "\a\b\t\n\v\f\r\e\"\\\xff"
+
+ condition:
+ #a == 1 and @a[0] == 0
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'\a\b\t\n--123--\v\f\r' + bytes([ 0x1b ]) + b'"\\\xff')
+
+ rule = r'''
+rule test {
+
+ bytes:
+ $a = "\a\b\t\n--123--\v\f\r\e\"\\\xff"
+
+ condition:
+ #a == 1 and @a[0] == 0
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def testStringModifiers(self):
+ """Check string modifiers output."""
+
+ cnt = MemoryContent(b'--414243--')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "ABC" hex
+
+ condition:
+ #a == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'--ABC--')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "ABC" plain
+
+ condition:
+ #a == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'--CBA--')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "ABC" rev
+
+ condition:
+ #a == 1
+
+}
+'''
+
+
+ def testStringPatternFullword(self):
+ """Test a fullword string pattern."""
+
+ cnt = MemoryContent(b'ABCDEF 123 ')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "DEF" fullword
+ $b = "123" fullword
+
+ condition:
+ #a == 0 and #b == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'DEF 123 ')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "DEF" fullword
+ $b = "123" fullword
+
+ condition:
+ #a == 1 and #b == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ cnt = MemoryContent(b'\tDEF 123 ')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "DEF" fullword
+ $b = "123" fullword
+
+ condition:
+ #a == 1 and #b == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
+
+
+ def testStringPatternCase(self):
+ """Test a string pattern with case care."""
+
+ cnt = MemoryContent(b'abc123-Abc123Def456GHI...z0z1z2z3z4z5z6z7z8z9')
+
+ rule = '''
+rule test {
+
+ bytes:
+ $a = "Abc" nocase
+ $b = "ABC123DEF456GHI" nocase
+ $z = "z0z1z2z3z4z5z6z7z8z9" nocase
+
+ condition:
+ #a == 2 and #b == 1 and #z == 1
+
+}
+'''
+
+ self.check_rule_success(rule, content=cnt)
diff --git a/tests/analysis/scan/sets.py b/tests/analysis/scan/sets.py
new file mode 100644
index 0000000..1d10fbf
--- /dev/null
+++ b/tests/analysis/scan/sets.py
@@ -0,0 +1,118 @@
+
+from common import RostTestClass
+
+
+class TestRostSets(RostTestClass):
+ """TestCases for sets support in ROST."""
+
+ def testSetsAsBooleans(self):
+ """Convert sets to boolean."""
+
+ rule = '''
+rule test {
+
+ condition:
+ ()
+
+}
+'''
+
+ self.check_rule_failure(rule)
+
+
+ rule = '''
+rule test {
+
+ condition:
+ (1, )
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ rule = '''
+rule test {
+
+ condition:
+ ("aaa", true, 123)
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ def testStringAsArray(self):
+ """Handle strings as arrays."""
+
+ rule = '''
+rule test {
+
+ condition:
+ count("aaa")
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ rule = '''
+rule test {
+
+ condition:
+ count("aaa") == 3
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ def testSetsIntersections(self):
+ """Perform sets intersections."""
+
+ rule = '''
+rule test {
+
+ condition:
+ ("aaa", "bbb") in ("AAA", "BBB", "aaa")
+
+}
+'''
+
+ self.check_rule_success(rule)
+
+
+ rule = '''
+rule test {
+
+ condition:
+ ("aaa", "bbb") in ("123", )
+
+}
+'''
+
+ self.check_rule_failure(rule)
+
+
+
+
+
+
+
+
+
+
+
+
+
+ # TODO :
+
+ # test : intersection(a, a) == a
+
+ # test : "123" in "0123456789"
+ # test : "123" in "012987"
+
diff --git a/tests/common/bitfield.py b/tests/common/bitfield.py
index e014111..75dfb6e 100644
--- a/tests/common/bitfield.py
+++ b/tests/common/bitfield.py
@@ -19,6 +19,23 @@ class TestBitFields(ChrysalideTestCase):
self.assertEqual(bf.popcount, bf2.popcount)
+ def testResizeBitField(self):
+ """Resize bitfields."""
+
+ bf_a = BitField(10, 0)
+
+ bf_b = BitField(6, 0)
+ bf_b.resize(10)
+
+ self.assertEqual(bf_a, bf_b)
+
+ bf_a = BitField(133, 1)
+
+ bf_b = BitField(64, 1)
+ bf_b.resize(133)
+
+ self.assertEqual(bf_a, bf_b)
+
def testBitFieldValues(self):
"""Evaluate bitfields basic values."""
@@ -70,6 +87,39 @@ class TestBitFields(ChrysalideTestCase):
self.assertEqual(bf_f.popcount, bf_a.popcount)
+ def testBitFieldLogicalOperationsAt(self):
+ """Perform logical operations on bitfields at a given position."""
+
+ bf_a = BitField(75, 0)
+
+ bf_b = BitField(4, 1)
+ bf_b.reset(2, 1)
+
+ bf_a.or_at(bf_b, 63)
+
+ self.assertFalse(bf_a.test(62))
+
+ self.assertTrue(bf_a.test(63))
+ self.assertTrue(bf_a.test(64))
+ self.assertFalse(bf_a.test(65))
+ self.assertTrue(bf_a.test(66))
+
+ self.assertFalse(bf_a.test(67))
+
+ bf_a = BitField(75, 0)
+
+ bf_a.or_at(bf_b, 60)
+
+ self.assertFalse(bf_a.test(59))
+
+ self.assertTrue(bf_a.test(60))
+ self.assertTrue(bf_a.test(61))
+ self.assertFalse(bf_a.test(62))
+ self.assertTrue(bf_a.test(63))
+
+ self.assertFalse(bf_a.test(64))
+
+
def testBitFieldSwitch(self):
"""Switch various bits in bitfields."""
@@ -118,6 +168,47 @@ class TestBitFields(ChrysalideTestCase):
self.assertTrue(bf.test_none(0, 54))
+ def testBitFieldWithBitField(self):
+ """Test bits in bitfields against other bitfields."""
+
+ bf = BitField(32, 0)
+ bf.set(8, 16)
+
+ mask = BitField(8, 1)
+
+ self.assertTrue(bf.test_ones_with(8, mask))
+ self.assertTrue(bf.test_ones_with(16, mask))
+ self.assertFalse(bf.test_ones_with(17, mask))
+ self.assertTrue(bf.test_zeros_with(24, mask))
+
+ bf = BitField(256, 0)
+ bf.set(60, 8)
+ bf.set(126, 10)
+
+ mask = BitField(4, 1)
+
+ self.assertTrue(bf.test_zeros_with(8, mask))
+ self.assertTrue(bf.test_zeros_with(122, mask))
+
+ self.assertFalse(bf.test_zeros_with(58, mask))
+ self.assertFalse(bf.test_ones_with(58, mask))
+ self.assertTrue(bf.test_ones_with(60, mask))
+ self.assertFalse(bf.test_zeros_with(63, mask))
+ self.assertTrue(bf.test_ones_with(64, mask))
+ self.assertFalse(bf.test_zeros_with(65, mask))
+ self.assertFalse(bf.test_ones_with(65, mask))
+
+ self.assertFalse(bf.test_zeros_with(125, mask))
+ self.assertFalse(bf.test_ones_with(125, mask))
+ self.assertTrue(bf.test_ones_with(128, mask))
+ self.assertFalse(bf.test_zeros_with(129, mask))
+ self.assertTrue(bf.test_ones_with(132, mask))
+ self.assertFalse(bf.test_zeros_with(133, mask))
+ self.assertFalse(bf.test_ones_with(133, mask))
+
+ self.assertTrue(bf.test_zeros_with(136, mask))
+
+
def testPopCountForBitField(self):
"""Count bits set to 1 in bitfield."""
@@ -138,3 +229,86 @@ class TestBitFields(ChrysalideTestCase):
bf_b = BitField(9, 1)
self.assertNotEqual(bf_a, bf_b)
+
+
+ def testSearchOfSetBit(self):
+ """Find the next set bit in a bit field."""
+
+ size = 128
+ bf = BitField(size, 0)
+
+ bits = [ 0, 1, 50, 63, 64, 65, 111 ]
+
+ for b in bits:
+ bf.set(b, 1)
+
+ prev = None
+ found = []
+
+ while prev is None or prev < size:
+
+ if prev is None:
+ f = bf.find_next_set()
+ else:
+ f = bf.find_next_set(prev)
+
+ if f < size:
+ found.append(f)
+
+ prev = f
+
+ self.assertEqual(found, bits)
+
+
+ def testRealCase00(self):
+ """Test bits in bitfields against other bitfields in a real case (#02)."""
+
+ bf = BitField(128, 0)
+
+ for b in [ 0, 50, 54, 58, 66, 70, 98 ]:
+ bf.set(b, 1)
+
+ mask = BitField(128, 0)
+
+ for b in [ 0, 51 ]:
+ mask.set(b, 1)
+
+ self.assertFalse(bf.test_zeros_with(0, mask))
+
+ self.assertTrue(bf.test_zeros_with(1, mask))
+
+ bf = BitField(32, 0)
+
+ mask = BitField(32, 0)
+
+ self.assertTrue(bf.test_zeros_with(0, mask))
+
+ for b in [ 0, 8, 9, 10 ]:
+ mask.set(b, 1)
+
+ self.assertTrue(bf.test_zeros_with(0, mask))
+
+ bf = BitField(32, 1)
+
+ self.assertFalse(bf.test_zeros_with(0, mask))
+
+ self.assertTrue(bf.test_ones_with(0, mask))
+
+
+ def testRealCase01(self):
+ """Test bits in bitfields against other bitfields in a real case (#01)."""
+
+ bf = BitField(128, 0)
+
+ mask = BitField(128, 0)
+
+ bits = [ 0, 50, 54, 58, 66, 70, 98 ]
+
+ for b in bits:
+ mask.set(b, 1)
+
+ bf.or_at(mask, 0)
+
+ self.assertEqual(mask.popcount, len(bits))
+
+ self.assertEqual(mask.popcount, bf.popcount)
diff --git a/tests/common/itoa.py b/tests/common/itoa.py
new file mode 100644
index 0000000..a004cbd
--- /dev/null
+++ b/tests/common/itoa.py
@@ -0,0 +1,28 @@
+
+from chrysacase import ChrysalideTestCase
+from pychrysalide.common import itoa
+
+
+class TestItoa(ChrysalideTestCase):
+ """TestCase for calls to the itoa() implementation."""
+
+ def testItoaCallss(self):
+ """Convert some integer values into strings."""
+
+ val = itoa(123, 10)
+ self.assertEqual(val, '123')
+
+ val = itoa(-123, 10)
+ self.assertEqual(val, '-123')
+
+ val = itoa(0, 10)
+ self.assertEqual(val, '0')
+
+ val = itoa(0, 2)
+ self.assertEqual(val, '0')
+
+ val = itoa(127, 2)
+ self.assertEqual(val, '1111111')
+
+ val = itoa(101, 2)
+ self.assertEqual(val, '1100101')
diff --git a/tests/plugins/encodings/all.py b/tests/plugins/encodings/all.py
new file mode 100644
index 0000000..a856ccb
--- /dev/null
+++ b/tests/plugins/encodings/all.py
@@ -0,0 +1,23 @@
+
+from chrysacase import ChrysalideTestCase
+from pychrysalide.plugins import encodings
+
+import base64
+
+
+class TestEncodingsModule(ChrysalideTestCase):
+ """TestCase for encodings plugin."""
+
+ def testBase64Encoding(self):
+ """Validate the base64 implementation."""
+
+ text = '0123456789abcdef'
+
+ for i in range(len(text) + 1):
+
+ src = text[:i].encode('ascii')
+
+ encoded = encodings.base64_encode(src)
+ ref = base64.b64encode(src)
+
+ self.assertEqual(encoded, ref)
diff --git a/tests/plugins/kaitai/__init__.py b/tests/plugins/kaitai/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/plugins/kaitai/__init__.py
diff --git a/tests/plugins/kaitai/language.py b/tests/plugins/kaitai/language.py
new file mode 100644
index 0000000..43b6185
--- /dev/null
+++ b/tests/plugins/kaitai/language.py
@@ -0,0 +1,2474 @@
+#!/usr/bin/python3-dbg
+# -*- coding: utf-8 -*-
+
+import locale
+
+from chrysacase import ChrysalideTestCase
+from pychrysalide.analysis.contents import MemoryContent
+from pychrysalide.plugins.kaitai.parsers import KaitaiStruct
+
+
+class TestKaitaiStruct(ChrysalideTestCase):
+ """TestCase for the KaitaiStruct parsing."""
+
+
+ @classmethod
+ def setUpClass(cls):
+
+ super(TestKaitaiStruct, cls).setUpClass()
+
+ cls.log('Setting locale suitable for floats...')
+
+ cls._old_locale = locale.getlocale(locale.LC_NUMERIC)
+
+ locale.setlocale(locale.LC_NUMERIC, 'C')
+
+
+ @classmethod
+ def tearDownClass(cls):
+
+ super(TestKaitaiStruct, cls).tearDownClass()
+
+ cls.log('Reverting locale...')
+
+ locale.setlocale(locale.LC_NUMERIC, cls._old_locale)
+
+
+
+ #################################
+ ### 4. Kaitai Struct language
+ #################################
+
+
+ def testKaitaiFixedLength(self):
+ """Load fixed-size structures."""
+
+ # Cf. 4.1. Fixed-size structures
+
+ definitions = '''
+meta:
+ id: mydesc
+ title: My Long Title
+ endian: be
+seq:
+ - id: field0
+ type: u4
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ self.assertEqual(kstruct.meta.id, 'mydesc')
+ self.assertEqual(kstruct.meta.title, 'My Long Title')
+
+ content = MemoryContent(b'\x01\x02\x03\x04')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field0.range.length, 4)
+ self.assertEqual(parsed.field0.value, 0x01020304)
+
+ definitions = '''
+meta:
+ endian: le
+seq:
+ - id: field0
+ type: u4
+ - id: field1
+ type: u4be
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ self.assertIsNone(kstruct.meta.id)
+ self.assertIsNone(kstruct.meta.title)
+
+ content = MemoryContent(b'\x01\x02\x03\x04\x01\x02\x03\x04')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field0.range.length, 4)
+ self.assertEqual(parsed.field0.value, 0x04030201)
+
+ self.assertEqual(parsed.field1.range.length, 4)
+ self.assertEqual(parsed.field1.value, 0x01020304)
+
+
+ definitions = '''
+seq:
+ - id: field0
+ type: u1
+ - id: field1
+ size: 2
+ - id: field2
+ size: field0 + 1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x03\x04\x05')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field0.range.length, 1)
+ self.assertEqual(parsed.field0.value, 0x01)
+
+ self.assertEqual(parsed.field1.range.length, 2)
+ self.assertEqual(parsed.field1.truncated_bytes, b'\x02\x03')
+
+ self.assertEqual(parsed.field2.range.length, 2)
+ self.assertEqual(parsed.field2.truncated_bytes, b'\x04\x05')
+
+
+ def testDocstrings(self):
+ """Handle Kaitai documentation."""
+
+ # Cf. 4.2. Docstrings
+
+ definitions = '''
+seq:
+ - id: rating
+ type: s4
+ doc: Rating, can be negative
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x02\x04')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.rating.creator.doc, 'Rating, can be negative')
+
+
+ def testKaitaiContents(self):
+ """Read various forms of fixed content."""
+
+ # Cf. 4.3. Checking for "magic" signatures
+
+ definitions = '''
+seq:
+ - id: field0
+ contents: [ 0, 0x10, '22', "50 ]
+'''
+
+ # ValueError: Unable to create Kaitai structure.
+ with self.assertRaisesRegex(ValueError, "Unable to create Kaitai structure"):
+ kstruct = KaitaiStruct(definitions)
+ self.assertIsNotNone(kstruct)
+
+
+ definitions = '''
+seq:
+ - id: field0
+ contents: [ 0x41, 66, 'CD' ]
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'ABCD')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field0.range.length, 4)
+
+ self.assertEqual(parsed.field0.value, b'ABCD')
+
+
+ definitions = '''
+seq:
+ - id: field0
+ contents: ABCD
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'ABCD')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field0.range.length, 4)
+
+
+ definitions = '''
+seq:
+ - id: field0
+ contents: "ABCD"
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'ABCD')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field0.range.length, 4)
+
+
+ definitions = '''
+seq:
+ - id: field0
+ contents:
+ - 0x41
+ - "B"
+ - CD
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'ABCD')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field0.range.length, 4)
+
+
+ def testVariableLengthStructures(self):
+ """Parse variable-length structures."""
+
+ # Cf. 4.4. Variable-length structures
+
+ definitions = '''
+seq:
+ - id: my_len
+ type: u1
+ - id: my_str
+ type: str
+ size: my_len
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x03ABC')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.my_len.value, 3)
+
+ self.assertEqual(parsed.my_str.value, b'ABC')
+
+
+ definitions = '''
+seq:
+ - id: my_len
+ type: u1
+ - id: my_str
+ type: str
+ size: my_len * 2
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x03ABCDEF')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.my_len.value, 3)
+
+ self.assertEqual(parsed.my_str.value, b'ABCDEF')
+
+
+ definitions = '''
+seq:
+ - id: field0
+ size-eos: true
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x02\x03')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(content, parsed.content)
+
+ self.assertEqual(parsed.range.addr.phys, 0)
+ self.assertEqual(parsed.range.length, len(content.data))
+
+
+ def testDelimitedStructures(self):
+ """Parse delimited structures."""
+
+ # Cf. 4.5. Delimited structures
+
+ definitions = '''
+seq:
+ - id: my_string
+ type: str
+ terminator: 0
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'ABC\x00DEF')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.my_string.value, b'ABC')
+
+
+ definitions = '''
+seq:
+ - id: my_string
+ type: strz
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'ABC\x00DEF')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.my_string.value, b'ABC')
+
+
+ definitions = '''
+seq:
+ - id: name
+ type: str
+ size: 8
+ terminator: 0
+ - id: guard
+ size: 1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'ABC\x00\x00\x00\x00\x00x\x00')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.name.value, b'ABC')
+
+ self.assertEqual(parsed.guard.value, b'x')
+
+
+ def __passed__testEnums(self):
+ """Parse delimited structures."""
+
+ # Cf. 4.6. Enums (named integer constants)
+
+ pass
+
+
+ def testSubTypes(self):
+ """Includes subtypes definitions."""
+
+ # Cf. 4.7. Substructures (subtypes)
+
+ definitions = '''
+seq:
+ - id: field0
+ type: custom_type
+ - id: field1
+ type: custom_type
+ - id: field2
+ type: custom_type
+types:
+ custom_type:
+ seq:
+ - id: len
+ type: u1
+ - id: value
+ size: len
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\xaa\x02\xbb\xbb\x03\xcc\xcc\xcc')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field0.len.value, 1)
+ self.assertEqual(parsed.field0.value.truncated_bytes, b'\xaa')
+
+ self.assertEqual(parsed.field1.len.value, 2)
+ self.assertEqual(parsed.field1.value.truncated_bytes, b'\xbb\xbb')
+
+ self.assertEqual(parsed.field2.len.value, 3)
+ self.assertEqual(parsed.field2.value.truncated_bytes, b'\xcc\xcc\xcc')
+
+
+ def testOtherAttributesAccess(self):
+ """Access attributes in other types."""
+
+ # Cf. 4.8. Accessing attributes in other types
+
+ definitions = '''
+seq:
+ - id: header
+ type: main_header
+ - id: body
+ size: header.body_len
+types:
+ main_header:
+ seq:
+ - id: magic
+ contents: FMT
+ - id: body_len
+ type: u1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'FMT\x04\xaa\xbb\xcc\xdd')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.header.magic.raw_bytes, b'FMT')
+ self.assertEqual(parsed.header.magic.range.length, 3)
+
+ self.assertEqual(parsed.header.body_len.value, 4)
+
+ self.assertEqual(parsed.body.raw_bytes, b'\xaa\xbb\xcc\xdd')
+ self.assertEqual(parsed.body.range.length, 4)
+
+
+ def testConditionals(self):
+ """Read Kaitai values according to previous loaded values."""
+
+ # Cf. 4.9. Conditionals
+
+ definitions = '''
+seq:
+ - id: field1
+ type: u1
+ - id: field2
+ type: u1
+ - id: field3
+ type: u1
+ if: field1 + field2 > 10
+ - id: field4
+ type: u1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x03\x04')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field1.value, 0x01)
+ self.assertEqual(parsed.field2.value, 0x02)
+ self.assertFalse(hasattr(parsed, 'field3'))
+ self.assertEqual(parsed.field4.value, 0x03)
+
+
+ definitions = '''
+seq:
+ - id: field1
+ type: u1
+ - id: field2
+ type: u1
+ - id: field3
+ type: u1
+ if: field1 + field2 > 1
+ - id: field4
+ type: u1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x03\x04')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field1.value, 0x01)
+ self.assertEqual(parsed.field2.value, 0x02)
+ self.assertTrue(hasattr(parsed, 'field3'))
+ self.assertEqual(parsed.field4.value, 0x04)
+
+
+ definitions = '''
+seq:
+ - id: field1
+ type: u1
+ - id: field2
+ type: u1
+ - id: field3
+ type: u1
+ if: field1 + field2 == threshold::three
+ - id: field4
+ type: u1
+enums:
+ threshold:
+ 1: one
+ 2: two
+ 3: three
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x03\x04')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field1.value, 0x01)
+ self.assertEqual(parsed.field2.value, 0x02)
+ self.assertTrue(hasattr(parsed, 'field3'))
+ self.assertEqual(parsed.field4.value, 0x04)
+
+
+ def testRepeatedReadUntilEOS(self):
+ """Read items until the end of the stream."""
+
+ # Cf. 4.10.1. Repeat until end of stream
+
+ definitions = '''
+seq:
+ - id: field0
+ type: u2be
+ repeat: eos
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x00\x02\x00\x03\x00\x04\x00')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(len(parsed.field0), len(content.data) / 2)
+
+ for i in range(4):
+ self.assertEqual(parsed.field0[i].value, (i + 1) << 8)
+
+
+ def testRepeatedReadAccordingToCounter(self):
+ """Repeat read of items for a nomber of times."""
+
+ # Cf. 4.10.2. Repeat for a number of times
+
+ definitions = '''
+seq:
+ - id: field0
+ type: u1
+ - id: field1
+ type: u1
+ repeat: expr
+ repeat-expr: 1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x01')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field0.value, 0x01)
+
+ self.assertEqual(len(parsed.field1), 1)
+
+ for i in range(1):
+ self.assertEqual(parsed.field1[i].value, i + 1)
+
+ definitions = '''
+seq:
+ - id: field0
+ type: u1
+ - id: field1
+ type: u1
+ - id: field2
+ type: u2
+ repeat: expr
+ repeat-expr: field0 + field1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x01\x00\x02\x00\x03\x00')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field0.value, 0x01)
+ self.assertEqual(parsed.field1.value, 0x02)
+
+ self.assertEqual(len(parsed.field2), 3)
+
+ for i in range(3):
+ self.assertEqual(parsed.field2[i].value, i + 1)
+
+
+ def testRepeatUntilConditionIsMet(self):
+ """Repeat until condition is met."""
+
+ # Cf. 4.10.3. Repeat until condition is met
+
+ definitions = '''
+seq:
+ - id: numbers
+ type: u1
+ repeat: until
+ repeat-until: _ == 0xff
+ - id: extra
+ type: u1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\xff\xcc')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(len(parsed.numbers), 3)
+
+ for i in range(2):
+ self.assertEqual(parsed.numbers[i].value, i + 1)
+
+ self.assertEqual(parsed.numbers[2].value, 0xff)
+
+ self.assertEqual(parsed.extra.value, 0xcc)
+
+ definitions = '''
+seq:
+ - id: records
+ type: buffer_with_len
+ repeat: until
+ repeat-until: _.len == 0
+types:
+ buffer_with_len:
+ seq:
+ - id: len
+ type: u1
+ - id: value
+ size: len
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x02\xaa\xaa\x01\xbb\x00')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.records[0].len.value, 2)
+ self.assertEqual(parsed.records[0].value.raw_bytes, b'\xaa\xaa')
+
+ self.assertEqual(parsed.records[1].len.value, 1)
+ self.assertEqual(parsed.records[1].value.raw_bytes, b'\xbb')
+
+ self.assertEqual(parsed.records[2].len.value, 0)
+ self.assertEqual(parsed.records[2].value.raw_bytes, b'')
+
+
+ def testParseTLVImplementation(self):
+ """Parse a typical TLV implementation."""
+
+ # Cf. 4.11. Typical TLV implementation (switching types on an expression)
+
+ definitions = '''
+seq:
+ - id: record
+ type: rec_def
+ repeat: eos
+types:
+ rec_def:
+ seq:
+ - id: rec_type
+ type: u1
+ - id: len
+ type: u1
+ - id: body
+ size: len
+ type:
+ switch-on: rec_type
+ cases:
+ 1: rec_type_1
+ 2: rec_type_2
+ rec_type_1:
+ seq:
+ - id: field1
+ type: u1
+ rec_type_2:
+ seq:
+ - id: field2
+ type: u2
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x01\xaa\x02\x02\xcc\xbb')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(len(parsed.record), 2)
+
+ self.assertEqual(parsed.record[0].rec_type.value, 1)
+ self.assertEqual(parsed.record[0].len.value, 1)
+ self.assertEqual(parsed.record[0].body.field1.value, 0xaa)
+
+ self.assertEqual(parsed.record[1].rec_type.value, 2)
+ self.assertEqual(parsed.record[1].len.value, 2)
+ self.assertEqual(parsed.record[1].body.field2.value, 0xbbcc)
+
+
+ def testInstanceWithDataBeyondTheSequence(self):
+ """Build instances with data beyond the sequence."""
+
+ # Cf. 4.12. Instances: data beyond the sequence
+
+ definitions = '''
+instances:
+ some_integer:
+ pos: 0x4
+ type: u1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x03\x04\x05\x06\x07\x08')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.some_integer.value, 5)
+
+
+ definitions = '''
+seq:
+ - id: file_offset
+ type: u1
+ - id: file_size
+ type: u1
+instances:
+ body:
+ pos: file_offset
+ size: file_size
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x04\x02\x90\x90ABCD')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.file_offset.value, 4)
+
+ self.assertEqual(parsed.file_size.value, 2)
+
+ self.assertEqual(parsed.body.value, b'AB')
+
+
+ def testValueInstances(self):
+ """Build value instances"""
+
+ # Cf. 4.13. Value instances
+
+ definitions = '''
+seq:
+ - id: length
+ type: u1
+ - id: extra
+ type: u1
+instances:
+ length_extended:
+ value: length * 3 + extra
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x03\x04')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.length.value, 1)
+
+ self.assertEqual(parsed.extra.value, 2)
+
+ self.assertEqual(parsed.length_extended.value, 5)
+
+
+ def testBitSizedIntegers(self):
+ """Read bit-sized integers."""
+
+ # Cf. 4.14. Bit-sized integers
+
+ definitions = '''
+seq:
+ - id: packed_1
+ type: u1
+instances:
+ version:
+ value: (packed_1 & 0b11110000) >> 4
+ len_header:
+ value: packed_1 & 0b00001111
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x9a')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.packed_1.value, 0x9a)
+
+ self.assertEqual(parsed.version.value, 0x9)
+
+ self.assertEqual(parsed.len_header.value, 0xa)
+
+
+ def __passed__testBitSizedIntegersBigEndian(self):
+ """Read bit-sized integers."""
+
+ # Cf. 4.14.1. Big-endian order
+
+ pass
+
+
+ def __passed__testBitSizedIntegersLittleEndian(self):
+ """Read bit-sized integers."""
+
+ # Cf. 4.14.2. Little-endian order
+
+ pass
+
+
+ def __passed__testBitSizedIntegersSpecifiedEndianness(self):
+ """Read bit-sized integers with specified bit endianness."""
+
+ # Cf. 4.14.3. Specifying bit endianness
+
+ pass
+
+
+
+ #################################
+ ### 5. Streams and substreams
+ #################################
+
+
+ def testTotalSizeLimit(self):
+ """Limit total size of structure."""
+
+ # Cf. 5.1. Limiting total size of structure
+
+ definitions = '''
+seq:
+ - id: body_len
+ type: u1
+ - id: random
+ size: 2
+ - id: comment
+ size: body_len - 2
+ - id: extra
+ type: u1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x05\x01\x02---\xbb')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.body_len.value, 0x05)
+
+ self.assertEqual(parsed.random.raw_bytes, b'\x01\x02')
+
+ self.assertEqual(parsed.comment.raw_bytes, b'---')
+
+ self.assertEqual(parsed.extra.raw_bytes, b'\xbb')
+
+
+ definitions = '''
+seq:
+ - id: body_len
+ type: u1
+ - id: body
+ type: record_body
+ size: body_len
+ - id: extra
+ type: u1
+types:
+ record_body:
+ seq:
+ - id: random
+ size: 2
+ - id: comment
+ size-eos: true
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x05\x01\x02---\xbb')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.body_len.value, 0x05)
+
+ self.assertEqual(parsed.body.random.raw_bytes, b'\x01\x02')
+
+ self.assertEqual(parsed.body.comment.raw_bytes, b'---')
+
+ self.assertEqual(parsed.extra.raw_bytes, b'\xbb')
+
+
+ def testRepeatSizeLimit(self):
+ """Repeating until total size reaches limit."""
+
+ # Cf. 5.2. Repeating until total size reaches limit
+
+ content = MemoryContent(b'\x03\x00\x01\x02\xbb')
+
+ definitions = '''
+seq:
+ - id: total_len
+ type: u1
+ - id: files
+ type: file_entries
+ size: total_len
+ - id: extra
+ type: u1
+types:
+ file_entries:
+ seq:
+ - id: entries
+ type: entry
+ repeat: eos
+ entry:
+ seq:
+ - id: index
+ type: u1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.total_len.value, 3)
+
+ self.assertEqual(len(parsed.files.entries), 3)
+
+ for i in range(3):
+ self.assertEqual(parsed.files.entries[i].index.value, i)
+
+ self.assertEqual(parsed.extra.value, 0xbb)
+
+
+ def testRelativePositioning(self):
+ """Parse with relative positioning."""
+
+ # Cf. 5.3. Relative positioning
+
+ content = MemoryContent(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\0xe\x0f')
+
+ definitions = '''
+seq:
+ - id: some_header
+ size: 4
+ - id: body
+ type: block
+ size: 12
+types:
+ block:
+ seq:
+ - id: foo
+ type: u1
+ instances:
+ some_bytes_in_the_middle:
+ pos: 4
+ size: 4
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.some_header.value, b'\x00\x01\x02\x03')
+ self.assertEqual(parsed.body.foo.value, 0x04)
+
+ self.assertEqual(parsed.body.some_bytes_in_the_middle.value, b'\x08\x09\x0a\x0b')
+
+
+ def testAbsolutePositioning(self):
+ """Read from absolute position."""
+
+ # Cf. 5.4. Absolute positioning
+
+ content = MemoryContent(b'\x06\x03\x00\x00\x00\x00\x01\x02\x03\xbb')
+
+ definitions = '''
+seq:
+ - id: items
+ size: 10
+ type: entry
+ repeat: eos
+types:
+ entry:
+ seq:
+ - id: ofs_body
+ type: u1
+ - id: len_body
+ type: u1
+ instances:
+ body:
+ io: _root._io
+ pos: ofs_body
+ size: len_body
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.items[0].ofs_body.value, 6)
+ self.assertEqual(parsed.items[0].len_body.value, 3)
+
+ self.assertEqual(parsed.items[0].body.value, b'\x01\x02\x03')
+
+
+ def testSubstreamChoice(self):
+ """Choose a substream."""
+
+ # Cf. 5.5. Choosing a substream
+
+ content = MemoryContent(b'\xaa\xaa\xaa\xaa\x01\x02\x03\x04\x05\x06\x07\x08\x02\x03')
+
+ definitions = '''
+seq:
+ - id: global_header
+ size: 4
+ - id: block_one
+ type: big_container
+ size: 8
+ - id: block_two
+ type: smaller_container
+ size: 2
+types:
+ big_container:
+ seq:
+ - id: some_header
+ size: 8
+ smaller_container:
+ seq:
+ - id: ofs_in_big
+ type: u1
+ - id: len_in_big
+ type: u1
+ instances:
+ something_in_big:
+ io: _root.block_one._io
+ pos: ofs_in_big
+ size: len_in_big
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.block_two.ofs_in_big.value, 2)
+ self.assertEqual(parsed.block_two.len_in_big.value, 3)
+
+ self.assertEqual(parsed.block_two.something_in_big.value, b'\x03\x04\x05')
+
+
+ def __passed__testContentPreProcessing(self):
+ """Process content before parsing."""
+
+ # Cf. 5.6. Processing: dealing with compressed, obfuscated and encrypted data
+
+ pass
+
+
+
+ ##############################
+ ### 6. Expression language
+ ##############################
+
+
+ def testBasicDataTypes(self):
+ """Handle basic data types."""
+
+ # Cf. 6.1. Basic data types
+
+ definitions = '''
+seq:
+ - id: field1
+ type: u1
+ - id: field2
+ type: u2
+ - id: field4
+ type: u4
+ - id: field8
+ type: u8
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x02\x04\x04\x04\x04\x08\x08\x08\x08\x08\x08\x08\x08')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field1.range.length, 1)
+ self.assertEqual(parsed.field2.range.length, 2)
+ self.assertEqual(parsed.field4.range.length, 4)
+ self.assertEqual(parsed.field8.range.length, 8)
+
+ definitions = '''
+seq:
+ - id: field1
+ type: u1
+ - id: field4
+ type: u4le
+ - id: field4bis
+ type: u4be
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x03\x04\x05\x02\x03\x04\x05')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field1.value, 0x01)
+ self.assertEqual(parsed.field4.value, 0x05040302)
+ self.assertEqual(parsed.field4bis.value, 0x02030405)
+
+
+ definitions = '''
+instances:
+ number1:
+ value: 0xdead_cafe
+ number2:
+ value: 0xdead_cafe_dead_cafe
+ number3:
+ value: 12_345_678
+ number4:
+ value: 0b10100011
+ number5:
+ value: 0b1010_0011_1010_0011
+'''
+
+ content = MemoryContent(b'')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.number1.value, 0xdeadcafe)
+
+ self.assertEqual(parsed.number2.value, 0xdeadcafedeadcafe)
+
+ self.assertEqual(parsed.number3.value, 12345678)
+
+ self.assertEqual(parsed.number4.value, 0b10100011)
+
+ self.assertEqual(parsed.number5.value, 0b1010001110100011)
+
+
+ definitions = '''
+seq:
+ - id: op0
+ type: u1
+instances:
+ result:
+ value: 0xdeadcafe + op0
+ result2:
+ value: 0XdeadCAFE + op0
+'''
+
+ content = MemoryContent(b'\x00')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.result.value, 0xdeadcafe)
+
+ self.assertEqual(parsed.result2.value, 0xdeadcafe)
+
+
+ definitions = '''
+instances:
+ bytes1:
+ value: []
+ bytes2:
+ value: [ ]
+ bytes3:
+ value: [ 0x90 ]
+ bytes4:
+ value: [foo, 0, A, 0xa, 42]
+'''
+
+ content = MemoryContent(b'')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.bytes1.value, b'')
+
+ self.assertEqual(parsed.bytes2.value, b'')
+
+ self.assertEqual(parsed.bytes3.value, b'\x90')
+
+ self.assertEqual(parsed.bytes4.value, b'\x66\x6f\x6f\x00\x41\x0a\x2a')
+
+
+ definitions = '''
+instances:
+ escaped:
+ value: '[ "\\a\\b\\t\\n\\v\\f", "\\0", 0, " \\r\\e\\\"\\123" ]'
+'''
+
+ content = MemoryContent(b'')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.escaped.value, b'\x07\x08\x09\x0a\x0b\x0c\x00\x00 \x0d\x1b\x22\x53')
+
+
+ definitions_0 = r'''
+instances:
+ escaped:
+ value: "[ \"\\a\\b\\t\\n\\v\\f\", \"\\0\", 0, \"\\r\\e\\\"'\\123\" ]"
+'''
+
+ definitions_1 = r'''
+instances:
+ escaped:
+ value: [ "\\a\\b\\t\\n\\v\\f", "\\0", 0, "\\r\\e\\\"'\\123" ]
+'''
+
+ definitions_2 = '''
+instances:
+ escaped:
+ value: [ "\\\\a\\\\b\\\\t\\\\n\\\\v\\\\f", "\\\\0", 0, "\\\\r\\\\e\\\\\\"'\\\\123" ]
+'''
+
+ for d in [ definitions_0, definitions_1, definitions_2 ]:
+
+ content = MemoryContent(b'')
+
+ kstruct = KaitaiStruct(d)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.escaped.value, b'\x07\x08\x09\x0a\x0b\x0c\x00\x00\x0d\x1b\x22\x27\x53')
+
+
+ def __passed__testUserDefinedTypes(self):
+ """Create user-defined types."""
+
+ # Cf. 6.2.1. User-defined types
+
+ pass
+
+
+ def testArrays(self):
+ """Create various arrays."""
+
+ # Cf. 6.2.2. Arrays
+
+ definitions = '''
+instances:
+ result_0:
+ value: "[]"
+ result_1:
+ value: "[CAFE, 0, BABE]"
+ result_2:
+ value: "[CAFE, 0, BABE] == 'CAFE' + [ 0x00 ] + 'BABE'"
+ result_3:
+ value: "[CAFE, 0, BABE] == [ 0x43, 0x41, 0x46, 0x45, 0x00, 0x42, 0x41, 0x42, 0x45 ]"
+ result_4:
+ value: "[foo, 0, A, 0xa, 42] == [ 0x66, 0x6f, 0x6f, 0x00, 0x41, 0x0a, 0x2a ]"
+ result_5:
+ value: "[1, 0x55, '▒,3', 3] == [ 0x01, 0x55, 0xe2, 0x96, 0x92, 0x2c, 0x33, 0x03 ]"
+'''
+
+ content = MemoryContent(b'')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.result_0.value, b'')
+
+ self.assertEqual(parsed.result_1.value, b'CAFE\x00BABE')
+
+
+ definitions = '''
+seq:
+ - id: indexes
+ type: u1
+ repeat: eos
+instances:
+ table:
+ value: "[ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7, 8, 9 ] ]"
+ ref:
+ value: indexes
+ result_0:
+ value: table
+ result_1:
+ value: ref
+ result_2:
+ value: table[indexes[0]][indexes[1] - 1]
+ result_3:
+ value: table[indexes[0]][ref[1]]
+'''
+
+ content = MemoryContent(b'\x01\x02\x03\x04')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.result_0.value.value, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09')
+
+ self.assertEqual(type(parsed.result_1).__name__, 'RecordDelayed') # result_1
+ self.assertEqual(type(parsed.result_1.value).__name__, 'RecordDelayed') # result_1.ref
+ self.assertEqual(type(parsed.result_1.value.value).__name__, 'RecordList') # result_1.ref.table
+
+ self.assertEqual(parsed.result_1.value.value[3].value, 0x04)
+
+ self.assertEqual(parsed.result_2.value, 5)
+
+ self.assertEqual(parsed.result_3.value, 6)
+
+
+ def testArithmeticOperators(self):
+ """Compute with arithmetic operators."""
+
+ # Cf. 6.3.1. Arithmetic operators
+
+ definitions = '''
+seq:
+ - id: op0
+ type: u1
+ - id: op1
+ type: u1
+instances:
+ result_0:
+ value: op0 + op1 * 3
+ result_1:
+ value: (2 + op0) * op1
+ result_2:
+ value: 7 * 2.0
+ result_3:
+ value: 7 / 2.0
+ result_4:
+ value: -5 % 3
+ result_5:
+ value: 4 % 3
+ result_6:
+ value: 6 - 3 - -4.0
+'''
+
+ content = MemoryContent(b'\x02\x03\x04\x05')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.result_0.value, 11)
+
+ self.assertEqual(parsed.result_1.value, 12)
+
+ self.assertEqual(parsed.result_2.value, 14.0)
+
+ self.assertEqual(parsed.result_3.value, 3.5)
+
+ self.assertEqual(parsed.result_4.value, 1)
+
+ self.assertEqual(parsed.result_5.value, 1)
+
+ self.assertEqual(parsed.result_6.value, 7.0)
+
+
+ definitions = '''
+seq:
+ - id: base
+ size: 3
+instances:
+ result_0:
+ value: "'xXx ' + base + ' -- %< --'"
+'''
+
+ content = MemoryContent(b'ABC')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.result_0.value, b'xXx ABC -- %< --')
+
+
+ definitions = '''
+seq:
+ - id: nums
+ type: u1
+ repeat: eos
+instances:
+ computed:
+ value: nums[0] + nums[3]
+ computed2:
+ value: nums[0] * nums.size + nums[3]
+ computed3:
+ value: nums[0] * nums[nums.size - 1]
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x03\x04')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.computed.value, 5)
+
+ self.assertEqual(parsed.computed2.value, 8)
+
+ self.assertEqual(parsed.computed3.value, 4)
+
+
+ def testRelationalOperators(self):
+ """Compute with relational operators."""
+
+ # Cf. 6.3.2. Relational operators
+
+ definitions = '''
+seq:
+ - id: op0
+ type: u1
+ - id: op1
+ type: u1
+ - id: op2
+ size: 3
+instances:
+ result0:
+ value: op0 == op1
+ result1:
+ value: op0 != op1
+ result2:
+ value: op2 == 'ABC'
+ result3:
+ value: op2 < 'ABCD'
+ result4:
+ value: (op0 + 1) >= op1
+ result5:
+ value: "(op0 + 1) == 'ABC'.length"
+'''
+
+ content = MemoryContent(b'\x02\x03ABCD')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertFalse(parsed.result0.value)
+
+ self.assertTrue(parsed.result1.value)
+
+ self.assertTrue(parsed.result2.value)
+
+ self.assertTrue(parsed.result3.value)
+
+ self.assertTrue(parsed.result4.value)
+
+ self.assertTrue(parsed.result5.value)
+
+
+ def testBitwiseOperators(self):
+ """Compute with bitwise operators."""
+
+ # Cf. 6.3.3. Bitwise operators
+
+ definitions = '''
+seq:
+ - id: op0
+ type: u1
+ - id: op1
+ type: u1
+ - id: op2
+ type: u1
+instances:
+ result_0:
+ value: op0 & op1
+ result_1:
+ value: op1 << op0 >> 1
+ result_2:
+ value: (op2 | 0x80) >> 1
+'''
+
+ content = MemoryContent(b'\x02\x07\x01')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.result_0.value, 0x2)
+
+ self.assertEqual(parsed.result_1.value, 14)
+
+ self.assertEqual(parsed.result_2.value, 0x40)
+
+
+ def testLogicalOperators(self):
+ """Compute with logical boolean operators."""
+
+ # Cf. 6.3.4. Logical (boolean) operators
+
+ definitions = '''
+seq:
+ - id: op0
+ type: u1
+ - id: op1
+ type: u1
+instances:
+ result_0:
+ value: (op0 > 0) and not false
+ result_1:
+ value: op0 == 1 or op1 == 2
+'''
+
+ content = MemoryContent(b'\x01\x02')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertTrue(parsed.result_0.value)
+
+ self.assertTrue(parsed.result_1.value)
+
+
+ def testTernaryOperator(self):
+ """Offer challenges to the ternary operator."""
+
+ # Cf. 6.3.5. Ternary (if-then-else) operator
+
+ definitions = '''
+seq:
+ - id: op0
+ type: u1
+ - id: op1
+ type: u1
+ - id: op2
+ type: u1
+instances:
+ result_0:
+ value: 'op0 == 0x80 ? op1 + 1 : op1 * op2'
+ result_1:
+ value: 'op0 < 0x80 ? op1 + 1 : op1 * op2'
+ result_1:
+ value: 'op0 < 0x80 ? op1 + 1 : op1 * op2'
+ result_2:
+ value: '(op0 + 0x10) >= 0x90 ? true : 123'
+ result_3:
+ value: '(op0 + 0x10) >= 0x90 and false ? true : 123'
+'''
+
+ content = MemoryContent(b'\x80\x03\x04')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.result_0.value, 4)
+
+ self.assertEqual(parsed.result_1.value, 12)
+
+ self.assertTrue(parsed.result_2.value)
+
+ self.assertEqual(parsed.result_3.value, 123)
+
+
+ def testIntegersMethods(self):
+ """Run methods from integers."""
+
+ # Cf. 6.4.1. Integers
+
+ definitions = '''
+instances:
+ bytes1:
+ value: 123.to_s == "123" and -123.to_s == '-123'
+'''
+
+ content = MemoryContent(b'')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertTrue(parsed.bytes1.value)
+
+
+ def testFloatsMethods(self):
+ """Run methods from floating numbers."""
+
+ # Cf. 6.4.2. Floating point numbers
+
+ definitions = '''
+instances:
+ result_0:
+ value: 2.32.to_i == 2 and -7.0.to_i == -7
+'''
+
+ content = MemoryContent(b'')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertTrue(parsed.result_0.value)
+
+
+ def XXXtestByteArraysAndStringsMethods(self):
+ """Run methods from byte arrays and strings."""
+
+ # Cf. 6.4.3. Byte arrays
+ # 6.4.4. Strings
+
+ definitions = '''
+instances:
+ result_1:
+ value: '[].length == 0'
+ result_2:
+ value: "'edcba'.reverse == 'XXabcdeXX'.substring(2, 6)"
+ result_3:
+ value: "'123'.to_i == 123 and '-123'.to_i == -123"
+ result_4:
+ value: "[ 0x50, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x74, 0x61, 0x63, 0x69, 0xc3, 0xb3, 0x6e, 0x2e, 0x73, 0x78, 0x69 ].to_s('utf-8')"
+ result_5:
+ value: "'1010'.to_i(2) == 10 and 'cc'.to_i(16) == 204"
+'''
+
+ content = MemoryContent(b'')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertTrue(parsed.result_1.value)
+
+ self.assertTrue(parsed.result_2.value)
+
+ self.assertTrue(parsed.result_3.value)
+
+ # Cf. https://docs.gtk.org/glib/character-set.html
+ # https://developer-old.gnome.org/glib/stable/glib-Character-Set-Conversion.html#g-convert
+ self.assertEqual(parsed.result_4.value.decode('utf-8'), 'Presentación.sxi')
+
+ self.assertTrue(parsed.result_5.value)
+
+
+ def __passed__testEnumsMethods(self):
+ """Run methods from booleans."""
+
+ # Cf. 6.4.5. Enums
+
+ pass
+
+
+ def testBooleansMethods(self):
+ """Run methods from booleans."""
+
+ # Cf. 6.4.6. Booleans
+
+ definitions = '''
+instances:
+ result_0:
+ value: true.to_i == 1
+ result_1:
+ value: (1 == 2).to_i == 0
+'''
+
+ content = MemoryContent(b'')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertTrue(parsed.result_0.value)
+
+ self.assertTrue(parsed.result_1.value)
+
+
+ def testUserDefinedTypes(self):
+ """Retrieve user-defined types."""
+
+ # Cf. 6.4.7. User-defined types
+
+ definitions = '''
+instances:
+ result_0:
+ value: _root
+'''
+
+ content = MemoryContent(b'')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.result_0.value, parsed)
+
+
+ def __passed__testArraysMethods(self):
+ """Run methods from arrays."""
+
+ # Cf. 6.4.8. Array types
+
+ pass
+
+
+ def __passed__testStreamsMethods(self):
+ """Run methods from streams."""
+
+ # Cf. 6.4.9. Streams
+
+ pass
+
+
+
+ ##############################
+ ### 7. Advanced techniques
+ ##############################
+
+
+ def testSwitchOverStrings(self):
+ """Switch over strings."""
+
+ # Cf. 7.1.1. Switching over strings
+
+ definitions = '''
+seq:
+ - id: rec_type
+ type: strz
+ - id: body
+ type:
+ switch-on: rec_type
+ cases:
+ '"KETCHUP"': rec_type_1
+ '"MUSTARD"': rec_type_2
+ '"GUACAMOLE"': rec_type_3
+types:
+ rec_type_1:
+ instances:
+ direct:
+ value: 1
+ rec_type_2:
+ instances:
+ direct:
+ value: 2
+ rec_type_3:
+ instances:
+ direct:
+ value: 3
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'GUACAMOLE\x00')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.rec_type.value, b'GUACAMOLE')
+
+ self.assertEqual(parsed.body.direct.value, 3)
+
+
+ def testSwitchOverEnums(self):
+ """Switch over enumerations."""
+
+ # Cf. 7.1.2. Switching over enums
+
+ definitions = '''
+seq:
+ - id: rec_type
+ type: u1
+ enum: media
+ - id: body
+ type:
+ switch-on: rec_type
+ cases:
+ 'media::cdrom': rec_type_1
+ 'media::dvdrom': rec_type_2
+ 'media::cassette': rec_type_3
+types:
+ rec_type_1:
+ instances:
+ direct:
+ value: 1
+ rec_type_2:
+ instances:
+ direct:
+ value: 2
+ rec_type_3:
+ instances:
+ direct:
+ value: 3
+enums:
+ media:
+ 1: cdrom
+ 2: dvdrom
+ 3: cassette
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.rec_type.value, 1)
+
+ self.assertEqual(parsed.body.direct.value, 1)
+
+
+ def testFourCC(self):
+ """Recognize four character code."""
+
+ # Cf. 7.1.3. FourCC
+
+ definitions = '''
+seq:
+ - id: fourcc
+ type: u4le
+ enum: pixel_formats
+ - id: len
+ type: u1
+ - id: body
+ size: len
+ type:
+ switch-on: fourcc
+ cases:
+ 'pixel_formats::rgb2': block_rgb2
+ 'pixel_formats::rle4': block_rle4
+ 'pixel_formats::rle8': block_rle8
+types:
+ block_rgb2:
+ instances:
+ direct:
+ value: 2
+ block_rle4:
+ instances:
+ direct:
+ value: 4
+ block_rle8:
+ instances:
+ direct:
+ value: 8
+enums:
+ pixel_formats:
+ 0x32424752: rgb2
+ 0x34454C52: rle4
+ 0x38454C52: rle8
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'RLE4\x05ABCDE')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.fourcc.value, 0x34454C52)
+
+ self.assertEqual(parsed.len.value, 0x5)
+
+ self.assertEqual(parsed.body.direct.value, 4)
+
+
+ def testNothing(self):
+ """Do nothing."""
+
+ # Cf. 7.2. Do nothing
+
+ definitions = '''
+seq:
+ - id: field_0
+ size: 1
+ - id: field_1
+ type: dummy_1
+ - id: field_2
+ type: dummy_2
+ - id: field_3
+ type: dummy_3
+ - id: field_4
+ type: dummy_4
+ - id: field_5
+ size: 1
+types:
+ # One can use empty JSON object syntax to avoid specifying any of
+ # `seq`, `instances`, etc, sections.
+ dummy_1: {}
+ # One can use explicit doc to note that there's nothing there.
+ dummy_2:
+ doc: This type is intentionally left blank.
+ # One can use empty `seq` or `instances` or `types` section, any
+ # other empty sections, or any combination of thereof.
+ dummy_3:
+ seq: []
+ instances: {}
+ types: {}
+ # One can use a very explicit notion of the fact that we want to parse 0 bytes.
+ dummy_4:
+ seq:
+ - id: no_value
+ size: 0
+'''
+
+ content = MemoryContent(b'az')
+
+ kstruct = KaitaiStruct(definitions)
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.field_0.value, b'a')
+
+ self.assertEqual(type(parsed.field_1).__name__, 'RecordEmpty')
+ self.assertEqual(parsed.field_1.range.length, 0)
+
+ self.assertEqual(type(parsed.field_2).__name__, 'RecordEmpty')
+ self.assertEqual(parsed.field_2.range.length, 0)
+
+ self.assertEqual(type(parsed.field_3).__name__, 'RecordEmpty')
+ self.assertEqual(parsed.field_3.range.length, 0)
+
+ self.assertEqual(type(parsed.field_4.no_value).__name__, 'RecordEmpty')
+ self.assertEqual(parsed.field_4.no_value.range.length, 0)
+
+ self.assertEqual(parsed.field_5.value, b'z')
+
+
+ def testConsumeIncludeTerminators(self):
+ """Consume and/or include terminators."""
+
+ # Cf. 7.3.1. Terminator: consume or include?
+
+ definitions = '''
+seq:
+ - id: str1
+ type: str
+ terminator: 0x2e # `.`
+ - id: str2
+ type: str
+ terminator: 0x2e # `.`
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'foo.bar.')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.str1.value, b'foo')
+
+ self.assertEqual(parsed.str2.value, b'bar')
+
+
+ definitions = '''
+seq:
+ - id: str1
+ type: str
+ terminator: 0x2e # `.`
+ include: true
+ - id: str2
+ type: str
+ terminator: 0x2e # `.`
+ eos-error: false
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'foo.bar')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.str1.value, b'foo.')
+
+ self.assertEqual(parsed.str2.value, b'bar')
+
+
+ definitions = '''
+seq:
+ - id: str1
+ type: str
+ terminator: 0x2e # `.`
+ consume: false
+ - id: the_rest
+ type: str
+ size-eos: true
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'foo.bar.')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.str1.value, b'foo')
+
+ self.assertEqual(parsed.the_rest.value, b'.bar.')
+
+
+ definitions = '''
+seq:
+ - id: str1
+ type: str
+ terminator: .
+ - id: the_rest
+ type: str
+ size-eos: true
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'foo.bar.')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.str1.value, b'foo')
+
+ self.assertEqual(parsed.the_rest.value, b'bar.')
+
+
+ definitions = '''
+seq:
+ - id: str1
+ type: str
+ terminator: xxx.
+ - id: the_rest
+ type: str
+ size-eos: true
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'fooxxx.bar.')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.str1.value, b'foo')
+
+ self.assertEqual(parsed.the_rest.value, b'bar.')
+
+
+ def testIgnoreErrorsInDelimitedStructures(self):
+ """Ignore errors in delimited structures."""
+
+ # Cf. 7.3.2. Ignoring errors in delimited structures
+
+ definitions = '''
+seq:
+ - id: my_string
+ type: str
+ terminator: 0
+ eos-error: false
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x61\x62\x63\x00\x64\x65\x66')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.my_string.value, b'abc')
+
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x61\x62\x63\x00')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.my_string.value, b'abc')
+
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x61\x62\x63')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.my_string.value, b'abc')
+
+
+ def __passed__testImportTypesFromOtherFiles(self):
+ """Import types from other files."""
+
+ # Cf. 7.4. Importing types from other files
+
+ pass
+
+
+ def __passed__testPlugExternalCodeForOpaqueTypes(self):
+ """Plug external code for opaque types."""
+
+ # Cf. 7.5. Opaque types: plugging in external code
+
+ pass
+
+
+ def __passed__testCustomProcessingRoutines(self):
+ """Handle custom processing routines."""
+
+ # Cf. 7.6. Custom processing routines
+
+ pass
+
+
+ def __passed__testParentTypeEnforcing(self):
+ """Enforce parent type."""
+
+ # Cf. 7.7. Enforcing parent type
+
+ pass
+
+
+ def testTypecasting(self):
+ """Ensure there is no need for typecasting."""
+
+ # Cf. 7.8. Typecasting
+
+ definitions = '''
+seq:
+ - id: num_sections
+ type: u1
+ - id: sections
+ type: section
+ repeat: expr
+ repeat-expr: num_sections
+types:
+ section:
+ seq:
+ - id: sect_type
+ type: u1
+ - id: body
+ type:
+ switch-on: sect_type
+ cases:
+ 1: sect_header
+ 2: sect_color_data
+ sect_header:
+ seq:
+ - id: width
+ type: u1
+ - id: height
+ type: u1
+ sect_color_data:
+ seq:
+ - id: rgb
+ size: 3
+instances:
+ check_0:
+ value: sections[0].body.width * sections[0].body.height
+ check_1:
+ value: sections[1].body.rgb
+ check_2:
+ value: sections[2].body.width * sections[2].body.height
+ check_3:
+ value: sections[3].body.rgb
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x04\x01\x02\x04\x02ABC\x01\x03\x05\x02UVW')
+
+ parsed = kstruct.parse(content)
+
+ # Vérifications externes
+
+ self.assertEqual(parsed.num_sections.value, 4)
+
+ self.assertEqual(len(parsed.sections), 4)
+
+ self.assertEqual(parsed.sections[0].body.width.value + parsed.sections[0].body.height.value, 6)
+
+ self.assertEqual(parsed.sections[1].body.rgb.value, b'ABC')
+
+ self.assertEqual(parsed.sections[2].body.width.value + parsed.sections[2].body.height.value, 8)
+
+ self.assertEqual(parsed.sections[3].body.rgb.value, b'UVW')
+
+ # Vérifications internes
+
+ self.assertEqual(parsed.check_0.value, 8)
+
+ self.assertEqual(parsed.check_1.value.value, b'ABC')
+
+ self.assertEqual(parsed.check_2.value, 15)
+
+ self.assertEqual(parsed.check_3.value.value, b'UVW')
+
+
+
+ ##########################
+ ### 8. Common pitfalls
+ ##########################
+
+
+ def testReadTypeWithSubstream(self):
+ """Read user-type with substream."""
+
+ # Cf. 8.1. Specifying size creates a substream
+
+ definitions = '''
+seq:
+ - id: header
+ size: 4
+ - id: block
+ type: block
+ size: 4 # <= important size designation, creates a substream
+instances:
+ byte_3:
+ pos: 3
+ type: u1
+types:
+ block:
+ instances:
+ byte_3:
+ pos: 3
+ type: u1
+ byte_3_alt:
+ io: _root._io # <= thanks to this, always points to a byte in main stream
+ pos: 3
+ type: u1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x00\x01\x02\x03\x04\x05\x06\x07')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.header.value, b'\x00\x01\x02\x03')
+
+ self.assertEqual(parsed.byte_3.value, 0x03)
+
+ self.assertEqual(parsed.block.byte_3.value, 0x07)
+
+ self.assertEqual(parsed.block.byte_3_alt.value, 0x03)
+
+
+ definitions = '''
+seq:
+ - id: header
+ size: 4
+ - id: block
+ type: block
+instances:
+ byte_3:
+ pos: 3
+ type: u1
+types:
+ block:
+ instances:
+ byte_3:
+ pos: 3
+ type: u1
+ byte_3_alt:
+ io: _root._io # <= thanks to this, always points to a byte in main stream
+ pos: 3
+ type: u1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x00\x01\x02\x03\x04\x05\x06\x07')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.header.value, b'\x00\x01\x02\x03')
+
+ self.assertEqual(parsed.byte_3.value, 0x03)
+
+ self.assertEqual(parsed.block.byte_3.value, 0x03)
+
+ self.assertEqual(parsed.block.byte_3_alt.value, 0x03)
+
+
+ def testReadTypeWithoutSubstream(self):
+ """Read user-type without substream."""
+
+ # Cf. 8.2. Not specifying size does not create a substream
+
+ definitions = '''
+seq:
+ - id: header
+ size: 2
+ - id: block_as_type1
+ type: type1
+ size: 2 # <= important, creates a substream
+types:
+ type1:
+ seq:
+ - id: val1
+ size: 2
+ type2:
+ seq:
+ - id: val2
+ size: 2
+instances:
+ block_as_type2:
+ io: block_as_type1._io
+ pos: 0
+ type: type2
+ internal_check:
+ value: block_as_type2._io == _root._io
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'aabb')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.header.value, b'aa')
+
+ self.assertEqual(parsed.block_as_type1.val1.value, b'bb')
+
+ self.assertEqual(parsed.block_as_type2.val2.value, b'bb')
+
+ self.assertFalse(parsed.internal_check.value)
+
+
+ definitions = '''
+seq:
+ - id: header
+ size: 2
+ - id: block_as_type1
+ type: type1
+types:
+ type1:
+ seq:
+ - id: val1
+ size: 2
+ type2:
+ seq:
+ - id: val2
+ size: 2
+instances:
+ block_as_type2:
+ io: block_as_type1._io
+ pos: 0
+ type: type2
+ internal_check:
+ value: block_as_type2._io == _root._io
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'aabb')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.header.value, b'aa')
+
+ self.assertEqual(parsed.block_as_type1.val1.value, b'bb')
+
+ self.assertEqual(parsed.block_as_type2.val2.value, b'aa')
+
+ self.assertTrue(parsed.internal_check.value)
+
+
+ def __passed__testSizedProcess(self):
+ """Provide a sized data to processing."""
+
+ # Cf. 8.3. Applying process without a size
+
+ pass
+
+
+ def __passed__testRelatedKeys(self):
+ """Check refering keys and their related YAML nodes."""
+
+ # Cf. 8.4. Keys relating to the whole array and to each element in repeated attributes
+
+ pass
+
+
+
+ #######################
+ ### x. Extra checks
+ #######################
+
+
+ def testMssingField(self):
+ """Raise error on missing field."""
+
+ definitions = '''
+seq:
+ - id: field0
+ size-eos: true
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\x01\x02\x02\x03')
+
+ parsed = kstruct.parse(content)
+ self.assertIsNotNone(parsed)
+
+ self.assertEqual(parsed.field0.creator.raw_id, 'field0')
+
+ self.assertEqual(parsed.field0.value, b'\x01\x02\x02\x03')
+
+ # AttributeError: 'pychrysalide.plugins.kaitai.records.RecordList' object has no attribute 'xxxx'
+ with self.assertRaisesRegex(AttributeError, "object has no attribute 'xxxx'"):
+ print(parsed.xxxx)
+
+
+ def testLEB128Values(self):
+ """Read some Little Endian Base 128 values."""
+
+ definitions = '''
+seq:
+ - id: groups
+ type: group
+ repeat: until
+ repeat-until: not _.has_next
+types:
+ group:
+ -webide-representation: '{value}'
+ doc: |
+ One byte group, clearly divided into 7-bit "value" chunk and 1-bit "continuation" flag.
+ seq:
+ - id: b
+ type: u1
+ instances:
+ has_next:
+ value: (b & 0b1000_0000) != 0
+ doc: If true, then we have more bytes to read
+ value:
+ value: b & 0b0111_1111
+ doc: The 7-bit (base128) numeric value chunk of this group
+instances:
+ len:
+ value: groups.size
+ value:
+ value: >-
+ groups[0].value
+ + (len >= 2 ? (groups[1].value << 7) : 0)
+ + (len >= 3 ? (groups[2].value << 14) : 0)
+ + (len >= 4 ? (groups[3].value << 21) : 0)
+ + (len >= 5 ? (groups[4].value << 28) : 0)
+ + (len >= 6 ? (groups[5].value << 35) : 0)
+ + (len >= 7 ? (groups[6].value << 42) : 0)
+ + (len >= 8 ? (groups[7].value << 49) : 0)
+ doc: Resulting unsigned value as normal integer
+ sign_bit:
+ value: '1 << (7 * len - 1)'
+ value_signed:
+ value: '(value ^ sign_bit) - sign_bit'
+ doc-ref: https://graphics.stanford.edu/~seander/bithacks.html#VariableSignExtend
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ content = MemoryContent(b'\xe5\x8e\x26')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.len.value, 3)
+
+ self.assertEqual(parsed.value.value, parsed.value_signed.value)
+
+ self.assertEqual(parsed.value.value, 624485)
+
+
+ content = MemoryContent(b'\xc0\xbb\x78')
+
+ parsed = kstruct.parse(content)
+
+ self.assertEqual(parsed.len.value, 3)
+
+ self.assertNotEqual(parsed.value.value, parsed.value_signed.value)
+
+ self.assertEqual(parsed.value_signed.value, -123456)
diff --git a/tests/plugins/kaitai/rost.py b/tests/plugins/kaitai/rost.py
new file mode 100644
index 0000000..4a29ef8
--- /dev/null
+++ b/tests/plugins/kaitai/rost.py
@@ -0,0 +1,170 @@
+#!/usr/bin/python3-dbg
+# -*- coding: utf-8 -*-
+
+import locale
+
+from chrysacase import ChrysalideTestCase
+from pychrysalide.analysis.contents import MemoryContent
+from pychrysalide.analysis.scan import ContentScanner
+from pychrysalide.analysis.scan import ScanOptions
+from pychrysalide.analysis.scan.patterns.backends import AcismBackend
+from pychrysalide import core
+from pychrysalide.plugins.kaitai.parsers import KaitaiStruct
+from pychrysalide.plugins.kaitai.rost import KaitaiTrigger
+
+
+class TestScansWithKaitai(ChrysalideTestCase):
+ """TestCase for ROST scan with the KaitaiStruct parsing."""
+
+ @classmethod
+ def setUpClass(cls):
+
+ super(TestScansWithKaitai, cls).setUpClass()
+
+ cls._options = ScanOptions()
+ cls._options.backend_for_data = AcismBackend
+
+
+ def testSimpleKaitaiDefinitionForScanning(self):
+ """Rely on basic Kaitai simple definition for scanning."""
+
+ definitions = '''
+meta:
+ id: basic_test
+seq:
+ - id: field0
+ type: u1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ trigger = KaitaiTrigger(kstruct)
+
+ root_ns = core.get_rost_root_namespace()
+
+ ns = root_ns.resolve('kaitai')
+ ns.register_item(trigger)
+
+ ns = ns.resolve('basic_test')
+ self.assertEqual(ns, trigger)
+
+ cnt = MemoryContent(b'\x01\x02\x03')
+
+ rule = '''
+rule testing {
+
+ condition:
+ kaitai.basic_test.field0 == 1
+
+}
+'''
+
+ scanner = ContentScanner(rule)
+ ctx = scanner.analyze(self._options, cnt)
+
+ self.assertIsNotNone(ctx)
+
+ self.assertFalse(ctx.has_match_for_rule('no_such_rule'))
+
+ self.assertTrue(ctx.has_match_for_rule('testing'))
+
+
+ definitions = '''
+meta:
+ id: other_basic_test
+seq:
+ - id: field0
+ type: u1
+ - id: field1
+ type: u1
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ trigger = KaitaiTrigger(kstruct)
+
+ root_ns = core.get_rost_root_namespace()
+
+ ns = root_ns.resolve('kaitai')
+ ns.register_item(trigger)
+
+ ns = ns.resolve('other_basic_test')
+ self.assertEqual(ns, trigger)
+
+ cnt = MemoryContent(b'\x01\x02\x03')
+
+ rule = '''
+rule testing {
+
+ condition:
+ kaitai.other_basic_test.field0 == 1 and kaitai.other_basic_test.field1 == 2
+
+}
+'''
+
+ scanner = ContentScanner(rule)
+ ctx = scanner.analyze(self._options, cnt)
+
+ self.assertIsNotNone(ctx)
+
+ self.assertTrue(ctx.has_match_for_rule('testing'))
+
+
+ rule = '''
+rule testing {
+
+ condition:
+ kaitai.other_basic_test.field0 == 1 and kaitai.other_basic_testXXXX.field1 == 2
+
+}
+'''
+
+ scanner = ContentScanner(rule)
+ ctx = scanner.analyze(self._options, cnt)
+
+ self.assertIsNotNone(ctx)
+
+ self.assertFalse(ctx.has_match_for_rule('testing'))
+
+
+ def testKaitaiDefinitionWithListForScanning(self):
+ """Access list items from Kaitai definition when scanning with ROST."""
+
+ definitions = '''
+meta:
+ id: test_with_list
+seq:
+ - id: field0
+ type: u1
+ repeat: eos
+'''
+
+ kstruct = KaitaiStruct(definitions)
+
+ trigger = KaitaiTrigger(kstruct)
+
+ root_ns = core.get_rost_root_namespace()
+
+ ns = root_ns.resolve('kaitai')
+ ns.register_item(trigger)
+
+ ns = ns.resolve('test_with_list')
+ self.assertEqual(ns, trigger)
+
+ cnt = MemoryContent(b'\x01\x02\x03')
+
+ rule = '''
+rule testing {
+
+ condition:
+ kaitai.test_with_list.field0[0] == 1 and kaitai.test_with_list.field0[1] == 2 and kaitai.test_with_list.field0[2] == 3
+
+}
+'''
+
+ scanner = ContentScanner(rule)
+ ctx = scanner.analyze(self._options, cnt)
+
+ self.assertIsNotNone(ctx)
+
+ self.assertTrue(ctx.has_match_for_rule('testing'))
diff --git a/tests/plugins/yaml.py b/tests/plugins/yaml.py
new file mode 100644
index 0000000..4d2680c
--- /dev/null
+++ b/tests/plugins/yaml.py
@@ -0,0 +1,175 @@
+#!/usr/bin/python3-dbg
+# -*- coding: utf-8 -*-
+
+
+from chrysacase import ChrysalideTestCase
+from pychrysalide.plugins import yaml
+
+
+class TestYamlSupport(ChrysalideTestCase):
+ """TestCase for the YAML support."""
+
+
+ def testParseSimpleYamlContent(self):
+ """Parse basic YAML content."""
+
+ definitions = '''
+a: av
+b: bv
+c: cv
+'''
+
+ root = yaml.parse_from_text(definitions)
+
+ self.assertFalse(root.is_sequence)
+
+ self.assertEqual(root.nodes[0].key, 'a')
+ self.assertEqual(root.nodes[1].key, 'b')
+ self.assertEqual(root.nodes[2].key, 'c')
+
+ self.assertEqual(root.nodes[0].value, 'av')
+ self.assertEqual(root.nodes[1].value, 'bv')
+ self.assertEqual(root.nodes[2].value, 'cv')
+
+ definitions = '''
+- a: av
+- b: bv
+- c: cv
+'''
+
+ root = yaml.parse_from_text(definitions)
+
+ self.assertTrue(root.is_sequence)
+
+ self.assertEqual(root.nodes[0].nodes[0].key, 'a')
+ self.assertEqual(root.nodes[1].nodes[0].key, 'b')
+ self.assertEqual(root.nodes[2].nodes[0].key, 'c')
+
+ self.assertEqual(root.nodes[0].nodes[0].value, 'av')
+ self.assertEqual(root.nodes[1].nodes[0].value, 'bv')
+ self.assertEqual(root.nodes[2].nodes[0].value, 'cv')
+
+
+ def testSearchYamlNodes(self):
+ """Search YAML nodes related to paths."""
+
+ definitions = '''
+root:
+ a: v0
+ b: v1
+ c: v2
+ sub:
+ aa: v00
+ bb: v01
+ cc:
+ - i: w
+ - j: x
+ - k: c
+ d: v3
+'''
+
+ root = yaml.parse_from_text(definitions)
+
+ found = root.find_first_by_path('/root/a')
+
+ self.assertEqual(found.value, 'v0')
+
+ found = root.find_first_by_path('/root/sub')
+
+ self.assertEqual(found.value, None)
+
+ found = root.find_first_by_path('/root/sub/cc')
+
+ self.assertEqual(found.value, None)
+
+ found = root.find_first_by_path('/root/sub/cc/j')
+
+ self.assertEqual(found.value, 'x')
+
+ found = root.find_first_by_path('/root/d')
+
+ self.assertEqual(found.value, 'v3')
+
+
+ definitions = '''
+root:
+ - a: av
+ aa: aav
+ ab: abv
+ - b: bv
+ ba: bav
+ bb: bbv
+'''
+
+ root = yaml.parse_from_text(definitions)
+
+ found = root.find_first_by_path('/root/ba')
+
+ self.assertEqual(found.value, 'bav')
+
+ found = root.find_first_by_path('/root/b')
+
+ self.assertEqual(found.value, 'bv')
+
+ found = root.find_first_by_path('/root/')
+
+ self.assertTrue(found.is_sequence)
+ self.assertFalse(found.nodes[0].is_sequence)
+ self.assertEqual(found.nodes[0].nodes[0].value, 'av')
+
+
+ def testComplexYamlContent(self):
+ """Process more complex YAML content."""
+
+ definitions = '''
+root:
+ a: 'v0'
+ b: 'v1 ? 1 : 2'
+ c: v2 # final comment
+ d: "'xx::xx'"
+'''
+
+ root = yaml.parse_from_text(definitions)
+
+ found = root.find_first_by_path('/root/a')
+
+ self.assertEqual(found.value, 'v0')
+
+ found = root.find_first_by_path('/root/b')
+
+ self.assertEqual(found.value, 'v1 ? 1 : 2')
+
+ found = root.find_first_by_path('/root/c')
+
+ self.assertEqual(found.value, 'v2')
+
+ found = root.find_first_by_path('/root/d')
+
+ self.assertEqual(found.value, "'xx::xx'")
+
+
+ def testArrayAsSeq(self):
+ """Handle array as YAML block sequence."""
+
+ definitions = '''
+root:
+ a: [ a, 'b', 0xcc, "\td\n\\"'" ]
+'''
+
+ root = yaml.parse_from_text(definitions)
+
+ found = root.find_first_by_path('/root/a')
+
+ self.assertIsNone(found.value)
+
+ self.assertEqual(len(found.children.nodes), 4)
+
+ self.assertEqual(found.children.nodes[0].key, 'a')
+
+ self.assertEqual(found.children.nodes[1].key, 'b')
+
+ self.assertEqual(found.children.nodes[2].key, '0xcc')
+
+ self.assertEqual(found.children.nodes[3].key, "\td \"'")
+
+ self.assertEqual(found.aggregate_value(), '[ a, \'b\', 0xcc, " d \"\'" ]')
diff --git a/tests/plugins/yamlrdr.py b/tests/plugins/yamlrdr.py
deleted file mode 100644
index 47f02ba..0000000
--- a/tests/plugins/yamlrdr.py
+++ /dev/null
@@ -1,277 +0,0 @@
-#!/usr/bin/python3-dbg
-# -*- coding: utf-8 -*-
-
-
-from chrysacase import ChrysalideTestCase
-from pychrysalide.plugins.yaml import YamlReader
-import tempfile
-
-
-class TestYamlReader(ChrysalideTestCase):
- """TestCase for the Yaml reader."""
-
-
- @classmethod
- def setUpClass(cls):
-
- super(TestYamlReader, cls).setUpClass()
-
- cls._simple_map = tempfile.NamedTemporaryFile()
-
- cls._simple_map_data = b'''
-a: av
-b: bv
-c: cv
-
-'''
-
- cls._simple_seq = tempfile.NamedTemporaryFile()
-
- cls._simple_seq_data = b'''
-- a: av
-- b: bv
-- c: cv
-
-'''
-
- cls._nested = tempfile.NamedTemporaryFile()
-
- cls._nested_data = b'''
-root:
- a: v0
- b: v1
- c: v2
- sub:
- aa: v00
- bb: v01
- cc: v02
- - i: w
- - j: x
- - k: c
- d: v3
-
-'''
-
- cls._mixed = tempfile.NamedTemporaryFile()
-
- cls._mixed_data = b'''
-root:
- - a: av
- aa: aav
- ab: abv
- - b: bv
- ba: bav
- bb: bbv
-
-'''
-
- tmp = [
- [ cls._simple_map, cls._simple_map_data ],
- [ cls._simple_seq, cls._simple_seq_data ],
- [ cls._nested, cls._nested_data ],
- [ cls._mixed, cls._mixed_data ],
- ]
-
- for f, d in tmp:
-
- f.write(d)
- f.flush()
-
- cls.log('Using temporary file "%s"' % f.name)
-
-
- @classmethod
- def tearDownClass(cls):
-
- super(TestYamlReader, cls).tearDownClass()
-
- tmp = [
- cls._simple_map,
- cls._simple_seq,
- cls._nested,
- cls._mixed,
- ]
-
- for f in tmp:
-
- cls.log('Delete file "%s"' % f.name)
-
- f.close()
-
-
- def testSimpleYamlContent(self):
- """Validate Yaml content readers."""
-
- def _build_node_desc(node, left, extra = ''):
-
- if hasattr(node, 'key'):
-
- line = node.yaml_line
-
- prefix = '- ' if line.is_list_item else extra
- desc = left + prefix + line.key + ':' + (' ' + line.value if line.value else '') + '\n'
- indent = ' '
-
- collec = node.collection
-
- else:
-
- desc = ''
- indent = ''
-
- if hasattr(node, 'nodes'):
- collec = node
-
- if collec:
-
- if collec.is_sequence:
- extra = ' '
-
- for child in collec.nodes:
- desc += _build_node_desc(child, left + indent, extra)
-
- return desc
-
-
- reader = YamlReader.new_from_path(self._simple_map.name)
- self.assertIsNotNone(reader)
- self.assertIsNotNone(reader.tree)
-
- fulldesc = _build_node_desc(reader.tree.root, '')
-
- self.assertEqual('\n' + fulldesc + '\n', self._simple_map_data.decode('ascii'))
-
- reader = YamlReader.new_from_path(self._simple_seq.name)
- self.assertIsNotNone(reader)
- self.assertIsNotNone(reader.tree)
-
- fulldesc = _build_node_desc(reader.tree.root, '')
-
- self.assertEqual('\n' + fulldesc + '\n', self._simple_seq_data.decode('ascii'))
-
- reader = YamlReader.new_from_path(self._nested.name)
- self.assertIsNotNone(reader)
- self.assertIsNotNone(reader.tree)
-
- fulldesc = _build_node_desc(reader.tree.root, '')
-
- self.assertEqual('\n' + fulldesc + '\n', self._nested_data.decode('ascii'))
-
- reader = YamlReader.new_from_path(self._mixed.name)
- self.assertIsNotNone(reader)
- self.assertIsNotNone(reader.tree)
-
- fulldesc = _build_node_desc(reader.tree.root, '')
-
- self.assertEqual('\n' + fulldesc + '\n', self._mixed_data.decode('ascii'))
-
-
- def testSimpleYamlContentFinder(self):
- """Validate Yaml nested content search."""
-
- reader = YamlReader.new_from_path(self._nested.name)
- self.assertIsNotNone(reader)
-
- found = reader.tree.find_by_path('/root/sub')
-
- self.assertEqual(len(found), 1)
-
- if len(found) == 1:
- self.assertEqual(found[0].key, 'sub')
-
- found = reader.tree.find_by_path('/root/sub/')
-
- self.assertEqual(len(found), 3)
-
- found = reader.tree.find_by_path('/root/sub/xx')
-
- self.assertEqual(len(found), 0)
-
- found = reader.tree.find_by_path('/root/sub/cc/i')
-
- self.assertEqual(len(found), 1)
-
- if len(found) == 1:
- self.assertEqual(found[0].key, 'i')
- self.assertEqual(found[0].yaml_line.is_list_item, True)
-
- found = reader.tree.find_by_path('/root/sub/cc')
-
- self.assertEqual(len(found), 1)
-
- if len(found) == 1:
-
- root = found[0]
-
- found = root.find_by_path('cc/i')
-
- self.assertEqual(len(found), 1)
-
- if len(found) == 1:
-
- self.assertEqual(found[0].key, 'i')
- self.assertEqual(found[0].yaml_line.is_list_item, True)
-
- found = root.find_by_path('/cc/i')
-
- self.assertEqual(len(found), 1)
-
- if len(found) == 1:
-
- self.assertEqual(found[0].key, 'i')
- self.assertEqual(found[0].yaml_line.is_list_item, True)
-
- found = root.find_by_path('//i')
-
- self.assertEqual(len(found), 1)
-
- if len(found) == 1:
-
- self.assertEqual(found[0].key, 'i')
- self.assertEqual(found[0].yaml_line.is_list_item, True)
-
-
- def testMixedYamlContentFinder(self):
- """Validate Yaml mixed content search."""
-
- reader = YamlReader.new_from_path(self._mixed.name)
- self.assertIsNotNone(reader)
-
- found = reader.tree.find_by_path('/root')
-
- self.assertEqual(len(found), 1)
-
- if len(found) == 1:
- self.assertEqual(found[0].key, 'root')
-
- found = reader.tree.find_by_path('/root/', True)
-
- self.assertEqual(len(found), 1)
-
- found = reader.tree.find_one_by_path('/root/', True)
-
- self.assertIsNotNone(found)
-
- if found:
-
- sub = found.find_one_by_path('/a')
- self.assertIsNotNone(sub)
- self.assertEqual(sub.key, 'a')
-
- sub = found.find_one_by_path('/aa')
- self.assertIsNotNone(sub)
- self.assertEqual(sub.key, 'aa')
-
- found = reader.tree.find_by_path('/root/')
-
- self.assertEqual(len(found), 2)
-
- if len(found) == 2:
-
- sub = found[0].find_one_by_path('/a')
- self.assertIsNotNone(sub)
- self.assertEqual(sub.key, 'a')
-
- sub = found[0].find_one_by_path('/aa')
- self.assertIsNotNone(sub)
- self.assertEqual(sub.key, 'aa')