diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/analysis/scan/booleans.py | 98 | ||||
-rw-r--r-- | tests/analysis/scan/common.py | 54 | ||||
-rw-r--r-- | tests/analysis/scan/examples.py | 70 | ||||
-rw-r--r-- | tests/analysis/scan/functions.py | 239 | ||||
-rw-r--r-- | tests/analysis/scan/fuzzing.py | 289 | ||||
-rw-r--r-- | tests/analysis/scan/grammar.py | 484 | ||||
-rw-r--r-- | tests/analysis/scan/matches.py | 64 | ||||
-rw-r--r-- | tests/analysis/scan/pyapi.py | 297 | ||||
-rw-r--r-- | tests/analysis/scan/scanning_hex.py | 716 | ||||
-rw-r--r-- | tests/analysis/scan/scanning_str.py | 194 | ||||
-rw-r--r-- | tests/analysis/scan/sets.py | 118 | ||||
-rw-r--r-- | tests/common/bitfield.py | 174 | ||||
-rw-r--r-- | tests/common/itoa.py | 28 | ||||
-rw-r--r-- | tests/plugins/encodings/all.py | 23 | ||||
-rw-r--r-- | tests/plugins/kaitai/__init__.py | 0 | ||||
-rw-r--r-- | tests/plugins/kaitai/language.py | 2474 | ||||
-rw-r--r-- | tests/plugins/kaitai/rost.py | 170 | ||||
-rw-r--r-- | tests/plugins/yaml.py | 175 | ||||
-rw-r--r-- | tests/plugins/yamlrdr.py | 277 |
19 files changed, 5667 insertions, 277 deletions
diff --git a/tests/analysis/scan/booleans.py b/tests/analysis/scan/booleans.py new file mode 100644 index 0000000..aa3c1a3 --- /dev/null +++ b/tests/analysis/scan/booleans.py @@ -0,0 +1,98 @@ + +from common import RostTestClass + + +class TestRostBooleans(RostTestClass): + """TestCases for booleans and ROST.""" + + def testFinalCondition(self): + """Validate the final condition.""" + + rule = ''' +rule test { + + condition: + false + +} +''' + + self.check_rule_failure(rule) + + + rule = ''' +rule test { + + condition: + true + +} +''' + + self.check_rule_success(rule) + + + def testBasicBooleanOperations(self): + """Evaluate basic boolean operations.""" + + rule = ''' +rule test { + + condition: + true and false + +} +''' + + self.check_rule_failure(rule) + + + rule = ''' +rule test { + + condition: + true or false + +} +''' + + self.check_rule_success(rule) + + + def testImplicitCast(self): + """Imply implicit casts to booleans.""" + + rule = ''' +rule test { + + condition: + true and 0 + +} +''' + + self.check_rule_failure(rule) + + + rule = ''' +rule test { + + condition: + 1 or false + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + 1 or () + +} +''' + + self.check_rule_success(rule) diff --git a/tests/analysis/scan/common.py b/tests/analysis/scan/common.py new file mode 100644 index 0000000..507b7e2 --- /dev/null +++ b/tests/analysis/scan/common.py @@ -0,0 +1,54 @@ + +from chrysacase import ChrysalideTestCase +from pychrysalide.analysis.contents import MemoryContent +from pychrysalide.analysis.scan import ContentScanner +from pychrysalide.analysis.scan import ScanOptions +from pychrysalide.analysis.scan.patterns.backends import AcismBackend + + +class RostTestClass(ChrysalideTestCase): + """TestCase for analysis.scan.ScanExpression.""" + + @classmethod + def setUpClass(cls): + + super(RostTestClass, cls).setUpClass() + + cls._options = ScanOptions() + cls._options.backend_for_data = AcismBackend + + cls._empty_content = MemoryContent(b'') + + + def _validate_rule_result(self, rule, content, expected): + """Check for scan success or failure.""" + + scanner = ContentScanner(rule) + ctx = scanner.analyze(self._options, content) + + self.assertIsNotNone(ctx) + + if expected: + self.assertTrue(ctx.has_match_for_rule('test')) + else: + self.assertFalse(ctx.has_match_for_rule('test')) + + return scanner, ctx + + + def check_rule_success(self, rule, content = None): + """Check for scan success.""" + + if content is None: + content = self._empty_content + + self._validate_rule_result(rule, content, True) + + + def check_rule_failure(self, rule, content = None): + """Check for scan failure.""" + + if content is None: + content = self._empty_content + + self._validate_rule_result(rule, content, False) diff --git a/tests/analysis/scan/examples.py b/tests/analysis/scan/examples.py new file mode 100644 index 0000000..74b5094 --- /dev/null +++ b/tests/analysis/scan/examples.py @@ -0,0 +1,70 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostExamples(RostTestClass): + """TestCases for the examples provides in the ROST documentation.""" + + def testComments(self): + """Ensure comments do not bother rule definitions.""" + + rule = ''' +/* + Multi-line header... +*/ + +rule test { // comment + + /* + * Some context + */ + + condition: /* List of condition(s) */ + true // Dummy condition + +} +''' + + self.check_rule_success(rule) + + + def testArithmeticPrecedence(self): + """Take care of arithmetic operators precedence.""" + + rule = ''' +rule test { // MulFirst + + condition: + 1 + 4 * (3 + 2) == 21 + and + (1 + 4) * (3 + 2) == 25 + +} +''' + + self.check_rule_success(rule) + + + def testUintCast(self): + """Process nested integer values from binary content.""" + + cnt = MemoryContent(b'\x4d\x5a\x00\x00' + b'\x50\x45\x00\x00' + 52 * b'\x00' + b'\x04\x00\x00\x00') + + rule = ''' +rule test { // IsPE + + condition: + + // MZ signature at offset 0 and ... + + uint16(0) == 0x5a4d and + + // ... PE signature at offset stored in the MZ header at offset 0x3c + + uint32(uint32(0x3c)) == 0x00004550 + +} +''' + + self.check_rule_success(rule, cnt) diff --git a/tests/analysis/scan/functions.py b/tests/analysis/scan/functions.py new file mode 100644 index 0000000..6aca957 --- /dev/null +++ b/tests/analysis/scan/functions.py @@ -0,0 +1,239 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostFunctions(RostTestClass): + """TestCases for the core functions of ROST.""" + + # Core + # ==== + + def testSetCounter(self): + """Count quantities and set sizes.""" + + rule = ''' +rule test { + + condition: + count("ABC") == 3 + and count("AB", "C") == count("ABC") + +} +''' + + self.check_rule_success(rule) + + + cnt = MemoryContent(b'\x01\x02\x02\x03\x03\x03') + + rule = ''' +rule test { + + bytes: + $int_01 = "\x01" + $int_02 = "\x02" + $int_3 = "\x03" + + condition: + count($int_0*, $int_3) == #int_* + +} +''' + + self.check_rule_success(rule, cnt) + + + def testDatasize(self): + """Handle the size of the provided data.""" + + cnt = MemoryContent(b'\x01\x02\x03\x04') + + cases = [ + 'datasize == 4', + 'uint16(0) == 0x201 and uint16(datasize - 2) == 0x0403', + ] + + for c in cases: + + rule = ''' +rule test { + + condition: + %s + +} +''' % c + + self.check_rule_success(rule, cnt) + + + def testMaxCommon(self): + """Count the largest quantity of same items in a set.""" + + cnt = MemoryContent(b'') + + cases = [ + [ '1', 1 ], + [ '1, 2, 3', 1 ], + [ '1, 2, 1, 3, 1', 3 ], + [ '1, "a", 2, 3, "a"', 2 ], + ] + + for c, q in cases: + + rule = ''' +rule test { + + condition: + maxcommon(%s) == %u + +} +''' % (c, q) + + self.check_rule_success(rule, cnt) + + + # Modules + # ======= + + def testConsole(self): + """Ensure logging always returns true.""" + + rule = ''' +rule test { + + condition: + console.log() + +} +''' + + self.check_rule_success(rule) + + + def testMagic(self): + """Scan text content with the Magic module.""" + + cnt = MemoryContent(b'aaaa') + + cases = [ + [ 'type', 'ASCII text, with no line terminators' ], + [ 'mime_encoding', 'us-ascii' ], + [ 'mime_type', 'text/plain' ], + ] + + for target, expected in cases: + + rule = ''' +rule test { + + condition: + magic.%s() == "%s" + +} +''' % (target, expected) + + self.check_rule_success(rule, cnt) + + + def testMathOperations(self): + """Perform math operations with core functions.""" + + rule = ''' +rule test { + + condition: + math.to_string(123) == "123" + and math.to_string(291, 16) == "0x123" + and math.to_string(-83, 8) == "-0123" + and math.to_string(123, 2) == "0b1111011" + +} +''' + + self.check_rule_success(rule) + + + def testStringOperations(self): + """Perform string operations with core functions.""" + + rule = ''' +rule test { + + condition: + string.lower("ABCd") == "abcd" and string.lower("123abc") == "123abc" + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + string.upper("abcD") == "ABCD" and string.upper("123ABC") == "123ABC" + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + string.to_int("123") == 123 + and string.to_int("123", 16) == 291 + and string.to_int("0x123") == 291 + and string.to_int("-0123") == -83 + +} +''' + + self.check_rule_success(rule) + + + rule = r''' +rule test { + + condition: + "A\x00B\x00C\x00D\x00" endswith string.wide("CD") + and "A\x00B\x00C\x00D\x00" contains string.wide("BC") + +} +''' + + self.check_rule_success(rule) + + + def testTime(self): + """Check current time.""" + + # Cf. https://www.epochconverter.com/ + + rule = ''' +rule test { + + condition: + time.make(2023, 8, 5, 22, 8, 41) == 0x64cec869 + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + time.now() >= 0x64cec874 and time.now() <= time.now() + +} +''' + + self.check_rule_success(rule) diff --git a/tests/analysis/scan/fuzzing.py b/tests/analysis/scan/fuzzing.py new file mode 100644 index 0000000..1b9b25b --- /dev/null +++ b/tests/analysis/scan/fuzzing.py @@ -0,0 +1,289 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent +from pychrysalide.analysis.scan import ContentScanner +from pychrysalide.analysis.scan import ScanOptions +from pychrysalide.analysis.scan.patterns.backends import AcismBackend +from pychrysalide.analysis.scan.patterns.backends import BitapBackend + + +class TestRostFuzzingFixes(RostTestClass): + """TestCases to remember all the fixes for crashes identified by fuzzing.""" + + def testEmptyPatternListWithContent(self): + """Check no backend is run if there is no pattern to look for.""" + + content = MemoryContent(b'\n') + + rule = ''' +''' + + backends = [ + AcismBackend, # This one was segfaulting + BitapBackend, + ] + + for b in backends: + + options = ScanOptions() + options.backend_for_data = b + + scanner = ContentScanner(rule) + ctx = scanner.analyze(options, content) + + self.assertIsNotNone(ctx) + + + def testMandatoryCondition(self): + """Ensure a condition section exists in a rule.""" + + rule = ''' +rule test { + +} +''' + + with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'): + + scanner = ContentScanner(rule) + + + def testNonExistingPattern(self): + """Avoid to count the matches of a non-existing pattern.""" + + rule = ''' +rule test { + + condition: + #badid + +} +''' + + with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'): + + scanner = ContentScanner(rule) + + + def testNamespacesWithoutReductionCode(self): + """Clean the code for ROST namespaces.""" + + rule = ''' +rule test { + + condition: + console + +} +''' + + self.check_rule_failure(rule) + + + def testCallOnNonCallable(self): + """Reject calls on non callable expressions softly.""" + + rule = ''' +rule test { + + condition: + console.log().log() + +} +''' + + self.check_rule_failure(rule) + + + def testSelfReferencingRule(self): + """Reject any rule referencing itself as match condition.""" + + rule = ''' +rule test { + + condition: + test + +} +''' + + self.check_rule_failure(rule) + + + def testSelfReferencingRule(self): + """Expect only one argument for the not operator, even in debug mode.""" + + rule = ''' +rule test { + + condition: + not(0) + +} +''' + + self.check_rule_success(rule) + + + def testNoCommon(self): + """Handle the case where no common item is found from an empty set.""" + + rule = ''' +rule test { + + bytes: + $a = "a" + + condition: + maxcommon($a) == 0 + +} +''' + + self.check_rule_success(rule) + + + def testAAsAcharacter(self): + """Consider the 'a' character as a valid lowercase character.""" + + rule = ''' +rule test { + + bytes: + $a = "0000a0I0" nocase + + condition: + $a + +} +''' + + self.check_rule_failure(rule) + + + def testAAsAcharacter(self): + """Do not expect initialized trackers when there is no real defined search pattern.""" + + rule = ''' +rule test { + + bytes: + $a = {[0]} + + condition: + $a + +} +''' + + with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'): + + scanner = ContentScanner(rule) + + + def testAllocations(self): + """Handle big alloctions for strings in conditions with regular expressions.""" + + rule = ''' +rule test { + + condition: + "%s" == "%s" + +} +''' % ("0" * (256 * 2 + 8), "0" * (256 * 2 + 8)) + + self.check_rule_success(rule) + + + def testFileFinalAccess(self): + """Ensure patterns found at the edges of scanned content do not crash the scanner.""" + + cnt = MemoryContent(bytes([ 0 for i in range(16) ])) + + rule = ''' +rule test { + + bytes: + $a = { 00 00 00 00 00 00 00 00 } + + condition: + $a + +} +''' + + self.check_rule_success(rule, cnt) + + + def testValidHexRangeMerge(self): + """Merge valid hexadecimal ranges.""" + + rule = ''' +rule test { + + bytes: + $a = { [0] ?? } + + condition: + $a + +} +''' + + with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'): + + scanner = ContentScanner(rule) + + + rule = ''' +rule test { + + bytes: + $a = { [2] ?? } + + condition: + $a + +} +''' + + self.check_rule_failure(rule) + + + def testSmallBase64(self): + """Handle small base64 encodings which may produce few patterns.""" + + rule = ''' +rule test { + + bytes: + $a = "0" base64 + + condition: + $a + +} +''' + + self.check_rule_failure(rule) + + + def testCountIndex(self): + """Ban pattern count indexes from the grammer.""" + + rule = ''' +rule test { + + bytes: + $a = "1" + + condition: + #*[0] + +} +''' + + with self.assertRaisesRegex(ValueError, 'Unable to create content scanner'): + + scanner = ContentScanner(rule) diff --git a/tests/analysis/scan/grammar.py b/tests/analysis/scan/grammar.py new file mode 100644 index 0000000..14f67fa --- /dev/null +++ b/tests/analysis/scan/grammar.py @@ -0,0 +1,484 @@ + +import json + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostGrammar(RostTestClass): + """TestCases for the ROST grammar.""" + + def testRelationalExpressions(self): + """Build expressions with relational comparisons.""" + + cases = [ + + # Regular + [ '-1', '<=', '2', True ], + [ '-1', '<=', '2', True ], + [ '"aaa"', '==', '"aaa"', True ], + [ '"aaa"', '<', '"aaaa"', True ], + [ '""', '<', '"aaaa"', True ], + + # Cast + [ 'false', '==', '0', True ], + [ 'false', '==', '1', False ], + [ 'true', '!=', '0', True ], + [ '1', '==', 'true', True ], + [ 'false', '==', '()', True ], + [ 'true', '==', '(0,)', True ], + + ] + + for op1, kwd, op2, expected in cases: + + rule = ''' +rule test { + + condition: + %s %s %s + +} +''' % (op1, kwd, op2) + + if expected: + self.check_rule_success(rule) + else: + self.check_rule_failure(rule) + + + def testLogicalOperations(self): + """Evaluate some logical operations.""" + + cases = [ + [ 'true and false', False ], + [ 'false or false', False ], + [ 'true and true or false', True ], + [ 'false or true and false', False ], + [ '1 or false', True ], + ] + + for cond, expected in cases: + + rule = ''' +rule test { + + condition: + %s + +} +''' % (cond) + + if expected: + self.check_rule_success(rule) + else: + self.check_rule_failure(rule) + + + def testArithmeticOperations(self): + """Evaluate some arithmetic operations.""" + + cases = [ + + # Clever + '1 + 2 == 3', + '10 + -3 == 7', + '-3 + 10 == 7', + '-10 - 1 < 0', + '-10 - 1 == -11', + '(-10 - 1) == -11', + '(-1 - -10) == 9', + '-2 * -3 == 6', + '-2 * 3 == -6', + + # Legacy + '1 + 4 * 3 + 2 == 15', + '(1 + 4) * 3 + 2 == 17', + '1 + 4 * (3 + 2) == 21', + '(1 + 4) * (3 + 2) == 25', + + ] + + for c in cases: + + rule = ''' +rule test { + + condition: + %s + +} +''' % (c) + + self.check_rule_success(rule) + + + def testBasicStringsOperations(self): + """Build expressions with basic strings operations.""" + + cases = [ + + # Clever + [ '123---456', 'contains', '---', True ], + [ '123---456', 'contains', 'xxx', False ], + [ '---123---456', 'startswith', '---', True ], + [ '---123---456', 'startswith', 'xxx', False ], + [ '123---456---', 'endswith', '---', True ], + [ '123---456---', 'endswith', 'xxx', False ], + [ 'AAA---BBB', 'icontains', 'aaa', True ], + [ 'AAA---BBB', 'icontains', 'xxx', False ], + [ 'AAA---BBB', 'istartswith', 'aAa', True ], + [ 'AAA---BBB', 'istartswith', 'xxx', False ], + [ 'AAA---BBB', 'iendswith', 'bBb', True ], + [ 'AAA---BBB', 'iendswith', 'xxx', False ], + [ 'AzertY', 'iequals', 'AZERTY', True ], + [ 'AzertY', 'iequals', 'AZERTY-', False ], + + # Legacy + [ '123\t456', 'contains', '\t', True ], + [ '123-456', 'startswith', '1', True ], + [ '123-456', 'startswith', '1234', False ], + [ '123-456', 'endswith', '6', True ], + [ '123-456', 'endswith', '3456', False ], + + ] + + for op1, kwd, op2, expected in cases: + + rule = ''' +rule test { + + condition: + "%s" %s "%s" + +} +''' % (op1, kwd, op2) + + if expected: + self.check_rule_success(rule) + else: + self.check_rule_failure(rule) + + + def testSizeUnits(self): + """Evaluate size units.""" + + cases = [ + '1KB == 1024', + '2MB == 2 * 1024 * 1024', + '4Kb == (4 * 1024)', + '1KB <= 1024 and 1024 < 1MB', + ] + + for c in cases: + + rule = ''' +rule test { + + condition: + %s + +} +''' % (c) + + self.check_rule_success(rule) + + + def testPrivateRules(self): + """Ensure private rules remain silent.""" + + for private in [ True, False ]: + for state in [ True, False ]: + + rule = ''' +%srule silent { + + condition: + %s + +} + +rule test { + + condition: + silent + +} +''' % ('private ' if private else '', 'true' if state else 'false') + + scanner, ctx = self._validate_rule_result(rule, self._empty_content, state) + + data = scanner.convert_to_json(ctx) + jdata = json.loads(data) + + # Exemple : + # + # [{'bytes_patterns': [], 'matched': True, 'name': 'test'}, + # {'bytes_patterns': [], 'matched': True, 'name': 'silent'}] + + found = len([ j['name'] for j in jdata if j['name'] == 'silent' ]) > 0 + + self.assertTrue(private ^ found) + + + def testGlobalRules(self): + """Take global rules into account.""" + + for glob_state in [ True, False ]: + for state in [ True, False ]: + + rule = ''' +%srule silent { + + condition: + %s + +} + +rule test { + + condition: + true + +} +''' % ('global ' if glob_state else '', 'true' if state else 'false') + + expected = not(glob_state) or state + + if expected: + self.check_rule_success(rule) + else: + self.check_rule_failure(rule) + + + def testMatchCount(self): + """Ensure match count provides expected values.""" + + cnt = MemoryContent(b'\x01\x02\x02\x03\x03\x03') + + rule = ''' +rule test { + + bytes: + $int_01 = "\x01" + $int_02 = "\x02" + $int_03 = "\x03" + + condition: + #int_01 == count($int_01) and #int_01 == 1 + and #int_02 == count($int_02) and #int_02 == 2 + and #int_03 == count($int_03) and #int_03 == 3 + and #int_0* == count($int_0*) and #int_0* == 6 + +} +''' + + self.check_rule_success(rule, cnt) + + + def testBackingUpHandlers(self): + """Ensure handlers for backing up removals do not limit the grammar.""" + + cnt = MemoryContent(b'AB12') + + # Uncompleted token in rule definition: '?? ?? ' + + rule = ''' +rule test { + + bytes: + $a = { ?? ?? } + + condition: + #a == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?? ' + + rule = ''' +rule test { + + bytes: + $a = { ?? 4? } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?? ?' + + rule = ''' +rule test { + + bytes: + $a = { ?? ?2 } + + condition: + #a == 2 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?? ' + + rule = ''' +rule test { + + bytes: + $a = { ?? 42 } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + # Uncompleted token in rule definition: '?1 ?' + + rule = ''' +rule test { + + bytes: + $a = { ?1 ?? } + + condition: + #a == 2 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?1 4? ' + + rule = ''' +rule test { + + bytes: + $a = { ?1 4? } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?1 ?2 ' + + rule = ''' +rule test { + + bytes: + $a = { ?1 ?2 } + + condition: + #a == 2 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '?1 4' + + rule = ''' +rule test { + + bytes: + $a = { ?1 42 } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + # Uncompleted token in rule definition: '41 ' + + rule = ''' +rule test { + + bytes: + $a = { 41 ?? } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '41 4' + + rule = ''' +rule test { + + bytes: + $a = { 41 4? } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '41 ' + + rule = ''' +rule test { + + bytes: + $a = { 41 ?2 } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + # Uncompleted token in rule definition: '41 42 ' + + rule = ''' +rule test { + + bytes: + $a = { 41 42 } + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + + +# TODO : test <haystack> matches <regex> + + + diff --git a/tests/analysis/scan/matches.py b/tests/analysis/scan/matches.py new file mode 100644 index 0000000..efcae4f --- /dev/null +++ b/tests/analysis/scan/matches.py @@ -0,0 +1,64 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostMatchs(RostTestClass): + """TestCases for the ROST pattern matching engine.""" + + def testCountMatches(self): + """Count matched patterns.""" + + cnt = MemoryContent(b'aaa aaa bbb aaa') + + rule = ''' +rule test { + + bytes: + $a = "aaa" + $b = "bbb" + + condition: + #a == 3 and #b < 2 + +} +''' + + self.check_rule_success(rule, cnt) + + + def testCountSameMatches(self): + """Count matches of similar patterns.""" + + cnt = MemoryContent(b'ABCDabcdABCDabcd') + + rule = ''' +rule test { + + bytes: + $a = "\x61\x62\x63\x64" + $b = "\x61\x62\x63\x64" + + condition: + #a == 2 and #b == 2 + +} +''' + + self.check_rule_success(rule, cnt) + + + rule = ''' +rule test { + + bytes: + $a = "\x61\x62\x63\x64" + $b = "\x61\x62\x63" + + condition: + #a == 2 and #b == 2 + +} +''' + + self.check_rule_success(rule, cnt) diff --git a/tests/analysis/scan/pyapi.py b/tests/analysis/scan/pyapi.py new file mode 100644 index 0000000..7a697b3 --- /dev/null +++ b/tests/analysis/scan/pyapi.py @@ -0,0 +1,297 @@ + +import binascii +import struct + +from chrysacase import ChrysalideTestCase +from gi._constants import TYPE_INVALID +from pychrysalide.analysis.scan import ScanExpression +from pychrysalide.analysis.scan import ScanOptions +from pychrysalide.analysis.scan import find_token_modifiers_for_name +from pychrysalide.glibext import ComparableItem + + +class TestRostPythonAPI(ChrysalideTestCase): + """TestCase for the ROST Python API.""" + + def testEmptyOptions(self): + """Check default scan options.""" + + ops = ScanOptions() + + self.assertEqual(ops.backend_for_data, TYPE_INVALID) + + + def testDirectInstancesOfExpression(self): + """Reject direct instances of ROST expressions.""" + + with self.assertRaisesRegex(RuntimeError, 'pychrysalide.analysis.scan.ScanExpression is an abstract class'): + + e = ScanExpression() + + + def testBooleanComparison(self): + """Compare custom scan expressions.""" + + class StrLenExpr(ScanExpression): + + def __init__(self, value): + super().__init__(ScanExpression.ScanReductionState.REDUCED) + self._value = value + + def _cmp_rich(self, other, op): + + if op == ComparableItem.RichCmpOperation.EQ: + return len(self._value) == len(other._value) + + + e0 = StrLenExpr('00000000000') + + e1 = StrLenExpr('00000000000') + + e2 = StrLenExpr('000000000000000000000000000') + + self.assertTrue(e0 == e1) + + # !? + # Python teste e0 != e1 (non implémenté), puis e1 != e0 (pareil) et en déduit une différence ! + # self.assertFalse(e0 != e1) + + self.assertFalse(e0 == e2) + + # TypeError: '<' not supported between instances of 'StrLenExpr' and 'StrLenExpr' + with self.assertRaisesRegex(TypeError, '\'<\' not supported between instances'): + self.assertTrue(e0 < e1) + + + def testBytePatternModifiers(self): + """Validate the bytes produced by modifiers.""" + + mod = find_token_modifiers_for_name('plain') + self.assertIsNotNone(mod) + + source = b'ABC' + transformed = mod.transform(source) + + self.assertEqual(source, transformed[0]) + + + mod = find_token_modifiers_for_name('hex') + self.assertIsNotNone(mod) + + source = b'ABC' + transformed = mod.transform(source) + + self.assertEqual(binascii.hexlify(source), transformed[0]) + + + mod = find_token_modifiers_for_name('rev') + self.assertIsNotNone(mod) + + source = b'ABC' + transformed = mod.transform(source) + + self.assertEqual(source[::-1], transformed[0]) + + + mod = find_token_modifiers_for_name('lower') + self.assertIsNotNone(mod) + + source = b'AbC' + transformed = mod.transform(source) + + self.assertEqual(source.lower(), transformed[0]) + + + mod = find_token_modifiers_for_name('upper') + self.assertIsNotNone(mod) + + source = b'AbC' + transformed = mod.transform(source) + + self.assertEqual(source.upper(), transformed[0]) + + + mod = find_token_modifiers_for_name('wide') + self.assertIsNotNone(mod) + + source = b'ABC' + transformed = mod.transform(source) + + self.assertEqual(source.decode('ascii'), transformed[0].decode('utf-16-le')) + + + mod = find_token_modifiers_for_name('base64') + self.assertIsNotNone(mod) + + source = b'ABC' + transformed = mod.transform(source) + + self.assertEqual(len(transformed), 3) + self.assertEqual(transformed[0], b'QUJD') + self.assertEqual(transformed[1], b'FCQ') + self.assertEqual(transformed[2], b'BQk') + + + def testClassicalAPIHashing(self): + """Reproduce classical API Hashing results.""" + + def b2i(t): + return struct.unpack('<I', t)[0] + + + # Example: + # - PlugX (2020) - https://vms.drweb.fr/virus/?i=21512304 + + mod = find_token_modifiers_for_name('crc32') + self.assertIsNotNone(mod) + + source = b'GetCurrentProcess\x00' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x3690e66) + + + # Example: + # - GuLoader (2020) - https://www.crowdstrike.com/blog/guloader-malware-analysis/ + + mod = find_token_modifiers_for_name('djb2') + self.assertIsNotNone(mod) + + source = b'GetProcAddress' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0xcf31bb1f) + + + def testCustomAPIHashing(self): + """Reproduce custom API Hashing results.""" + + def b2i(t): + return struct.unpack('<I', t)[0] + + + # Example: + # Underminer Exploit Kit (2019) - https://jsac.jpcert.or.jp/archive/2019/pdf/JSAC2019_1_koike-nakajima_jp.pdf + + mod = find_token_modifiers_for_name('add1505-shl5') + self.assertIsNotNone(mod) + + source = b'LoadLibraryA' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x5fbff0fb) + + + # Example: + # Enigma Stealer (2023) https://www.trendmicro.com/es_mx/research/23/b/enigma-stealer-targets-cryptocurrency-industry-with-fake-jobs.html + + mod = find_token_modifiers_for_name('enigma-murmur') + self.assertIsNotNone(mod) + + source = b'CreateMutexW' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0xfd43765a) + + + # Examples: + # - ShadowHammer (2019) - https://blog.f-secure.com/analysis-shadowhammer-asus-attack-first-stage-payload/ + # - ShadowHammer (2019) - https://securelist.com/operation-shadowhammer-a-high-profile-supply-chain-attack/90380/ + + mod = find_token_modifiers_for_name('imul21-add') + self.assertIsNotNone(mod) + + source = b'VirtualAlloc' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0xdf894b12) + + + # Examples: + # - Bottle Exploit Kit (2019) - https://nao-sec.org/2019/12/say-hello-to-bottle-exploit-kit.html + # - ShadowHammer (2019) - https://securelist.com/operation-shadowhammer-a-high-profile-supply-chain-attack/90380/ + + mod = find_token_modifiers_for_name('imul83-add') + self.assertIsNotNone(mod) + + source = b'GetProcAddress' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x9ab9b854) + + + # Examples: + # - ?? (2021) - https://www.threatspike.com/blogs/reflective-dll-injection + # - Mustang Panda (2022) - https://blog.talosintelligence.com/mustang-panda-targets-europe/ + + mod = find_token_modifiers_for_name('ror13') + self.assertIsNotNone(mod) + + source = b'GetProcAddress' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x7c0dfcaa) + + source = b'VirtualAlloc' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x91afca54) + + + # Example: + # - Energetic Bear (2019) - https://insights.sei.cmu.edu/blog/api-hashing-tool-imagine-that/ + + mod = find_token_modifiers_for_name('sll1-add-hash32') + self.assertIsNotNone(mod) + + source = b'LoadLibraryA' + transformed = mod.transform(source) + + self.assertEqual(b2i(transformed[0]), 0x000d5786) + + + # Example: + # - SideWinder/WarHawk (2022) - https://www.zscaler.com/blogs/security-research/warhawk-new-backdoor-arsenal-sidewinder-apt-group + + mod = find_token_modifiers_for_name('sub42') + self.assertIsNotNone(mod) + + source = b'LoadLibraryA' + transformed = mod.transform(source) + + self.assertEqual(transformed[0], b'\x8e\xb1\xa3\xa6\x8e\xab\xa4\xb4\xa3\xb4\xbb\x83') + + + # Example: + # - TrickBot (2021) - https://medium.com/walmartglobaltech/trickbot-crews-new-cobaltstrike-loader-32c72b78e81c + + mod = find_token_modifiers_for_name('sub-index1') + self.assertIsNotNone(mod) + + source = b'raw.githubusercontent.com' + transformed = mod.transform(source) + + self.assertEqual(transformed[0], b'\x73\x63\x7a\x32\x6c\x6f\x7b\x70\x7e\x6c\x80\x7f\x72\x80\x72\x7f\x7f\x86\x78\x82\x89\x44\x7a\x87\x86') + + + def testBytePatternModifiersAPI(self): + """Validate the API for pattern modifiers.""" + + mod = find_token_modifiers_for_name('plain') + self.assertIsNotNone(mod) + + source = [ b'ABC', b'01234' ] + transformed = mod.transform(source) + + self.assertEqual(len(source), len(transformed)) + self.assertEqual(source[0], transformed[0]) + self.assertEqual(source[1], transformed[1]) + + + mod = find_token_modifiers_for_name('xor') + self.assertIsNotNone(mod) + + source = [ b'ABC' ] + transformed = mod.transform(source, 0x20) + + self.assertEqual(transformed[0], b'abc') diff --git a/tests/analysis/scan/scanning_hex.py b/tests/analysis/scan/scanning_hex.py new file mode 100644 index 0000000..4b0fda4 --- /dev/null +++ b/tests/analysis/scan/scanning_hex.py @@ -0,0 +1,716 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostScanningBinary(RostTestClass): + """TestCases for the bytes section syntax (binary).""" + + def testLonelyPatterns(self): + """Evaluate the most simple patterns.""" + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { 41 } + + condition: + #a == 1 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { 62 } + + condition: + #a == 1 and @a[0] == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { 66 } + + condition: + #a == 1 and @a[0] == 5 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ?1 } + + condition: + #a == 1 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ?2 } + + condition: + #a == 1 and @a[0] == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ?6 } + + condition: + #a == 1 and @a[0] == 5 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def ___testLonelyPatternsNot(self): + """Evaluate the most simple patterns (not version).""" + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~41 } + + condition: + #a == 5 and @a[0] == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~62 } + + condition: + #a == 5 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~66 } + + condition: + #a == 5 and @a[4] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~?1 } + + condition: + #a == 5 and @a[0] == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~?2 } + + condition: + #a == 5 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'Abcdef') + + rule = ''' +rule test { + + bytes: + $a = { ~?6 } + + condition: + #a == 5 and @a[4] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testSimpleHexPattern(self): + """Test a simple hex pattern.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 41 62 63 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 2d 41 62 63 } + + condition: + #a == 1 and @a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testSimpleMaskedHexPattern(self): + """Test a simple masked hex pattern.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?1 6? ?3 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testHexPatternWithPlainAndMasked(self): + """Test hex patterns with plain and masked bytes.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 41 6? ?3 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 4? 62 ?3 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 4? ?2 63 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 4? ?2 ?3 } + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 2d 4? ?2 63 } + + condition: + #a == 1 and @a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 2d 4? 62 ?3 2d } + + condition: + #a == 1 and @a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 2? 41 6? 63 ?d } + + condition: + #a == 1 and @a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testHexPatternWithPlainAndHoles(self): + """Test hex patterns with plain bytes and holes.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 33 ?? 41 ?? 63 ?? 34 } + + condition: + #a == 1 and @a[0] == 2 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? 33 ?? 41 ?? 63 ?? 34 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? 33 [1-5] 63 ?? 34 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { [3-4] 41 ?? 63 ?? 34 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? 33 ?? 41 ?? 63 [3-] } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testHexPatternWithMaskedAndHoles(self): + """Test hex patterns with masked bytes and holes.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?3 ?? 4? ?? 6? ?? ?4 } + + condition: + #a == 1 and @a[0] == 2 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? ?3 ?? 4? ?? 6? ?? ?4 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? ?3 [1-5] ?3 ?? ?4 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { [3-4] ?1 ?? ?3 ?? ?4 ?? } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ?? 3? ?? 4? ?? 6? [3-] } + + condition: + #a == 1 and @a[0] == 1 and !a[0] == 9 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testPipedPlainHexPatterns(self): + """Look for several patterns at once with piped definition.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 41 62 ( 63 | 64 | 65 ) } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ( 41 | f2 | f3 ) 62 63 } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 41 ( 61 | 62 | 63 ) 63 } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ( 41 62 63 | 42 62 63 | 43 62 63 ) } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testPipedMaskedHexPatterns(self): + """Look for several patterns at once with piped and masked definition.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 4? 6? ( ?3 | ?4 | ?5 ) } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ( ?1 | ?2 | ?3 ) 6? 6? } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { 4? ( ?1 | ?2 | ?3 ) 6? } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testDuplicatedResultsFiltering(self): + """Filter duplicated results.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = { ( 4? ?2 ?3 | 4? 6? 6? | ?3 6? ?3 ) } + + condition: + #a == 1 and @a[0] == 4 and !a[0] == 3 + +} +''' + + self.check_rule_success(rule, content=cnt) diff --git a/tests/analysis/scan/scanning_str.py b/tests/analysis/scan/scanning_str.py new file mode 100644 index 0000000..75427a7 --- /dev/null +++ b/tests/analysis/scan/scanning_str.py @@ -0,0 +1,194 @@ + +from common import RostTestClass +from pychrysalide.analysis.contents import MemoryContent + + +class TestRostScanningStrings(RostTestClass): + """TestCases for the bytes section syntax (strings).""" + + def testSimpleStringPattern(self): + """Test a simple string pattern.""" + + cnt = MemoryContent(b'123-Abc-456') + + rule = ''' +rule test { + + bytes: + $a = "Abc" + + condition: + #a == 1 and @a[0] == 4 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testEscapedStringPattern(self): + """Test escaped string patterns.""" + + cnt = MemoryContent(b'\a\b\t\n\v\f\r' + bytes([ 0x1b ]) + b'"\\\xff') + + rule = r''' +rule test { + + bytes: + $a = "\a\b\t\n\v\f\r\e\"\\\xff" + + condition: + #a == 1 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'\a\b\t\n--123--\v\f\r' + bytes([ 0x1b ]) + b'"\\\xff') + + rule = r''' +rule test { + + bytes: + $a = "\a\b\t\n--123--\v\f\r\e\"\\\xff" + + condition: + #a == 1 and @a[0] == 0 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testStringModifiers(self): + """Check string modifiers output.""" + + cnt = MemoryContent(b'--414243--') + + rule = ''' +rule test { + + bytes: + $a = "ABC" hex + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'--ABC--') + + rule = ''' +rule test { + + bytes: + $a = "ABC" plain + + condition: + #a == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'--CBA--') + + rule = ''' +rule test { + + bytes: + $a = "ABC" rev + + condition: + #a == 1 + +} +''' + + + def testStringPatternFullword(self): + """Test a fullword string pattern.""" + + cnt = MemoryContent(b'ABCDEF 123 ') + + rule = ''' +rule test { + + bytes: + $a = "DEF" fullword + $b = "123" fullword + + condition: + #a == 0 and #b == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'DEF 123 ') + + rule = ''' +rule test { + + bytes: + $a = "DEF" fullword + $b = "123" fullword + + condition: + #a == 1 and #b == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + cnt = MemoryContent(b'\tDEF 123 ') + + rule = ''' +rule test { + + bytes: + $a = "DEF" fullword + $b = "123" fullword + + condition: + #a == 1 and #b == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) + + + def testStringPatternCase(self): + """Test a string pattern with case care.""" + + cnt = MemoryContent(b'abc123-Abc123Def456GHI...z0z1z2z3z4z5z6z7z8z9') + + rule = ''' +rule test { + + bytes: + $a = "Abc" nocase + $b = "ABC123DEF456GHI" nocase + $z = "z0z1z2z3z4z5z6z7z8z9" nocase + + condition: + #a == 2 and #b == 1 and #z == 1 + +} +''' + + self.check_rule_success(rule, content=cnt) diff --git a/tests/analysis/scan/sets.py b/tests/analysis/scan/sets.py new file mode 100644 index 0000000..1d10fbf --- /dev/null +++ b/tests/analysis/scan/sets.py @@ -0,0 +1,118 @@ + +from common import RostTestClass + + +class TestRostSets(RostTestClass): + """TestCases for sets support in ROST.""" + + def testSetsAsBooleans(self): + """Convert sets to boolean.""" + + rule = ''' +rule test { + + condition: + () + +} +''' + + self.check_rule_failure(rule) + + + rule = ''' +rule test { + + condition: + (1, ) + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + ("aaa", true, 123) + +} +''' + + self.check_rule_success(rule) + + + def testStringAsArray(self): + """Handle strings as arrays.""" + + rule = ''' +rule test { + + condition: + count("aaa") + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + count("aaa") == 3 + +} +''' + + self.check_rule_success(rule) + + + def testSetsIntersections(self): + """Perform sets intersections.""" + + rule = ''' +rule test { + + condition: + ("aaa", "bbb") in ("AAA", "BBB", "aaa") + +} +''' + + self.check_rule_success(rule) + + + rule = ''' +rule test { + + condition: + ("aaa", "bbb") in ("123", ) + +} +''' + + self.check_rule_failure(rule) + + + + + + + + + + + + + + # TODO : + + # test : intersection(a, a) == a + + # test : "123" in "0123456789" + # test : "123" in "012987" + diff --git a/tests/common/bitfield.py b/tests/common/bitfield.py index e014111..75dfb6e 100644 --- a/tests/common/bitfield.py +++ b/tests/common/bitfield.py @@ -19,6 +19,23 @@ class TestBitFields(ChrysalideTestCase): self.assertEqual(bf.popcount, bf2.popcount) + def testResizeBitField(self): + """Resize bitfields.""" + + bf_a = BitField(10, 0) + + bf_b = BitField(6, 0) + bf_b.resize(10) + + self.assertEqual(bf_a, bf_b) + + bf_a = BitField(133, 1) + + bf_b = BitField(64, 1) + bf_b.resize(133) + + self.assertEqual(bf_a, bf_b) + def testBitFieldValues(self): """Evaluate bitfields basic values.""" @@ -70,6 +87,39 @@ class TestBitFields(ChrysalideTestCase): self.assertEqual(bf_f.popcount, bf_a.popcount) + def testBitFieldLogicalOperationsAt(self): + """Perform logical operations on bitfields at a given position.""" + + bf_a = BitField(75, 0) + + bf_b = BitField(4, 1) + bf_b.reset(2, 1) + + bf_a.or_at(bf_b, 63) + + self.assertFalse(bf_a.test(62)) + + self.assertTrue(bf_a.test(63)) + self.assertTrue(bf_a.test(64)) + self.assertFalse(bf_a.test(65)) + self.assertTrue(bf_a.test(66)) + + self.assertFalse(bf_a.test(67)) + + bf_a = BitField(75, 0) + + bf_a.or_at(bf_b, 60) + + self.assertFalse(bf_a.test(59)) + + self.assertTrue(bf_a.test(60)) + self.assertTrue(bf_a.test(61)) + self.assertFalse(bf_a.test(62)) + self.assertTrue(bf_a.test(63)) + + self.assertFalse(bf_a.test(64)) + + def testBitFieldSwitch(self): """Switch various bits in bitfields.""" @@ -118,6 +168,47 @@ class TestBitFields(ChrysalideTestCase): self.assertTrue(bf.test_none(0, 54)) + def testBitFieldWithBitField(self): + """Test bits in bitfields against other bitfields.""" + + bf = BitField(32, 0) + bf.set(8, 16) + + mask = BitField(8, 1) + + self.assertTrue(bf.test_ones_with(8, mask)) + self.assertTrue(bf.test_ones_with(16, mask)) + self.assertFalse(bf.test_ones_with(17, mask)) + self.assertTrue(bf.test_zeros_with(24, mask)) + + bf = BitField(256, 0) + bf.set(60, 8) + bf.set(126, 10) + + mask = BitField(4, 1) + + self.assertTrue(bf.test_zeros_with(8, mask)) + self.assertTrue(bf.test_zeros_with(122, mask)) + + self.assertFalse(bf.test_zeros_with(58, mask)) + self.assertFalse(bf.test_ones_with(58, mask)) + self.assertTrue(bf.test_ones_with(60, mask)) + self.assertFalse(bf.test_zeros_with(63, mask)) + self.assertTrue(bf.test_ones_with(64, mask)) + self.assertFalse(bf.test_zeros_with(65, mask)) + self.assertFalse(bf.test_ones_with(65, mask)) + + self.assertFalse(bf.test_zeros_with(125, mask)) + self.assertFalse(bf.test_ones_with(125, mask)) + self.assertTrue(bf.test_ones_with(128, mask)) + self.assertFalse(bf.test_zeros_with(129, mask)) + self.assertTrue(bf.test_ones_with(132, mask)) + self.assertFalse(bf.test_zeros_with(133, mask)) + self.assertFalse(bf.test_ones_with(133, mask)) + + self.assertTrue(bf.test_zeros_with(136, mask)) + + def testPopCountForBitField(self): """Count bits set to 1 in bitfield.""" @@ -138,3 +229,86 @@ class TestBitFields(ChrysalideTestCase): bf_b = BitField(9, 1) self.assertNotEqual(bf_a, bf_b) + + + def testSearchOfSetBit(self): + """Find the next set bit in a bit field.""" + + size = 128 + bf = BitField(size, 0) + + bits = [ 0, 1, 50, 63, 64, 65, 111 ] + + for b in bits: + bf.set(b, 1) + + prev = None + found = [] + + while prev is None or prev < size: + + if prev is None: + f = bf.find_next_set() + else: + f = bf.find_next_set(prev) + + if f < size: + found.append(f) + + prev = f + + self.assertEqual(found, bits) + + + def testRealCase00(self): + """Test bits in bitfields against other bitfields in a real case (#02).""" + + bf = BitField(128, 0) + + for b in [ 0, 50, 54, 58, 66, 70, 98 ]: + bf.set(b, 1) + + mask = BitField(128, 0) + + for b in [ 0, 51 ]: + mask.set(b, 1) + + self.assertFalse(bf.test_zeros_with(0, mask)) + + self.assertTrue(bf.test_zeros_with(1, mask)) + + bf = BitField(32, 0) + + mask = BitField(32, 0) + + self.assertTrue(bf.test_zeros_with(0, mask)) + + for b in [ 0, 8, 9, 10 ]: + mask.set(b, 1) + + self.assertTrue(bf.test_zeros_with(0, mask)) + + bf = BitField(32, 1) + + self.assertFalse(bf.test_zeros_with(0, mask)) + + self.assertTrue(bf.test_ones_with(0, mask)) + + + def testRealCase01(self): + """Test bits in bitfields against other bitfields in a real case (#01).""" + + bf = BitField(128, 0) + + mask = BitField(128, 0) + + bits = [ 0, 50, 54, 58, 66, 70, 98 ] + + for b in bits: + mask.set(b, 1) + + bf.or_at(mask, 0) + + self.assertEqual(mask.popcount, len(bits)) + + self.assertEqual(mask.popcount, bf.popcount) diff --git a/tests/common/itoa.py b/tests/common/itoa.py new file mode 100644 index 0000000..a004cbd --- /dev/null +++ b/tests/common/itoa.py @@ -0,0 +1,28 @@ + +from chrysacase import ChrysalideTestCase +from pychrysalide.common import itoa + + +class TestItoa(ChrysalideTestCase): + """TestCase for calls to the itoa() implementation.""" + + def testItoaCallss(self): + """Convert some integer values into strings.""" + + val = itoa(123, 10) + self.assertEqual(val, '123') + + val = itoa(-123, 10) + self.assertEqual(val, '-123') + + val = itoa(0, 10) + self.assertEqual(val, '0') + + val = itoa(0, 2) + self.assertEqual(val, '0') + + val = itoa(127, 2) + self.assertEqual(val, '1111111') + + val = itoa(101, 2) + self.assertEqual(val, '1100101') diff --git a/tests/plugins/encodings/all.py b/tests/plugins/encodings/all.py new file mode 100644 index 0000000..a856ccb --- /dev/null +++ b/tests/plugins/encodings/all.py @@ -0,0 +1,23 @@ + +from chrysacase import ChrysalideTestCase +from pychrysalide.plugins import encodings + +import base64 + + +class TestEncodingsModule(ChrysalideTestCase): + """TestCase for encodings plugin.""" + + def testBase64Encoding(self): + """Validate the base64 implementation.""" + + text = '0123456789abcdef' + + for i in range(len(text) + 1): + + src = text[:i].encode('ascii') + + encoded = encodings.base64_encode(src) + ref = base64.b64encode(src) + + self.assertEqual(encoded, ref) diff --git a/tests/plugins/kaitai/__init__.py b/tests/plugins/kaitai/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/plugins/kaitai/__init__.py diff --git a/tests/plugins/kaitai/language.py b/tests/plugins/kaitai/language.py new file mode 100644 index 0000000..43b6185 --- /dev/null +++ b/tests/plugins/kaitai/language.py @@ -0,0 +1,2474 @@ +#!/usr/bin/python3-dbg +# -*- coding: utf-8 -*- + +import locale + +from chrysacase import ChrysalideTestCase +from pychrysalide.analysis.contents import MemoryContent +from pychrysalide.plugins.kaitai.parsers import KaitaiStruct + + +class TestKaitaiStruct(ChrysalideTestCase): + """TestCase for the KaitaiStruct parsing.""" + + + @classmethod + def setUpClass(cls): + + super(TestKaitaiStruct, cls).setUpClass() + + cls.log('Setting locale suitable for floats...') + + cls._old_locale = locale.getlocale(locale.LC_NUMERIC) + + locale.setlocale(locale.LC_NUMERIC, 'C') + + + @classmethod + def tearDownClass(cls): + + super(TestKaitaiStruct, cls).tearDownClass() + + cls.log('Reverting locale...') + + locale.setlocale(locale.LC_NUMERIC, cls._old_locale) + + + + ################################# + ### 4. Kaitai Struct language + ################################# + + + def testKaitaiFixedLength(self): + """Load fixed-size structures.""" + + # Cf. 4.1. Fixed-size structures + + definitions = ''' +meta: + id: mydesc + title: My Long Title + endian: be +seq: + - id: field0 + type: u4 +''' + + kstruct = KaitaiStruct(definitions) + + self.assertEqual(kstruct.meta.id, 'mydesc') + self.assertEqual(kstruct.meta.title, 'My Long Title') + + content = MemoryContent(b'\x01\x02\x03\x04') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field0.range.length, 4) + self.assertEqual(parsed.field0.value, 0x01020304) + + definitions = ''' +meta: + endian: le +seq: + - id: field0 + type: u4 + - id: field1 + type: u4be +''' + + kstruct = KaitaiStruct(definitions) + + self.assertIsNone(kstruct.meta.id) + self.assertIsNone(kstruct.meta.title) + + content = MemoryContent(b'\x01\x02\x03\x04\x01\x02\x03\x04') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field0.range.length, 4) + self.assertEqual(parsed.field0.value, 0x04030201) + + self.assertEqual(parsed.field1.range.length, 4) + self.assertEqual(parsed.field1.value, 0x01020304) + + + definitions = ''' +seq: + - id: field0 + type: u1 + - id: field1 + size: 2 + - id: field2 + size: field0 + 1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x03\x04\x05') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field0.range.length, 1) + self.assertEqual(parsed.field0.value, 0x01) + + self.assertEqual(parsed.field1.range.length, 2) + self.assertEqual(parsed.field1.truncated_bytes, b'\x02\x03') + + self.assertEqual(parsed.field2.range.length, 2) + self.assertEqual(parsed.field2.truncated_bytes, b'\x04\x05') + + + def testDocstrings(self): + """Handle Kaitai documentation.""" + + # Cf. 4.2. Docstrings + + definitions = ''' +seq: + - id: rating + type: s4 + doc: Rating, can be negative +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x02\x04') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.rating.creator.doc, 'Rating, can be negative') + + + def testKaitaiContents(self): + """Read various forms of fixed content.""" + + # Cf. 4.3. Checking for "magic" signatures + + definitions = ''' +seq: + - id: field0 + contents: [ 0, 0x10, '22', "50 ] +''' + + # ValueError: Unable to create Kaitai structure. + with self.assertRaisesRegex(ValueError, "Unable to create Kaitai structure"): + kstruct = KaitaiStruct(definitions) + self.assertIsNotNone(kstruct) + + + definitions = ''' +seq: + - id: field0 + contents: [ 0x41, 66, 'CD' ] +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'ABCD') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field0.range.length, 4) + + self.assertEqual(parsed.field0.value, b'ABCD') + + + definitions = ''' +seq: + - id: field0 + contents: ABCD +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'ABCD') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field0.range.length, 4) + + + definitions = ''' +seq: + - id: field0 + contents: "ABCD" +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'ABCD') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field0.range.length, 4) + + + definitions = ''' +seq: + - id: field0 + contents: + - 0x41 + - "B" + - CD +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'ABCD') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field0.range.length, 4) + + + def testVariableLengthStructures(self): + """Parse variable-length structures.""" + + # Cf. 4.4. Variable-length structures + + definitions = ''' +seq: + - id: my_len + type: u1 + - id: my_str + type: str + size: my_len +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x03ABC') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.my_len.value, 3) + + self.assertEqual(parsed.my_str.value, b'ABC') + + + definitions = ''' +seq: + - id: my_len + type: u1 + - id: my_str + type: str + size: my_len * 2 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x03ABCDEF') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.my_len.value, 3) + + self.assertEqual(parsed.my_str.value, b'ABCDEF') + + + definitions = ''' +seq: + - id: field0 + size-eos: true +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x02\x03') + + parsed = kstruct.parse(content) + + self.assertEqual(content, parsed.content) + + self.assertEqual(parsed.range.addr.phys, 0) + self.assertEqual(parsed.range.length, len(content.data)) + + + def testDelimitedStructures(self): + """Parse delimited structures.""" + + # Cf. 4.5. Delimited structures + + definitions = ''' +seq: + - id: my_string + type: str + terminator: 0 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'ABC\x00DEF') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.my_string.value, b'ABC') + + + definitions = ''' +seq: + - id: my_string + type: strz +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'ABC\x00DEF') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.my_string.value, b'ABC') + + + definitions = ''' +seq: + - id: name + type: str + size: 8 + terminator: 0 + - id: guard + size: 1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'ABC\x00\x00\x00\x00\x00x\x00') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.name.value, b'ABC') + + self.assertEqual(parsed.guard.value, b'x') + + + def __passed__testEnums(self): + """Parse delimited structures.""" + + # Cf. 4.6. Enums (named integer constants) + + pass + + + def testSubTypes(self): + """Includes subtypes definitions.""" + + # Cf. 4.7. Substructures (subtypes) + + definitions = ''' +seq: + - id: field0 + type: custom_type + - id: field1 + type: custom_type + - id: field2 + type: custom_type +types: + custom_type: + seq: + - id: len + type: u1 + - id: value + size: len +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\xaa\x02\xbb\xbb\x03\xcc\xcc\xcc') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field0.len.value, 1) + self.assertEqual(parsed.field0.value.truncated_bytes, b'\xaa') + + self.assertEqual(parsed.field1.len.value, 2) + self.assertEqual(parsed.field1.value.truncated_bytes, b'\xbb\xbb') + + self.assertEqual(parsed.field2.len.value, 3) + self.assertEqual(parsed.field2.value.truncated_bytes, b'\xcc\xcc\xcc') + + + def testOtherAttributesAccess(self): + """Access attributes in other types.""" + + # Cf. 4.8. Accessing attributes in other types + + definitions = ''' +seq: + - id: header + type: main_header + - id: body + size: header.body_len +types: + main_header: + seq: + - id: magic + contents: FMT + - id: body_len + type: u1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'FMT\x04\xaa\xbb\xcc\xdd') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.header.magic.raw_bytes, b'FMT') + self.assertEqual(parsed.header.magic.range.length, 3) + + self.assertEqual(parsed.header.body_len.value, 4) + + self.assertEqual(parsed.body.raw_bytes, b'\xaa\xbb\xcc\xdd') + self.assertEqual(parsed.body.range.length, 4) + + + def testConditionals(self): + """Read Kaitai values according to previous loaded values.""" + + # Cf. 4.9. Conditionals + + definitions = ''' +seq: + - id: field1 + type: u1 + - id: field2 + type: u1 + - id: field3 + type: u1 + if: field1 + field2 > 10 + - id: field4 + type: u1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x03\x04') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field1.value, 0x01) + self.assertEqual(parsed.field2.value, 0x02) + self.assertFalse(hasattr(parsed, 'field3')) + self.assertEqual(parsed.field4.value, 0x03) + + + definitions = ''' +seq: + - id: field1 + type: u1 + - id: field2 + type: u1 + - id: field3 + type: u1 + if: field1 + field2 > 1 + - id: field4 + type: u1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x03\x04') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field1.value, 0x01) + self.assertEqual(parsed.field2.value, 0x02) + self.assertTrue(hasattr(parsed, 'field3')) + self.assertEqual(parsed.field4.value, 0x04) + + + definitions = ''' +seq: + - id: field1 + type: u1 + - id: field2 + type: u1 + - id: field3 + type: u1 + if: field1 + field2 == threshold::three + - id: field4 + type: u1 +enums: + threshold: + 1: one + 2: two + 3: three +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x03\x04') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field1.value, 0x01) + self.assertEqual(parsed.field2.value, 0x02) + self.assertTrue(hasattr(parsed, 'field3')) + self.assertEqual(parsed.field4.value, 0x04) + + + def testRepeatedReadUntilEOS(self): + """Read items until the end of the stream.""" + + # Cf. 4.10.1. Repeat until end of stream + + definitions = ''' +seq: + - id: field0 + type: u2be + repeat: eos +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x00\x02\x00\x03\x00\x04\x00') + + parsed = kstruct.parse(content) + + self.assertEqual(len(parsed.field0), len(content.data) / 2) + + for i in range(4): + self.assertEqual(parsed.field0[i].value, (i + 1) << 8) + + + def testRepeatedReadAccordingToCounter(self): + """Repeat read of items for a nomber of times.""" + + # Cf. 4.10.2. Repeat for a number of times + + definitions = ''' +seq: + - id: field0 + type: u1 + - id: field1 + type: u1 + repeat: expr + repeat-expr: 1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x01') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field0.value, 0x01) + + self.assertEqual(len(parsed.field1), 1) + + for i in range(1): + self.assertEqual(parsed.field1[i].value, i + 1) + + definitions = ''' +seq: + - id: field0 + type: u1 + - id: field1 + type: u1 + - id: field2 + type: u2 + repeat: expr + repeat-expr: field0 + field1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x01\x00\x02\x00\x03\x00') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field0.value, 0x01) + self.assertEqual(parsed.field1.value, 0x02) + + self.assertEqual(len(parsed.field2), 3) + + for i in range(3): + self.assertEqual(parsed.field2[i].value, i + 1) + + + def testRepeatUntilConditionIsMet(self): + """Repeat until condition is met.""" + + # Cf. 4.10.3. Repeat until condition is met + + definitions = ''' +seq: + - id: numbers + type: u1 + repeat: until + repeat-until: _ == 0xff + - id: extra + type: u1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\xff\xcc') + + parsed = kstruct.parse(content) + + self.assertEqual(len(parsed.numbers), 3) + + for i in range(2): + self.assertEqual(parsed.numbers[i].value, i + 1) + + self.assertEqual(parsed.numbers[2].value, 0xff) + + self.assertEqual(parsed.extra.value, 0xcc) + + definitions = ''' +seq: + - id: records + type: buffer_with_len + repeat: until + repeat-until: _.len == 0 +types: + buffer_with_len: + seq: + - id: len + type: u1 + - id: value + size: len +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x02\xaa\xaa\x01\xbb\x00') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.records[0].len.value, 2) + self.assertEqual(parsed.records[0].value.raw_bytes, b'\xaa\xaa') + + self.assertEqual(parsed.records[1].len.value, 1) + self.assertEqual(parsed.records[1].value.raw_bytes, b'\xbb') + + self.assertEqual(parsed.records[2].len.value, 0) + self.assertEqual(parsed.records[2].value.raw_bytes, b'') + + + def testParseTLVImplementation(self): + """Parse a typical TLV implementation.""" + + # Cf. 4.11. Typical TLV implementation (switching types on an expression) + + definitions = ''' +seq: + - id: record + type: rec_def + repeat: eos +types: + rec_def: + seq: + - id: rec_type + type: u1 + - id: len + type: u1 + - id: body + size: len + type: + switch-on: rec_type + cases: + 1: rec_type_1 + 2: rec_type_2 + rec_type_1: + seq: + - id: field1 + type: u1 + rec_type_2: + seq: + - id: field2 + type: u2 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x01\xaa\x02\x02\xcc\xbb') + + parsed = kstruct.parse(content) + + self.assertEqual(len(parsed.record), 2) + + self.assertEqual(parsed.record[0].rec_type.value, 1) + self.assertEqual(parsed.record[0].len.value, 1) + self.assertEqual(parsed.record[0].body.field1.value, 0xaa) + + self.assertEqual(parsed.record[1].rec_type.value, 2) + self.assertEqual(parsed.record[1].len.value, 2) + self.assertEqual(parsed.record[1].body.field2.value, 0xbbcc) + + + def testInstanceWithDataBeyondTheSequence(self): + """Build instances with data beyond the sequence.""" + + # Cf. 4.12. Instances: data beyond the sequence + + definitions = ''' +instances: + some_integer: + pos: 0x4 + type: u1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x03\x04\x05\x06\x07\x08') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.some_integer.value, 5) + + + definitions = ''' +seq: + - id: file_offset + type: u1 + - id: file_size + type: u1 +instances: + body: + pos: file_offset + size: file_size +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x04\x02\x90\x90ABCD') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.file_offset.value, 4) + + self.assertEqual(parsed.file_size.value, 2) + + self.assertEqual(parsed.body.value, b'AB') + + + def testValueInstances(self): + """Build value instances""" + + # Cf. 4.13. Value instances + + definitions = ''' +seq: + - id: length + type: u1 + - id: extra + type: u1 +instances: + length_extended: + value: length * 3 + extra +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x03\x04') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.length.value, 1) + + self.assertEqual(parsed.extra.value, 2) + + self.assertEqual(parsed.length_extended.value, 5) + + + def testBitSizedIntegers(self): + """Read bit-sized integers.""" + + # Cf. 4.14. Bit-sized integers + + definitions = ''' +seq: + - id: packed_1 + type: u1 +instances: + version: + value: (packed_1 & 0b11110000) >> 4 + len_header: + value: packed_1 & 0b00001111 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x9a') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.packed_1.value, 0x9a) + + self.assertEqual(parsed.version.value, 0x9) + + self.assertEqual(parsed.len_header.value, 0xa) + + + def __passed__testBitSizedIntegersBigEndian(self): + """Read bit-sized integers.""" + + # Cf. 4.14.1. Big-endian order + + pass + + + def __passed__testBitSizedIntegersLittleEndian(self): + """Read bit-sized integers.""" + + # Cf. 4.14.2. Little-endian order + + pass + + + def __passed__testBitSizedIntegersSpecifiedEndianness(self): + """Read bit-sized integers with specified bit endianness.""" + + # Cf. 4.14.3. Specifying bit endianness + + pass + + + + ################################# + ### 5. Streams and substreams + ################################# + + + def testTotalSizeLimit(self): + """Limit total size of structure.""" + + # Cf. 5.1. Limiting total size of structure + + definitions = ''' +seq: + - id: body_len + type: u1 + - id: random + size: 2 + - id: comment + size: body_len - 2 + - id: extra + type: u1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x05\x01\x02---\xbb') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.body_len.value, 0x05) + + self.assertEqual(parsed.random.raw_bytes, b'\x01\x02') + + self.assertEqual(parsed.comment.raw_bytes, b'---') + + self.assertEqual(parsed.extra.raw_bytes, b'\xbb') + + + definitions = ''' +seq: + - id: body_len + type: u1 + - id: body + type: record_body + size: body_len + - id: extra + type: u1 +types: + record_body: + seq: + - id: random + size: 2 + - id: comment + size-eos: true +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x05\x01\x02---\xbb') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.body_len.value, 0x05) + + self.assertEqual(parsed.body.random.raw_bytes, b'\x01\x02') + + self.assertEqual(parsed.body.comment.raw_bytes, b'---') + + self.assertEqual(parsed.extra.raw_bytes, b'\xbb') + + + def testRepeatSizeLimit(self): + """Repeating until total size reaches limit.""" + + # Cf. 5.2. Repeating until total size reaches limit + + content = MemoryContent(b'\x03\x00\x01\x02\xbb') + + definitions = ''' +seq: + - id: total_len + type: u1 + - id: files + type: file_entries + size: total_len + - id: extra + type: u1 +types: + file_entries: + seq: + - id: entries + type: entry + repeat: eos + entry: + seq: + - id: index + type: u1 +''' + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.total_len.value, 3) + + self.assertEqual(len(parsed.files.entries), 3) + + for i in range(3): + self.assertEqual(parsed.files.entries[i].index.value, i) + + self.assertEqual(parsed.extra.value, 0xbb) + + + def testRelativePositioning(self): + """Parse with relative positioning.""" + + # Cf. 5.3. Relative positioning + + content = MemoryContent(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\0xe\x0f') + + definitions = ''' +seq: + - id: some_header + size: 4 + - id: body + type: block + size: 12 +types: + block: + seq: + - id: foo + type: u1 + instances: + some_bytes_in_the_middle: + pos: 4 + size: 4 +''' + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.some_header.value, b'\x00\x01\x02\x03') + self.assertEqual(parsed.body.foo.value, 0x04) + + self.assertEqual(parsed.body.some_bytes_in_the_middle.value, b'\x08\x09\x0a\x0b') + + + def testAbsolutePositioning(self): + """Read from absolute position.""" + + # Cf. 5.4. Absolute positioning + + content = MemoryContent(b'\x06\x03\x00\x00\x00\x00\x01\x02\x03\xbb') + + definitions = ''' +seq: + - id: items + size: 10 + type: entry + repeat: eos +types: + entry: + seq: + - id: ofs_body + type: u1 + - id: len_body + type: u1 + instances: + body: + io: _root._io + pos: ofs_body + size: len_body +''' + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.items[0].ofs_body.value, 6) + self.assertEqual(parsed.items[0].len_body.value, 3) + + self.assertEqual(parsed.items[0].body.value, b'\x01\x02\x03') + + + def testSubstreamChoice(self): + """Choose a substream.""" + + # Cf. 5.5. Choosing a substream + + content = MemoryContent(b'\xaa\xaa\xaa\xaa\x01\x02\x03\x04\x05\x06\x07\x08\x02\x03') + + definitions = ''' +seq: + - id: global_header + size: 4 + - id: block_one + type: big_container + size: 8 + - id: block_two + type: smaller_container + size: 2 +types: + big_container: + seq: + - id: some_header + size: 8 + smaller_container: + seq: + - id: ofs_in_big + type: u1 + - id: len_in_big + type: u1 + instances: + something_in_big: + io: _root.block_one._io + pos: ofs_in_big + size: len_in_big +''' + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.block_two.ofs_in_big.value, 2) + self.assertEqual(parsed.block_two.len_in_big.value, 3) + + self.assertEqual(parsed.block_two.something_in_big.value, b'\x03\x04\x05') + + + def __passed__testContentPreProcessing(self): + """Process content before parsing.""" + + # Cf. 5.6. Processing: dealing with compressed, obfuscated and encrypted data + + pass + + + + ############################## + ### 6. Expression language + ############################## + + + def testBasicDataTypes(self): + """Handle basic data types.""" + + # Cf. 6.1. Basic data types + + definitions = ''' +seq: + - id: field1 + type: u1 + - id: field2 + type: u2 + - id: field4 + type: u4 + - id: field8 + type: u8 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x02\x04\x04\x04\x04\x08\x08\x08\x08\x08\x08\x08\x08') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field1.range.length, 1) + self.assertEqual(parsed.field2.range.length, 2) + self.assertEqual(parsed.field4.range.length, 4) + self.assertEqual(parsed.field8.range.length, 8) + + definitions = ''' +seq: + - id: field1 + type: u1 + - id: field4 + type: u4le + - id: field4bis + type: u4be +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x03\x04\x05\x02\x03\x04\x05') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field1.value, 0x01) + self.assertEqual(parsed.field4.value, 0x05040302) + self.assertEqual(parsed.field4bis.value, 0x02030405) + + + definitions = ''' +instances: + number1: + value: 0xdead_cafe + number2: + value: 0xdead_cafe_dead_cafe + number3: + value: 12_345_678 + number4: + value: 0b10100011 + number5: + value: 0b1010_0011_1010_0011 +''' + + content = MemoryContent(b'') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.number1.value, 0xdeadcafe) + + self.assertEqual(parsed.number2.value, 0xdeadcafedeadcafe) + + self.assertEqual(parsed.number3.value, 12345678) + + self.assertEqual(parsed.number4.value, 0b10100011) + + self.assertEqual(parsed.number5.value, 0b1010001110100011) + + + definitions = ''' +seq: + - id: op0 + type: u1 +instances: + result: + value: 0xdeadcafe + op0 + result2: + value: 0XdeadCAFE + op0 +''' + + content = MemoryContent(b'\x00') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.result.value, 0xdeadcafe) + + self.assertEqual(parsed.result2.value, 0xdeadcafe) + + + definitions = ''' +instances: + bytes1: + value: [] + bytes2: + value: [ ] + bytes3: + value: [ 0x90 ] + bytes4: + value: [foo, 0, A, 0xa, 42] +''' + + content = MemoryContent(b'') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.bytes1.value, b'') + + self.assertEqual(parsed.bytes2.value, b'') + + self.assertEqual(parsed.bytes3.value, b'\x90') + + self.assertEqual(parsed.bytes4.value, b'\x66\x6f\x6f\x00\x41\x0a\x2a') + + + definitions = ''' +instances: + escaped: + value: '[ "\\a\\b\\t\\n\\v\\f", "\\0", 0, " \\r\\e\\\"\\123" ]' +''' + + content = MemoryContent(b'') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.escaped.value, b'\x07\x08\x09\x0a\x0b\x0c\x00\x00 \x0d\x1b\x22\x53') + + + definitions_0 = r''' +instances: + escaped: + value: "[ \"\\a\\b\\t\\n\\v\\f\", \"\\0\", 0, \"\\r\\e\\\"'\\123\" ]" +''' + + definitions_1 = r''' +instances: + escaped: + value: [ "\\a\\b\\t\\n\\v\\f", "\\0", 0, "\\r\\e\\\"'\\123" ] +''' + + definitions_2 = ''' +instances: + escaped: + value: [ "\\\\a\\\\b\\\\t\\\\n\\\\v\\\\f", "\\\\0", 0, "\\\\r\\\\e\\\\\\"'\\\\123" ] +''' + + for d in [ definitions_0, definitions_1, definitions_2 ]: + + content = MemoryContent(b'') + + kstruct = KaitaiStruct(d) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.escaped.value, b'\x07\x08\x09\x0a\x0b\x0c\x00\x00\x0d\x1b\x22\x27\x53') + + + def __passed__testUserDefinedTypes(self): + """Create user-defined types.""" + + # Cf. 6.2.1. User-defined types + + pass + + + def testArrays(self): + """Create various arrays.""" + + # Cf. 6.2.2. Arrays + + definitions = ''' +instances: + result_0: + value: "[]" + result_1: + value: "[CAFE, 0, BABE]" + result_2: + value: "[CAFE, 0, BABE] == 'CAFE' + [ 0x00 ] + 'BABE'" + result_3: + value: "[CAFE, 0, BABE] == [ 0x43, 0x41, 0x46, 0x45, 0x00, 0x42, 0x41, 0x42, 0x45 ]" + result_4: + value: "[foo, 0, A, 0xa, 42] == [ 0x66, 0x6f, 0x6f, 0x00, 0x41, 0x0a, 0x2a ]" + result_5: + value: "[1, 0x55, '▒,3', 3] == [ 0x01, 0x55, 0xe2, 0x96, 0x92, 0x2c, 0x33, 0x03 ]" +''' + + content = MemoryContent(b'') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.result_0.value, b'') + + self.assertEqual(parsed.result_1.value, b'CAFE\x00BABE') + + + definitions = ''' +seq: + - id: indexes + type: u1 + repeat: eos +instances: + table: + value: "[ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7, 8, 9 ] ]" + ref: + value: indexes + result_0: + value: table + result_1: + value: ref + result_2: + value: table[indexes[0]][indexes[1] - 1] + result_3: + value: table[indexes[0]][ref[1]] +''' + + content = MemoryContent(b'\x01\x02\x03\x04') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.result_0.value.value, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09') + + self.assertEqual(type(parsed.result_1).__name__, 'RecordDelayed') # result_1 + self.assertEqual(type(parsed.result_1.value).__name__, 'RecordDelayed') # result_1.ref + self.assertEqual(type(parsed.result_1.value.value).__name__, 'RecordList') # result_1.ref.table + + self.assertEqual(parsed.result_1.value.value[3].value, 0x04) + + self.assertEqual(parsed.result_2.value, 5) + + self.assertEqual(parsed.result_3.value, 6) + + + def testArithmeticOperators(self): + """Compute with arithmetic operators.""" + + # Cf. 6.3.1. Arithmetic operators + + definitions = ''' +seq: + - id: op0 + type: u1 + - id: op1 + type: u1 +instances: + result_0: + value: op0 + op1 * 3 + result_1: + value: (2 + op0) * op1 + result_2: + value: 7 * 2.0 + result_3: + value: 7 / 2.0 + result_4: + value: -5 % 3 + result_5: + value: 4 % 3 + result_6: + value: 6 - 3 - -4.0 +''' + + content = MemoryContent(b'\x02\x03\x04\x05') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.result_0.value, 11) + + self.assertEqual(parsed.result_1.value, 12) + + self.assertEqual(parsed.result_2.value, 14.0) + + self.assertEqual(parsed.result_3.value, 3.5) + + self.assertEqual(parsed.result_4.value, 1) + + self.assertEqual(parsed.result_5.value, 1) + + self.assertEqual(parsed.result_6.value, 7.0) + + + definitions = ''' +seq: + - id: base + size: 3 +instances: + result_0: + value: "'xXx ' + base + ' -- %< --'" +''' + + content = MemoryContent(b'ABC') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.result_0.value, b'xXx ABC -- %< --') + + + definitions = ''' +seq: + - id: nums + type: u1 + repeat: eos +instances: + computed: + value: nums[0] + nums[3] + computed2: + value: nums[0] * nums.size + nums[3] + computed3: + value: nums[0] * nums[nums.size - 1] +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x03\x04') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.computed.value, 5) + + self.assertEqual(parsed.computed2.value, 8) + + self.assertEqual(parsed.computed3.value, 4) + + + def testRelationalOperators(self): + """Compute with relational operators.""" + + # Cf. 6.3.2. Relational operators + + definitions = ''' +seq: + - id: op0 + type: u1 + - id: op1 + type: u1 + - id: op2 + size: 3 +instances: + result0: + value: op0 == op1 + result1: + value: op0 != op1 + result2: + value: op2 == 'ABC' + result3: + value: op2 < 'ABCD' + result4: + value: (op0 + 1) >= op1 + result5: + value: "(op0 + 1) == 'ABC'.length" +''' + + content = MemoryContent(b'\x02\x03ABCD') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertFalse(parsed.result0.value) + + self.assertTrue(parsed.result1.value) + + self.assertTrue(parsed.result2.value) + + self.assertTrue(parsed.result3.value) + + self.assertTrue(parsed.result4.value) + + self.assertTrue(parsed.result5.value) + + + def testBitwiseOperators(self): + """Compute with bitwise operators.""" + + # Cf. 6.3.3. Bitwise operators + + definitions = ''' +seq: + - id: op0 + type: u1 + - id: op1 + type: u1 + - id: op2 + type: u1 +instances: + result_0: + value: op0 & op1 + result_1: + value: op1 << op0 >> 1 + result_2: + value: (op2 | 0x80) >> 1 +''' + + content = MemoryContent(b'\x02\x07\x01') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.result_0.value, 0x2) + + self.assertEqual(parsed.result_1.value, 14) + + self.assertEqual(parsed.result_2.value, 0x40) + + + def testLogicalOperators(self): + """Compute with logical boolean operators.""" + + # Cf. 6.3.4. Logical (boolean) operators + + definitions = ''' +seq: + - id: op0 + type: u1 + - id: op1 + type: u1 +instances: + result_0: + value: (op0 > 0) and not false + result_1: + value: op0 == 1 or op1 == 2 +''' + + content = MemoryContent(b'\x01\x02') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertTrue(parsed.result_0.value) + + self.assertTrue(parsed.result_1.value) + + + def testTernaryOperator(self): + """Offer challenges to the ternary operator.""" + + # Cf. 6.3.5. Ternary (if-then-else) operator + + definitions = ''' +seq: + - id: op0 + type: u1 + - id: op1 + type: u1 + - id: op2 + type: u1 +instances: + result_0: + value: 'op0 == 0x80 ? op1 + 1 : op1 * op2' + result_1: + value: 'op0 < 0x80 ? op1 + 1 : op1 * op2' + result_1: + value: 'op0 < 0x80 ? op1 + 1 : op1 * op2' + result_2: + value: '(op0 + 0x10) >= 0x90 ? true : 123' + result_3: + value: '(op0 + 0x10) >= 0x90 and false ? true : 123' +''' + + content = MemoryContent(b'\x80\x03\x04') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.result_0.value, 4) + + self.assertEqual(parsed.result_1.value, 12) + + self.assertTrue(parsed.result_2.value) + + self.assertEqual(parsed.result_3.value, 123) + + + def testIntegersMethods(self): + """Run methods from integers.""" + + # Cf. 6.4.1. Integers + + definitions = ''' +instances: + bytes1: + value: 123.to_s == "123" and -123.to_s == '-123' +''' + + content = MemoryContent(b'') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertTrue(parsed.bytes1.value) + + + def testFloatsMethods(self): + """Run methods from floating numbers.""" + + # Cf. 6.4.2. Floating point numbers + + definitions = ''' +instances: + result_0: + value: 2.32.to_i == 2 and -7.0.to_i == -7 +''' + + content = MemoryContent(b'') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertTrue(parsed.result_0.value) + + + def XXXtestByteArraysAndStringsMethods(self): + """Run methods from byte arrays and strings.""" + + # Cf. 6.4.3. Byte arrays + # 6.4.4. Strings + + definitions = ''' +instances: + result_1: + value: '[].length == 0' + result_2: + value: "'edcba'.reverse == 'XXabcdeXX'.substring(2, 6)" + result_3: + value: "'123'.to_i == 123 and '-123'.to_i == -123" + result_4: + value: "[ 0x50, 0x72, 0x65, 0x73, 0x65, 0x6e, 0x74, 0x61, 0x63, 0x69, 0xc3, 0xb3, 0x6e, 0x2e, 0x73, 0x78, 0x69 ].to_s('utf-8')" + result_5: + value: "'1010'.to_i(2) == 10 and 'cc'.to_i(16) == 204" +''' + + content = MemoryContent(b'') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertTrue(parsed.result_1.value) + + self.assertTrue(parsed.result_2.value) + + self.assertTrue(parsed.result_3.value) + + # Cf. https://docs.gtk.org/glib/character-set.html + # https://developer-old.gnome.org/glib/stable/glib-Character-Set-Conversion.html#g-convert + self.assertEqual(parsed.result_4.value.decode('utf-8'), 'Presentación.sxi') + + self.assertTrue(parsed.result_5.value) + + + def __passed__testEnumsMethods(self): + """Run methods from booleans.""" + + # Cf. 6.4.5. Enums + + pass + + + def testBooleansMethods(self): + """Run methods from booleans.""" + + # Cf. 6.4.6. Booleans + + definitions = ''' +instances: + result_0: + value: true.to_i == 1 + result_1: + value: (1 == 2).to_i == 0 +''' + + content = MemoryContent(b'') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertTrue(parsed.result_0.value) + + self.assertTrue(parsed.result_1.value) + + + def testUserDefinedTypes(self): + """Retrieve user-defined types.""" + + # Cf. 6.4.7. User-defined types + + definitions = ''' +instances: + result_0: + value: _root +''' + + content = MemoryContent(b'') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.result_0.value, parsed) + + + def __passed__testArraysMethods(self): + """Run methods from arrays.""" + + # Cf. 6.4.8. Array types + + pass + + + def __passed__testStreamsMethods(self): + """Run methods from streams.""" + + # Cf. 6.4.9. Streams + + pass + + + + ############################## + ### 7. Advanced techniques + ############################## + + + def testSwitchOverStrings(self): + """Switch over strings.""" + + # Cf. 7.1.1. Switching over strings + + definitions = ''' +seq: + - id: rec_type + type: strz + - id: body + type: + switch-on: rec_type + cases: + '"KETCHUP"': rec_type_1 + '"MUSTARD"': rec_type_2 + '"GUACAMOLE"': rec_type_3 +types: + rec_type_1: + instances: + direct: + value: 1 + rec_type_2: + instances: + direct: + value: 2 + rec_type_3: + instances: + direct: + value: 3 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'GUACAMOLE\x00') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.rec_type.value, b'GUACAMOLE') + + self.assertEqual(parsed.body.direct.value, 3) + + + def testSwitchOverEnums(self): + """Switch over enumerations.""" + + # Cf. 7.1.2. Switching over enums + + definitions = ''' +seq: + - id: rec_type + type: u1 + enum: media + - id: body + type: + switch-on: rec_type + cases: + 'media::cdrom': rec_type_1 + 'media::dvdrom': rec_type_2 + 'media::cassette': rec_type_3 +types: + rec_type_1: + instances: + direct: + value: 1 + rec_type_2: + instances: + direct: + value: 2 + rec_type_3: + instances: + direct: + value: 3 +enums: + media: + 1: cdrom + 2: dvdrom + 3: cassette +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.rec_type.value, 1) + + self.assertEqual(parsed.body.direct.value, 1) + + + def testFourCC(self): + """Recognize four character code.""" + + # Cf. 7.1.3. FourCC + + definitions = ''' +seq: + - id: fourcc + type: u4le + enum: pixel_formats + - id: len + type: u1 + - id: body + size: len + type: + switch-on: fourcc + cases: + 'pixel_formats::rgb2': block_rgb2 + 'pixel_formats::rle4': block_rle4 + 'pixel_formats::rle8': block_rle8 +types: + block_rgb2: + instances: + direct: + value: 2 + block_rle4: + instances: + direct: + value: 4 + block_rle8: + instances: + direct: + value: 8 +enums: + pixel_formats: + 0x32424752: rgb2 + 0x34454C52: rle4 + 0x38454C52: rle8 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'RLE4\x05ABCDE') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.fourcc.value, 0x34454C52) + + self.assertEqual(parsed.len.value, 0x5) + + self.assertEqual(parsed.body.direct.value, 4) + + + def testNothing(self): + """Do nothing.""" + + # Cf. 7.2. Do nothing + + definitions = ''' +seq: + - id: field_0 + size: 1 + - id: field_1 + type: dummy_1 + - id: field_2 + type: dummy_2 + - id: field_3 + type: dummy_3 + - id: field_4 + type: dummy_4 + - id: field_5 + size: 1 +types: + # One can use empty JSON object syntax to avoid specifying any of + # `seq`, `instances`, etc, sections. + dummy_1: {} + # One can use explicit doc to note that there's nothing there. + dummy_2: + doc: This type is intentionally left blank. + # One can use empty `seq` or `instances` or `types` section, any + # other empty sections, or any combination of thereof. + dummy_3: + seq: [] + instances: {} + types: {} + # One can use a very explicit notion of the fact that we want to parse 0 bytes. + dummy_4: + seq: + - id: no_value + size: 0 +''' + + content = MemoryContent(b'az') + + kstruct = KaitaiStruct(definitions) + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.field_0.value, b'a') + + self.assertEqual(type(parsed.field_1).__name__, 'RecordEmpty') + self.assertEqual(parsed.field_1.range.length, 0) + + self.assertEqual(type(parsed.field_2).__name__, 'RecordEmpty') + self.assertEqual(parsed.field_2.range.length, 0) + + self.assertEqual(type(parsed.field_3).__name__, 'RecordEmpty') + self.assertEqual(parsed.field_3.range.length, 0) + + self.assertEqual(type(parsed.field_4.no_value).__name__, 'RecordEmpty') + self.assertEqual(parsed.field_4.no_value.range.length, 0) + + self.assertEqual(parsed.field_5.value, b'z') + + + def testConsumeIncludeTerminators(self): + """Consume and/or include terminators.""" + + # Cf. 7.3.1. Terminator: consume or include? + + definitions = ''' +seq: + - id: str1 + type: str + terminator: 0x2e # `.` + - id: str2 + type: str + terminator: 0x2e # `.` +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'foo.bar.') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.str1.value, b'foo') + + self.assertEqual(parsed.str2.value, b'bar') + + + definitions = ''' +seq: + - id: str1 + type: str + terminator: 0x2e # `.` + include: true + - id: str2 + type: str + terminator: 0x2e # `.` + eos-error: false +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'foo.bar') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.str1.value, b'foo.') + + self.assertEqual(parsed.str2.value, b'bar') + + + definitions = ''' +seq: + - id: str1 + type: str + terminator: 0x2e # `.` + consume: false + - id: the_rest + type: str + size-eos: true +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'foo.bar.') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.str1.value, b'foo') + + self.assertEqual(parsed.the_rest.value, b'.bar.') + + + definitions = ''' +seq: + - id: str1 + type: str + terminator: . + - id: the_rest + type: str + size-eos: true +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'foo.bar.') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.str1.value, b'foo') + + self.assertEqual(parsed.the_rest.value, b'bar.') + + + definitions = ''' +seq: + - id: str1 + type: str + terminator: xxx. + - id: the_rest + type: str + size-eos: true +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'fooxxx.bar.') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.str1.value, b'foo') + + self.assertEqual(parsed.the_rest.value, b'bar.') + + + def testIgnoreErrorsInDelimitedStructures(self): + """Ignore errors in delimited structures.""" + + # Cf. 7.3.2. Ignoring errors in delimited structures + + definitions = ''' +seq: + - id: my_string + type: str + terminator: 0 + eos-error: false +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x61\x62\x63\x00\x64\x65\x66') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.my_string.value, b'abc') + + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x61\x62\x63\x00') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.my_string.value, b'abc') + + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x61\x62\x63') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.my_string.value, b'abc') + + + def __passed__testImportTypesFromOtherFiles(self): + """Import types from other files.""" + + # Cf. 7.4. Importing types from other files + + pass + + + def __passed__testPlugExternalCodeForOpaqueTypes(self): + """Plug external code for opaque types.""" + + # Cf. 7.5. Opaque types: plugging in external code + + pass + + + def __passed__testCustomProcessingRoutines(self): + """Handle custom processing routines.""" + + # Cf. 7.6. Custom processing routines + + pass + + + def __passed__testParentTypeEnforcing(self): + """Enforce parent type.""" + + # Cf. 7.7. Enforcing parent type + + pass + + + def testTypecasting(self): + """Ensure there is no need for typecasting.""" + + # Cf. 7.8. Typecasting + + definitions = ''' +seq: + - id: num_sections + type: u1 + - id: sections + type: section + repeat: expr + repeat-expr: num_sections +types: + section: + seq: + - id: sect_type + type: u1 + - id: body + type: + switch-on: sect_type + cases: + 1: sect_header + 2: sect_color_data + sect_header: + seq: + - id: width + type: u1 + - id: height + type: u1 + sect_color_data: + seq: + - id: rgb + size: 3 +instances: + check_0: + value: sections[0].body.width * sections[0].body.height + check_1: + value: sections[1].body.rgb + check_2: + value: sections[2].body.width * sections[2].body.height + check_3: + value: sections[3].body.rgb +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x04\x01\x02\x04\x02ABC\x01\x03\x05\x02UVW') + + parsed = kstruct.parse(content) + + # Vérifications externes + + self.assertEqual(parsed.num_sections.value, 4) + + self.assertEqual(len(parsed.sections), 4) + + self.assertEqual(parsed.sections[0].body.width.value + parsed.sections[0].body.height.value, 6) + + self.assertEqual(parsed.sections[1].body.rgb.value, b'ABC') + + self.assertEqual(parsed.sections[2].body.width.value + parsed.sections[2].body.height.value, 8) + + self.assertEqual(parsed.sections[3].body.rgb.value, b'UVW') + + # Vérifications internes + + self.assertEqual(parsed.check_0.value, 8) + + self.assertEqual(parsed.check_1.value.value, b'ABC') + + self.assertEqual(parsed.check_2.value, 15) + + self.assertEqual(parsed.check_3.value.value, b'UVW') + + + + ########################## + ### 8. Common pitfalls + ########################## + + + def testReadTypeWithSubstream(self): + """Read user-type with substream.""" + + # Cf. 8.1. Specifying size creates a substream + + definitions = ''' +seq: + - id: header + size: 4 + - id: block + type: block + size: 4 # <= important size designation, creates a substream +instances: + byte_3: + pos: 3 + type: u1 +types: + block: + instances: + byte_3: + pos: 3 + type: u1 + byte_3_alt: + io: _root._io # <= thanks to this, always points to a byte in main stream + pos: 3 + type: u1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x00\x01\x02\x03\x04\x05\x06\x07') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.header.value, b'\x00\x01\x02\x03') + + self.assertEqual(parsed.byte_3.value, 0x03) + + self.assertEqual(parsed.block.byte_3.value, 0x07) + + self.assertEqual(parsed.block.byte_3_alt.value, 0x03) + + + definitions = ''' +seq: + - id: header + size: 4 + - id: block + type: block +instances: + byte_3: + pos: 3 + type: u1 +types: + block: + instances: + byte_3: + pos: 3 + type: u1 + byte_3_alt: + io: _root._io # <= thanks to this, always points to a byte in main stream + pos: 3 + type: u1 +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x00\x01\x02\x03\x04\x05\x06\x07') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.header.value, b'\x00\x01\x02\x03') + + self.assertEqual(parsed.byte_3.value, 0x03) + + self.assertEqual(parsed.block.byte_3.value, 0x03) + + self.assertEqual(parsed.block.byte_3_alt.value, 0x03) + + + def testReadTypeWithoutSubstream(self): + """Read user-type without substream.""" + + # Cf. 8.2. Not specifying size does not create a substream + + definitions = ''' +seq: + - id: header + size: 2 + - id: block_as_type1 + type: type1 + size: 2 # <= important, creates a substream +types: + type1: + seq: + - id: val1 + size: 2 + type2: + seq: + - id: val2 + size: 2 +instances: + block_as_type2: + io: block_as_type1._io + pos: 0 + type: type2 + internal_check: + value: block_as_type2._io == _root._io +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'aabb') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.header.value, b'aa') + + self.assertEqual(parsed.block_as_type1.val1.value, b'bb') + + self.assertEqual(parsed.block_as_type2.val2.value, b'bb') + + self.assertFalse(parsed.internal_check.value) + + + definitions = ''' +seq: + - id: header + size: 2 + - id: block_as_type1 + type: type1 +types: + type1: + seq: + - id: val1 + size: 2 + type2: + seq: + - id: val2 + size: 2 +instances: + block_as_type2: + io: block_as_type1._io + pos: 0 + type: type2 + internal_check: + value: block_as_type2._io == _root._io +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'aabb') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.header.value, b'aa') + + self.assertEqual(parsed.block_as_type1.val1.value, b'bb') + + self.assertEqual(parsed.block_as_type2.val2.value, b'aa') + + self.assertTrue(parsed.internal_check.value) + + + def __passed__testSizedProcess(self): + """Provide a sized data to processing.""" + + # Cf. 8.3. Applying process without a size + + pass + + + def __passed__testRelatedKeys(self): + """Check refering keys and their related YAML nodes.""" + + # Cf. 8.4. Keys relating to the whole array and to each element in repeated attributes + + pass + + + + ####################### + ### x. Extra checks + ####################### + + + def testMssingField(self): + """Raise error on missing field.""" + + definitions = ''' +seq: + - id: field0 + size-eos: true +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\x01\x02\x02\x03') + + parsed = kstruct.parse(content) + self.assertIsNotNone(parsed) + + self.assertEqual(parsed.field0.creator.raw_id, 'field0') + + self.assertEqual(parsed.field0.value, b'\x01\x02\x02\x03') + + # AttributeError: 'pychrysalide.plugins.kaitai.records.RecordList' object has no attribute 'xxxx' + with self.assertRaisesRegex(AttributeError, "object has no attribute 'xxxx'"): + print(parsed.xxxx) + + + def testLEB128Values(self): + """Read some Little Endian Base 128 values.""" + + definitions = ''' +seq: + - id: groups + type: group + repeat: until + repeat-until: not _.has_next +types: + group: + -webide-representation: '{value}' + doc: | + One byte group, clearly divided into 7-bit "value" chunk and 1-bit "continuation" flag. + seq: + - id: b + type: u1 + instances: + has_next: + value: (b & 0b1000_0000) != 0 + doc: If true, then we have more bytes to read + value: + value: b & 0b0111_1111 + doc: The 7-bit (base128) numeric value chunk of this group +instances: + len: + value: groups.size + value: + value: >- + groups[0].value + + (len >= 2 ? (groups[1].value << 7) : 0) + + (len >= 3 ? (groups[2].value << 14) : 0) + + (len >= 4 ? (groups[3].value << 21) : 0) + + (len >= 5 ? (groups[4].value << 28) : 0) + + (len >= 6 ? (groups[5].value << 35) : 0) + + (len >= 7 ? (groups[6].value << 42) : 0) + + (len >= 8 ? (groups[7].value << 49) : 0) + doc: Resulting unsigned value as normal integer + sign_bit: + value: '1 << (7 * len - 1)' + value_signed: + value: '(value ^ sign_bit) - sign_bit' + doc-ref: https://graphics.stanford.edu/~seander/bithacks.html#VariableSignExtend +''' + + kstruct = KaitaiStruct(definitions) + + content = MemoryContent(b'\xe5\x8e\x26') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.len.value, 3) + + self.assertEqual(parsed.value.value, parsed.value_signed.value) + + self.assertEqual(parsed.value.value, 624485) + + + content = MemoryContent(b'\xc0\xbb\x78') + + parsed = kstruct.parse(content) + + self.assertEqual(parsed.len.value, 3) + + self.assertNotEqual(parsed.value.value, parsed.value_signed.value) + + self.assertEqual(parsed.value_signed.value, -123456) diff --git a/tests/plugins/kaitai/rost.py b/tests/plugins/kaitai/rost.py new file mode 100644 index 0000000..4a29ef8 --- /dev/null +++ b/tests/plugins/kaitai/rost.py @@ -0,0 +1,170 @@ +#!/usr/bin/python3-dbg +# -*- coding: utf-8 -*- + +import locale + +from chrysacase import ChrysalideTestCase +from pychrysalide.analysis.contents import MemoryContent +from pychrysalide.analysis.scan import ContentScanner +from pychrysalide.analysis.scan import ScanOptions +from pychrysalide.analysis.scan.patterns.backends import AcismBackend +from pychrysalide import core +from pychrysalide.plugins.kaitai.parsers import KaitaiStruct +from pychrysalide.plugins.kaitai.rost import KaitaiTrigger + + +class TestScansWithKaitai(ChrysalideTestCase): + """TestCase for ROST scan with the KaitaiStruct parsing.""" + + @classmethod + def setUpClass(cls): + + super(TestScansWithKaitai, cls).setUpClass() + + cls._options = ScanOptions() + cls._options.backend_for_data = AcismBackend + + + def testSimpleKaitaiDefinitionForScanning(self): + """Rely on basic Kaitai simple definition for scanning.""" + + definitions = ''' +meta: + id: basic_test +seq: + - id: field0 + type: u1 +''' + + kstruct = KaitaiStruct(definitions) + + trigger = KaitaiTrigger(kstruct) + + root_ns = core.get_rost_root_namespace() + + ns = root_ns.resolve('kaitai') + ns.register_item(trigger) + + ns = ns.resolve('basic_test') + self.assertEqual(ns, trigger) + + cnt = MemoryContent(b'\x01\x02\x03') + + rule = ''' +rule testing { + + condition: + kaitai.basic_test.field0 == 1 + +} +''' + + scanner = ContentScanner(rule) + ctx = scanner.analyze(self._options, cnt) + + self.assertIsNotNone(ctx) + + self.assertFalse(ctx.has_match_for_rule('no_such_rule')) + + self.assertTrue(ctx.has_match_for_rule('testing')) + + + definitions = ''' +meta: + id: other_basic_test +seq: + - id: field0 + type: u1 + - id: field1 + type: u1 +''' + + kstruct = KaitaiStruct(definitions) + + trigger = KaitaiTrigger(kstruct) + + root_ns = core.get_rost_root_namespace() + + ns = root_ns.resolve('kaitai') + ns.register_item(trigger) + + ns = ns.resolve('other_basic_test') + self.assertEqual(ns, trigger) + + cnt = MemoryContent(b'\x01\x02\x03') + + rule = ''' +rule testing { + + condition: + kaitai.other_basic_test.field0 == 1 and kaitai.other_basic_test.field1 == 2 + +} +''' + + scanner = ContentScanner(rule) + ctx = scanner.analyze(self._options, cnt) + + self.assertIsNotNone(ctx) + + self.assertTrue(ctx.has_match_for_rule('testing')) + + + rule = ''' +rule testing { + + condition: + kaitai.other_basic_test.field0 == 1 and kaitai.other_basic_testXXXX.field1 == 2 + +} +''' + + scanner = ContentScanner(rule) + ctx = scanner.analyze(self._options, cnt) + + self.assertIsNotNone(ctx) + + self.assertFalse(ctx.has_match_for_rule('testing')) + + + def testKaitaiDefinitionWithListForScanning(self): + """Access list items from Kaitai definition when scanning with ROST.""" + + definitions = ''' +meta: + id: test_with_list +seq: + - id: field0 + type: u1 + repeat: eos +''' + + kstruct = KaitaiStruct(definitions) + + trigger = KaitaiTrigger(kstruct) + + root_ns = core.get_rost_root_namespace() + + ns = root_ns.resolve('kaitai') + ns.register_item(trigger) + + ns = ns.resolve('test_with_list') + self.assertEqual(ns, trigger) + + cnt = MemoryContent(b'\x01\x02\x03') + + rule = ''' +rule testing { + + condition: + kaitai.test_with_list.field0[0] == 1 and kaitai.test_with_list.field0[1] == 2 and kaitai.test_with_list.field0[2] == 3 + +} +''' + + scanner = ContentScanner(rule) + ctx = scanner.analyze(self._options, cnt) + + self.assertIsNotNone(ctx) + + self.assertTrue(ctx.has_match_for_rule('testing')) diff --git a/tests/plugins/yaml.py b/tests/plugins/yaml.py new file mode 100644 index 0000000..4d2680c --- /dev/null +++ b/tests/plugins/yaml.py @@ -0,0 +1,175 @@ +#!/usr/bin/python3-dbg +# -*- coding: utf-8 -*- + + +from chrysacase import ChrysalideTestCase +from pychrysalide.plugins import yaml + + +class TestYamlSupport(ChrysalideTestCase): + """TestCase for the YAML support.""" + + + def testParseSimpleYamlContent(self): + """Parse basic YAML content.""" + + definitions = ''' +a: av +b: bv +c: cv +''' + + root = yaml.parse_from_text(definitions) + + self.assertFalse(root.is_sequence) + + self.assertEqual(root.nodes[0].key, 'a') + self.assertEqual(root.nodes[1].key, 'b') + self.assertEqual(root.nodes[2].key, 'c') + + self.assertEqual(root.nodes[0].value, 'av') + self.assertEqual(root.nodes[1].value, 'bv') + self.assertEqual(root.nodes[2].value, 'cv') + + definitions = ''' +- a: av +- b: bv +- c: cv +''' + + root = yaml.parse_from_text(definitions) + + self.assertTrue(root.is_sequence) + + self.assertEqual(root.nodes[0].nodes[0].key, 'a') + self.assertEqual(root.nodes[1].nodes[0].key, 'b') + self.assertEqual(root.nodes[2].nodes[0].key, 'c') + + self.assertEqual(root.nodes[0].nodes[0].value, 'av') + self.assertEqual(root.nodes[1].nodes[0].value, 'bv') + self.assertEqual(root.nodes[2].nodes[0].value, 'cv') + + + def testSearchYamlNodes(self): + """Search YAML nodes related to paths.""" + + definitions = ''' +root: + a: v0 + b: v1 + c: v2 + sub: + aa: v00 + bb: v01 + cc: + - i: w + - j: x + - k: c + d: v3 +''' + + root = yaml.parse_from_text(definitions) + + found = root.find_first_by_path('/root/a') + + self.assertEqual(found.value, 'v0') + + found = root.find_first_by_path('/root/sub') + + self.assertEqual(found.value, None) + + found = root.find_first_by_path('/root/sub/cc') + + self.assertEqual(found.value, None) + + found = root.find_first_by_path('/root/sub/cc/j') + + self.assertEqual(found.value, 'x') + + found = root.find_first_by_path('/root/d') + + self.assertEqual(found.value, 'v3') + + + definitions = ''' +root: + - a: av + aa: aav + ab: abv + - b: bv + ba: bav + bb: bbv +''' + + root = yaml.parse_from_text(definitions) + + found = root.find_first_by_path('/root/ba') + + self.assertEqual(found.value, 'bav') + + found = root.find_first_by_path('/root/b') + + self.assertEqual(found.value, 'bv') + + found = root.find_first_by_path('/root/') + + self.assertTrue(found.is_sequence) + self.assertFalse(found.nodes[0].is_sequence) + self.assertEqual(found.nodes[0].nodes[0].value, 'av') + + + def testComplexYamlContent(self): + """Process more complex YAML content.""" + + definitions = ''' +root: + a: 'v0' + b: 'v1 ? 1 : 2' + c: v2 # final comment + d: "'xx::xx'" +''' + + root = yaml.parse_from_text(definitions) + + found = root.find_first_by_path('/root/a') + + self.assertEqual(found.value, 'v0') + + found = root.find_first_by_path('/root/b') + + self.assertEqual(found.value, 'v1 ? 1 : 2') + + found = root.find_first_by_path('/root/c') + + self.assertEqual(found.value, 'v2') + + found = root.find_first_by_path('/root/d') + + self.assertEqual(found.value, "'xx::xx'") + + + def testArrayAsSeq(self): + """Handle array as YAML block sequence.""" + + definitions = ''' +root: + a: [ a, 'b', 0xcc, "\td\n\\"'" ] +''' + + root = yaml.parse_from_text(definitions) + + found = root.find_first_by_path('/root/a') + + self.assertIsNone(found.value) + + self.assertEqual(len(found.children.nodes), 4) + + self.assertEqual(found.children.nodes[0].key, 'a') + + self.assertEqual(found.children.nodes[1].key, 'b') + + self.assertEqual(found.children.nodes[2].key, '0xcc') + + self.assertEqual(found.children.nodes[3].key, "\td \"'") + + self.assertEqual(found.aggregate_value(), '[ a, \'b\', 0xcc, " d \"\'" ]') diff --git a/tests/plugins/yamlrdr.py b/tests/plugins/yamlrdr.py deleted file mode 100644 index 47f02ba..0000000 --- a/tests/plugins/yamlrdr.py +++ /dev/null @@ -1,277 +0,0 @@ -#!/usr/bin/python3-dbg -# -*- coding: utf-8 -*- - - -from chrysacase import ChrysalideTestCase -from pychrysalide.plugins.yaml import YamlReader -import tempfile - - -class TestYamlReader(ChrysalideTestCase): - """TestCase for the Yaml reader.""" - - - @classmethod - def setUpClass(cls): - - super(TestYamlReader, cls).setUpClass() - - cls._simple_map = tempfile.NamedTemporaryFile() - - cls._simple_map_data = b''' -a: av -b: bv -c: cv - -''' - - cls._simple_seq = tempfile.NamedTemporaryFile() - - cls._simple_seq_data = b''' -- a: av -- b: bv -- c: cv - -''' - - cls._nested = tempfile.NamedTemporaryFile() - - cls._nested_data = b''' -root: - a: v0 - b: v1 - c: v2 - sub: - aa: v00 - bb: v01 - cc: v02 - - i: w - - j: x - - k: c - d: v3 - -''' - - cls._mixed = tempfile.NamedTemporaryFile() - - cls._mixed_data = b''' -root: - - a: av - aa: aav - ab: abv - - b: bv - ba: bav - bb: bbv - -''' - - tmp = [ - [ cls._simple_map, cls._simple_map_data ], - [ cls._simple_seq, cls._simple_seq_data ], - [ cls._nested, cls._nested_data ], - [ cls._mixed, cls._mixed_data ], - ] - - for f, d in tmp: - - f.write(d) - f.flush() - - cls.log('Using temporary file "%s"' % f.name) - - - @classmethod - def tearDownClass(cls): - - super(TestYamlReader, cls).tearDownClass() - - tmp = [ - cls._simple_map, - cls._simple_seq, - cls._nested, - cls._mixed, - ] - - for f in tmp: - - cls.log('Delete file "%s"' % f.name) - - f.close() - - - def testSimpleYamlContent(self): - """Validate Yaml content readers.""" - - def _build_node_desc(node, left, extra = ''): - - if hasattr(node, 'key'): - - line = node.yaml_line - - prefix = '- ' if line.is_list_item else extra - desc = left + prefix + line.key + ':' + (' ' + line.value if line.value else '') + '\n' - indent = ' ' - - collec = node.collection - - else: - - desc = '' - indent = '' - - if hasattr(node, 'nodes'): - collec = node - - if collec: - - if collec.is_sequence: - extra = ' ' - - for child in collec.nodes: - desc += _build_node_desc(child, left + indent, extra) - - return desc - - - reader = YamlReader.new_from_path(self._simple_map.name) - self.assertIsNotNone(reader) - self.assertIsNotNone(reader.tree) - - fulldesc = _build_node_desc(reader.tree.root, '') - - self.assertEqual('\n' + fulldesc + '\n', self._simple_map_data.decode('ascii')) - - reader = YamlReader.new_from_path(self._simple_seq.name) - self.assertIsNotNone(reader) - self.assertIsNotNone(reader.tree) - - fulldesc = _build_node_desc(reader.tree.root, '') - - self.assertEqual('\n' + fulldesc + '\n', self._simple_seq_data.decode('ascii')) - - reader = YamlReader.new_from_path(self._nested.name) - self.assertIsNotNone(reader) - self.assertIsNotNone(reader.tree) - - fulldesc = _build_node_desc(reader.tree.root, '') - - self.assertEqual('\n' + fulldesc + '\n', self._nested_data.decode('ascii')) - - reader = YamlReader.new_from_path(self._mixed.name) - self.assertIsNotNone(reader) - self.assertIsNotNone(reader.tree) - - fulldesc = _build_node_desc(reader.tree.root, '') - - self.assertEqual('\n' + fulldesc + '\n', self._mixed_data.decode('ascii')) - - - def testSimpleYamlContentFinder(self): - """Validate Yaml nested content search.""" - - reader = YamlReader.new_from_path(self._nested.name) - self.assertIsNotNone(reader) - - found = reader.tree.find_by_path('/root/sub') - - self.assertEqual(len(found), 1) - - if len(found) == 1: - self.assertEqual(found[0].key, 'sub') - - found = reader.tree.find_by_path('/root/sub/') - - self.assertEqual(len(found), 3) - - found = reader.tree.find_by_path('/root/sub/xx') - - self.assertEqual(len(found), 0) - - found = reader.tree.find_by_path('/root/sub/cc/i') - - self.assertEqual(len(found), 1) - - if len(found) == 1: - self.assertEqual(found[0].key, 'i') - self.assertEqual(found[0].yaml_line.is_list_item, True) - - found = reader.tree.find_by_path('/root/sub/cc') - - self.assertEqual(len(found), 1) - - if len(found) == 1: - - root = found[0] - - found = root.find_by_path('cc/i') - - self.assertEqual(len(found), 1) - - if len(found) == 1: - - self.assertEqual(found[0].key, 'i') - self.assertEqual(found[0].yaml_line.is_list_item, True) - - found = root.find_by_path('/cc/i') - - self.assertEqual(len(found), 1) - - if len(found) == 1: - - self.assertEqual(found[0].key, 'i') - self.assertEqual(found[0].yaml_line.is_list_item, True) - - found = root.find_by_path('//i') - - self.assertEqual(len(found), 1) - - if len(found) == 1: - - self.assertEqual(found[0].key, 'i') - self.assertEqual(found[0].yaml_line.is_list_item, True) - - - def testMixedYamlContentFinder(self): - """Validate Yaml mixed content search.""" - - reader = YamlReader.new_from_path(self._mixed.name) - self.assertIsNotNone(reader) - - found = reader.tree.find_by_path('/root') - - self.assertEqual(len(found), 1) - - if len(found) == 1: - self.assertEqual(found[0].key, 'root') - - found = reader.tree.find_by_path('/root/', True) - - self.assertEqual(len(found), 1) - - found = reader.tree.find_one_by_path('/root/', True) - - self.assertIsNotNone(found) - - if found: - - sub = found.find_one_by_path('/a') - self.assertIsNotNone(sub) - self.assertEqual(sub.key, 'a') - - sub = found.find_one_by_path('/aa') - self.assertIsNotNone(sub) - self.assertEqual(sub.key, 'aa') - - found = reader.tree.find_by_path('/root/') - - self.assertEqual(len(found), 2) - - if len(found) == 2: - - sub = found[0].find_one_by_path('/a') - self.assertIsNotNone(sub) - self.assertEqual(sub.key, 'a') - - sub = found[0].find_one_by_path('/aa') - self.assertIsNotNone(sub) - self.assertEqual(sub.key, 'aa') |