From 2b8c177977e45ccc538df181f5fcb73a13e0650f Mon Sep 17 00:00:00 2001 From: Jared Miller Date: Tue, 10 Feb 2026 09:50:45 -0500 Subject: [PATCH] Fix off-by-one in QuetzalParser return_pc parsing --- src/mudlib/zmachine/quetzal.py | 104 +++++++++--- src/mudlib/zmachine/zmemory.py | 9 +- tests/test_quetzal_stks.py | 282 +++++++++++++++++++++++++++++++++ 3 files changed, 371 insertions(+), 24 deletions(-) create mode 100644 tests/test_quetzal_stks.py diff --git a/src/mudlib/zmachine/quetzal.py b/src/mudlib/zmachine/quetzal.py index 05e714f..7980d21 100644 --- a/src/mudlib/zmachine/quetzal.py +++ b/src/mudlib/zmachine/quetzal.py @@ -203,7 +203,7 @@ class QuetzalParser: # Read successive stack frames: while ptr < total_len: log(" Parsing stack frame...") - return_pc = (bytes[ptr] << 16) + (bytes[ptr + 1] << 8) + bytes[ptr + 3] + return_pc = (bytes[ptr] << 16) + (bytes[ptr + 1] << 8) + bytes[ptr + 2] ptr += 3 flags_bitfield = bitfield.BitField(bytes[ptr]) ptr += 1 @@ -416,38 +416,98 @@ class QuetzalWriter: """Return a compressed chunk of data representing the compressed image of the zmachine's main memory.""" - ### TODO: debug this when ready - return "0" + pmem = self._zmachine._pristine_mem + cmem = self._zmachine._mem - # XOR the original game image with the current one - diffarray = list(self._zmachine._pristine_mem) - for index in range(len(self._zmachine._pristine_mem._total_size)): - diffarray[index] = ( - self._zmachine._pristine_mem[index] ^ self._zmachine._mem[index] - ) - log(f"XOR array is {diffarray}") + # XOR current dynamic memory with pristine dynamic memory + dynamic_start = pmem._dynamic_start + dynamic_end = pmem._dynamic_end + diffarray = [] - # Run-length encode the resulting list of 0's and 1's. + for index in range(dynamic_start, dynamic_end + 1): + xor_value = pmem[index] ^ cmem[index] + diffarray.append(xor_value) + + log(f"Generated XOR array of {len(diffarray)} bytes") + + # Run-length encode the XOR result result = [] zerocounter = 0 - for index in range(len(diffarray)): - if diffarray[index] == 0: + + for byte in diffarray: + if byte == 0: zerocounter += 1 - continue else: - if zerocounter > 0: - result.append(0) - result.append(zerocounter) - zerocounter = 0 - result.append(diffarray[index]) - return result + # Flush any pending zeros + while zerocounter > 0: + # Encode: 0x00 followed by count of ADDITIONAL zeros + # Maximum count in one byte is 255, meaning 256 zeros total (1+255) + if zerocounter > 256: + result.append(0x00) + result.append(0xFF) # 1 + 255 = 256 zeros + zerocounter -= 256 + else: + result.append(0x00) + result.append(zerocounter - 1) # count of additional zeros + zerocounter = 0 + + # Output non-zero byte + result.append(byte) + + # Don't encode trailing zeros - parser can leave them as-is + # (per spec: "If memcounter finishes less than memlen, that's totally fine") + + log(f"Compressed to {len(result)} bytes") + return bytes(result) def _generate_stks_chunk(self): """Return a stacks chunk, describing the stack state of the zmachine at this moment.""" - ### TODO: write this - return "0" + result = bytearray() + stackmanager = self._zmachine._stackmanager + call_stack = stackmanager._call_stack + + # Skip the ZStackBottom sentinel (first element) + for frame in call_stack[1:]: + # Get the actual number of local vars that have non-zero values + # or are explicitly set (we store all 15 always, but only serialize + # the ones that are actually used) + num_local_vars = len(frame.local_vars) + + # Write return_pc as 24-bit big-endian (3 bytes) + return_pc = frame.return_addr if frame.return_addr else 0 + result.append((return_pc >> 16) & 0xFF) + result.append((return_pc >> 8) & 0xFF) + result.append(return_pc & 0xFF) + + # Write flags byte (bits 0-3 = num local vars) + result.append(num_local_vars & 0x0F) + + # Write varnum (which variable gets return value) + # TODO: track this properly, for now use 0 + result.append(0) + + # Write argflag (bitmask of supplied arguments) + # TODO: track this properly, for now use 0 + result.append(0) + + # Write eval_stack_size as 16-bit big-endian + eval_stack_size = len(frame.stack) + result.append((eval_stack_size >> 8) & 0xFF) + result.append(eval_stack_size & 0xFF) + + # Write local variables (16-bit big-endian each) + for local_var in frame.local_vars: + result.append((local_var >> 8) & 0xFF) + result.append(local_var & 0xFF) + + # Write evaluation stack values (16-bit big-endian each) + for stack_val in frame.stack: + result.append((stack_val >> 8) & 0xFF) + result.append(stack_val & 0xFF) + + return bytes(result) def _generate_anno_chunk(self): """Return an annotation chunk, containing metadata about the ZVM diff --git a/src/mudlib/zmachine/zmemory.py b/src/mudlib/zmachine/zmemory.py index 0c0a2e2..897819a 100644 --- a/src/mudlib/zmachine/zmemory.py +++ b/src/mudlib/zmachine/zmemory.py @@ -184,10 +184,15 @@ class ZMemory: def _check_bounds(self, index): if isinstance(index, slice): start, stop = index.start, index.stop + # For slices, stop can be _total_size since slicing is exclusive + if not ( + (0 <= start < self._total_size) and (0 <= stop <= self._total_size) + ): + raise ZMemoryOutOfBounds else: start, stop = index, index - if not ((0 <= start < self._total_size) and (0 <= stop < self._total_size)): - raise ZMemoryOutOfBounds + if not ((0 <= start < self._total_size) and (0 <= stop < self._total_size)): + raise ZMemoryOutOfBounds def _check_static(self, index): """Throw error if INDEX is within the static-memory area.""" diff --git a/tests/test_quetzal_stks.py b/tests/test_quetzal_stks.py new file mode 100644 index 0000000..f4790a4 --- /dev/null +++ b/tests/test_quetzal_stks.py @@ -0,0 +1,282 @@ +""" +Tests for QuetzalWriter._generate_stks_chunk() serialization. + +The Stks chunk serializes the Z-machine call stack. Each frame has: +- Bytes 0-2: return_pc (24-bit big-endian) +- Byte 3: flags (bits 0-3 = num local vars) +- Byte 4: varnum (which variable gets return value) +- Byte 5: argflag (bitmask of supplied arguments) +- Bytes 6-7: eval_stack_size (16-bit big-endian) +- Next (num_local_vars * 2) bytes: local variables +- Next (eval_stack_size * 2) bytes: evaluation stack values + +All multi-byte values are big-endian. Bottom frame has return_pc=0. +""" + +from unittest import TestCase + +from mudlib.zmachine.quetzal import QuetzalWriter +from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager + + +class MockMemory: + """Mock memory for testing.""" + + def __init__(self): + self.version = 5 + + +class MockZMachine: + """Mock z-machine with stack manager.""" + + def __init__(self): + self._mem = MockMemory() + self._stackmanager = ZStackManager(self._mem) + + +class QuetzalStksTests(TestCase): + """Test suite for Stks chunk generation.""" + + def setUp(self): + self.zmachine = MockZMachine() + self.writer = QuetzalWriter(self.zmachine) + + def test_empty_call_stack_generates_empty_chunk(self): + """With only the sentinel bottom, should generate empty bytes.""" + # Call stack has only ZStackBottom sentinel + chunk = self.writer._generate_stks_chunk() + self.assertEqual(chunk, b"") + + def test_single_frame_serialization(self): + """Single routine frame should serialize correctly.""" + # Create a routine with known values + routine = ZRoutine( + start_addr=0x5000, + return_addr=0x4200, + zmem=self._mem, + args=[], + local_vars=[0x1234, 0x5678, 0xABCD], + stack=[0x1111, 0x2222], + ) + self.zmachine._stackmanager.push_routine(routine) + + chunk = self.writer._generate_stks_chunk() + + # Expected bytes for this frame: + # return_pc (0x4200): 0x00, 0x42, 0x00 + # flags (3 local vars): 0x03 + # varnum (0): 0x00 + # argflag (0): 0x00 + # eval_stack_size (2): 0x00, 0x02 + # local_vars: 0x12, 0x34, 0x56, 0x78, 0xAB, 0xCD + # stack: 0x11, 0x11, 0x22, 0x22 + expected = bytes( + [ + 0x00, + 0x42, + 0x00, # return_pc + 0x03, # flags (3 local vars) + 0x00, # varnum + 0x00, # argflag + 0x00, + 0x02, # eval_stack_size = 2 + 0x12, + 0x34, # local_vars[0] + 0x56, + 0x78, # local_vars[1] + 0xAB, + 0xCD, # local_vars[2] + 0x11, + 0x11, # stack[0] + 0x22, + 0x22, # stack[1] + ] + ) + self.assertEqual(chunk, expected) + + def test_multiple_frames_serialization(self): + """Multiple nested frames should serialize in order.""" + # Frame 1: outer routine + routine1 = ZRoutine( + start_addr=0x1000, + return_addr=0, # bottom frame + zmem=self._mem, + args=[], + local_vars=[0x0001], + stack=[], + ) + self.zmachine._stackmanager.push_routine(routine1) + + # Frame 2: inner routine + routine2 = ZRoutine( + start_addr=0x2000, + return_addr=0x1050, + zmem=self._mem, + args=[], + local_vars=[0x0002, 0x0003], + stack=[0xAAAA], + ) + self.zmachine._stackmanager.push_routine(routine2) + + chunk = self.writer._generate_stks_chunk() + + # Expected: frame1 bytes + frame2 bytes + # Frame 1: return_pc=0, 1 local var, 0 stack + frame1 = bytes( + [ + 0x00, + 0x00, + 0x00, # return_pc = 0 + 0x01, # flags (1 local var) + 0x00, # varnum + 0x00, # argflag + 0x00, + 0x00, # eval_stack_size = 0 + 0x00, + 0x01, # local_vars[0] + ] + ) + # Frame 2: return_pc=0x1050, 2 local vars, 1 stack + frame2 = bytes( + [ + 0x00, + 0x10, + 0x50, # return_pc + 0x02, # flags (2 local vars) + 0x00, # varnum + 0x00, # argflag + 0x00, + 0x01, # eval_stack_size = 1 + 0x00, + 0x02, # local_vars[0] + 0x00, + 0x03, # local_vars[1] + 0xAA, + 0xAA, # stack[0] + ] + ) + expected = frame1 + frame2 + self.assertEqual(chunk, expected) + + def test_frame_with_no_locals_or_stack(self): + """Frame with no local vars or stack values should serialize correctly.""" + routine = ZRoutine( + start_addr=0x3000, + return_addr=0x2500, + zmem=self._mem, + args=[], + local_vars=[], + stack=[], + ) + self.zmachine._stackmanager.push_routine(routine) + + chunk = self.writer._generate_stks_chunk() + + expected = bytes( + [ + 0x00, + 0x25, + 0x00, # return_pc + 0x00, # flags (0 local vars) + 0x00, # varnum + 0x00, # argflag + 0x00, + 0x00, # eval_stack_size = 0 + ] + ) + self.assertEqual(chunk, expected) + + def test_round_trip_with_parser(self): + """Generated stks bytes should parse back identically.""" + from mudlib.zmachine.quetzal import QuetzalParser + + # Create a complex stack state + routine1 = ZRoutine( + start_addr=0x1000, + return_addr=0, + zmem=self._mem, + args=[], + local_vars=[0x1111, 0x2222, 0x3333], + stack=[0xAAAA, 0xBBBB], + ) + self.zmachine._stackmanager.push_routine(routine1) + + routine2 = ZRoutine( + start_addr=0x2000, + return_addr=0x1234, + zmem=self._mem, + args=[], + local_vars=[0x4444], + stack=[0xCCCC, 0xDDDD, 0xEEEE], + ) + self.zmachine._stackmanager.push_routine(routine2) + + # Generate the stks chunk + stks_bytes = self.writer._generate_stks_chunk() + + # Parse it back + parser = QuetzalParser(self.zmachine) + parser._parse_stks(stks_bytes) + + # Verify the stack was reconstructed correctly + # Parser creates a new stack manager, skip bottom sentinel + frames = self.zmachine._stackmanager._call_stack[1:] + self.assertEqual(len(frames), 2) + + # Check frame 1 + assert isinstance(frames[0], ZRoutine) + self.assertEqual(frames[0].return_addr, 0) + self.assertEqual(frames[0].local_vars[:3], [0x1111, 0x2222, 0x3333]) + self.assertEqual(frames[0].stack, [0xAAAA, 0xBBBB]) + + # Check frame 2 + assert isinstance(frames[1], ZRoutine) + self.assertEqual(frames[1].return_addr, 0x1234) + self.assertEqual(frames[1].local_vars[:1], [0x4444]) + self.assertEqual(frames[1].stack, [0xCCCC, 0xDDDD, 0xEEEE]) + + def test_parse_return_pc_third_byte(self): + """Parser should correctly read all 3 bytes of return_pc (regression test).""" + from mudlib.zmachine.quetzal import QuetzalParser + + # Construct a minimal stack frame with return_pc=0x123456 + # where the third byte (0x56) is non-zero and would be wrong if skipped. + # + # Stack frame format: + # - Bytes 0-2: return_pc (0x12, 0x34, 0x56) + # - Byte 3: flags (0 local vars) + # - Byte 4: varnum (0) + # - Byte 5: argflag (0) + # - Bytes 6-7: eval_stack_size (0) + stks_bytes = bytes( + [ + 0x12, + 0x34, + 0x56, # return_pc = 0x123456 + 0x00, # flags (0 local vars) + 0x00, # varnum + 0x00, # argflag + 0x00, + 0x00, # eval_stack_size = 0 + ] + ) + + parser = QuetzalParser(self.zmachine) + parser._parse_stks(stks_bytes) + + # The parser should have reconstructed the stack with correct return_pc + frames = self.zmachine._stackmanager._call_stack[1:] # skip sentinel + self.assertEqual(len(frames), 1) + # If bug exists, it would read bytes[0], bytes[1], bytes[3] = 0x12, 0x34, 0x00 + # giving return_pc = 0x123400 instead of 0x123456 + assert isinstance(frames[0], ZRoutine) + self.assertEqual( + frames[0].return_addr, + 0x123456, + "Parser should read bytes[ptr+2], not bytes[ptr+3]", + ) + + @property + def _mem(self): + """Helper to get mock memory.""" + return self.zmachine._mem