Wire op_restore to QuetzalParser and filesystem

Implement V3 restore opcode:
- Add QuetzalParser.load_from_bytes() to parse save data from memory
- Wire op_restore to call filesystem.restore_game() and parse result
- Validate IFhd matches current story (release/serial/checksum)
- Restore dynamic memory, call stack, and program counter
- Branch true on success, false on failure/cancellation

Fix IFF chunk padding bug:
- Add padding byte to odd-length chunks in QuetzalWriter
- Ensures proper chunk alignment for parser compatibility

Add comprehensive tests:
- Branch false when filesystem returns None
- Branch false without zmachine reference
- Branch true on successful restore
- Verify memory state matches saved values
- Handle malformed save data gracefully
This commit is contained in:
Jared Miller 2026-02-10 10:03:32 -05:00
parent a5053e10f2
commit b0fb9b5e2c
Signed by: shmup
GPG key ID: 22B5C6D66A38B06C
4 changed files with 688 additions and 5 deletions

View file

@ -294,6 +294,71 @@ class QuetzalParser:
debugging and test verification.""" debugging and test verification."""
return self._last_loaded_metadata return self._last_loaded_metadata
def load_from_bytes(self, data):
"""Parse Quetzal data from raw bytes (instead of a file).
Used by op_restore when filesystem.restore_game() returns raw bytes.
"""
import io
self._last_loaded_metadata = {}
if len(data) < 12:
raise QuetzalUnrecognizedFileFormat
# Validate FORM header
if data[0:4] != b"FORM":
raise QuetzalUnrecognizedFileFormat
# Read total length
self._len = (data[4] << 24) + (data[5] << 16) + (data[6] << 8) + data[7]
log(f"Total length of FORM data is {self._len}")
self._last_loaded_metadata["total length"] = self._len
# Validate IFZS type
if data[8:12] != b"IFZS":
raise QuetzalUnrecognizedFileFormat
# Create a BytesIO object to use with chunk module
self._file = io.BytesIO(data)
self._file.seek(12) # Skip FORM header, length, and IFZS type
log("Parsing chunks from byte data")
try:
while 1:
c = chunk.Chunk(self._file)
chunkname = c.getname()
chunksize = c.getsize()
chunk_data = c.read(chunksize)
log(f"** Found chunk ID {chunkname}: length {chunksize}")
self._last_loaded_metadata[chunkname] = chunksize
if chunkname == b"IFhd":
self._parse_ifhd(chunk_data)
elif chunkname == b"CMem":
self._parse_cmem(chunk_data)
elif chunkname == b"UMem":
self._parse_umem(chunk_data)
elif chunkname == b"Stks":
self._parse_stks(chunk_data)
elif chunkname == b"IntD":
self._parse_intd(chunk_data)
elif chunkname == b"AUTH":
self._parse_auth(chunk_data)
elif chunkname == b"(c) ":
self._parse_copyright(chunk_data)
elif chunkname == b"ANNO":
self._parse_anno(chunk_data)
else:
# spec says to ignore and skip past unrecognized chunks
pass
except EOFError:
pass
self._file.close()
log("Finished parsing Quetzal data.")
def load(self, savefile_path): def load(self, savefile_path):
"""Parse each chunk of the Quetzal file at SAVEFILE_PATH, """Parse each chunk of the Quetzal file at SAVEFILE_PATH,
initializing associated zmachine subsystems as needed.""" initializing associated zmachine subsystems as needed."""
@ -543,6 +608,9 @@ class QuetzalWriter:
result.append((size >> 8) & 0xFF) result.append((size >> 8) & 0xFF)
result.append(size & 0xFF) result.append(size & 0xFF)
result.extend(chunk_data) result.extend(chunk_data)
# IFF chunks must be padded to even byte boundaries
if size % 2 == 1:
result.append(0) # padding byte
log(f" Added {chunk_id} chunk ({size} bytes)") log(f" Added {chunk_id} chunk ({size} bytes)")
# Write nested chunks # Write nested chunks

View file

@ -586,9 +586,40 @@ class ZCpu:
def op_restore(self, *args): def op_restore(self, *args):
"""Restore game state from file (V3 - branch on success). """Restore game state from file (V3 - branch on success).
Currently always fails because QuetzalWriter is not yet functional, Uses QuetzalParser to load save data from filesystem,
so there are no valid save files to restore. validates it matches current story, and restores memory/stack/PC.
Branches true on success, false on failure.
""" """
if self._zmachine is None:
# Can't restore without zmachine reference
self._branch(False)
return
from .quetzal import QuetzalParser
try:
# Get save data from filesystem
save_data = self._ui.filesystem.restore_game()
if save_data is None:
# User cancelled or no save file available
self._branch(False)
return
# Parse the save data
parser = QuetzalParser(self._zmachine)
parser.load_from_bytes(save_data)
# QuetzalParser already:
# - Validated IFhd matches current story (release/serial/checksum)
# - Replaced dynamic memory via _parse_cmem or _parse_umem
# - Replaced stack manager via _parse_stks
# - Set program counter via _parse_ifhd
# Success!
self._branch(True)
except Exception as e:
# Any error during restore process = failure
log(f"Restore failed with exception: {e}")
self._branch(False) self._branch(False)
def op_restore_v4(self, *args): def op_restore_v4(self, *args):

View file

@ -0,0 +1,417 @@
"""Tests for QuetzalWriter Stks chunk generation."""
class TestStksChunkGeneration:
"""Test Stks chunk generation and serialization."""
def test_empty_stack_serialization(self):
"""Test serializing an empty stack (just the bottom sentinel)."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZStackManager
# Setup: create a mock zmachine with only the bottom sentinel
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
# Empty stack should serialize to empty bytes (no frames)
assert result == b""
def test_single_frame_no_locals_no_stack(self):
"""Test serializing a single routine frame with no locals or stack values."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Create a routine with no locals and no stack values
routine = ZRoutine(
start_addr=0x5000,
return_addr=0x1234,
zmem=zmachine._mem,
args=[],
local_vars=[],
stack=[],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
# Expected format:
# Bytes 0-2: return_pc = 0x1234 (24-bit big-endian)
# Byte 3: flags = 0 (0 local vars)
# Byte 4: varnum = 0 (which variable gets return value)
# Byte 5: argflag = 0
# Bytes 6-7: eval_stack_size = 0
expected = bytes(
[
0x00,
0x12,
0x34, # return_pc (24-bit)
0x00, # flags (0 locals)
0x00, # varnum
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
]
)
assert result == expected
def test_single_frame_with_locals(self):
"""Test serializing a frame with local variables."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Create a routine with 3 local variables
routine = ZRoutine(
start_addr=0x5000,
return_addr=0x2000,
zmem=zmachine._mem,
args=[],
local_vars=[0x1111, 0x2222, 0x3333],
stack=[],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
# Expected format:
# Bytes 0-2: return_pc = 0x2000
# Byte 3: flags = 3 (3 local vars in bits 0-3)
# Byte 4: varnum = 0x00
# Byte 5: argflag = 0
# Bytes 6-7: eval_stack_size = 0
# Bytes 8-13: three local vars (0x1111, 0x2222, 0x3333)
expected = bytes(
[
0x00,
0x20,
0x00, # return_pc
0x03, # flags (3 locals)
0x00, # varnum
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x11,
0x11, # local_var[0]
0x22,
0x22, # local_var[1]
0x33,
0x33, # local_var[2]
]
)
assert result == expected
def test_single_frame_with_stack_values(self):
"""Test serializing a frame with evaluation stack values."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Create a routine with stack values but no locals
routine = ZRoutine(
start_addr=0x5000,
return_addr=0x3000,
zmem=zmachine._mem,
args=[],
local_vars=[],
stack=[0xABCD, 0xEF01],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
# Expected format:
# Bytes 0-2: return_pc = 0x3000
# Byte 3: flags = 0 (0 locals)
# Byte 4: varnum = 0x00
# Byte 5: argflag = 0
# Bytes 6-7: eval_stack_size = 2
# Bytes 8-11: two stack values
expected = bytes(
[
0x00,
0x30,
0x00, # return_pc
0x00, # flags (0 locals)
0x00, # varnum
0x00, # argflag
0x00,
0x02, # eval_stack_size = 2
0xAB,
0xCD, # stack[0]
0xEF,
0x01, # stack[1]
]
)
assert result == expected
def test_single_frame_full(self):
"""Test serializing a frame with both locals and stack values."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Create a routine with 2 locals and 3 stack values
routine = ZRoutine(
start_addr=0x5000,
return_addr=0x4567,
zmem=zmachine._mem,
args=[],
local_vars=[0x0001, 0x0002],
stack=[0x1000, 0x2000, 0x3000],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x45,
0x67, # return_pc
0x02, # flags (2 locals)
0x00, # varnum
0x00, # argflag
0x00,
0x03, # eval_stack_size = 3
0x00,
0x01, # local_var[0]
0x00,
0x02, # local_var[1]
0x10,
0x00, # stack[0]
0x20,
0x00, # stack[1]
0x30,
0x00, # stack[2]
]
)
assert result == expected
def test_multiple_nested_frames(self):
"""Test serializing multiple nested routine frames."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Create first routine
routine1 = ZRoutine(
start_addr=0x5000,
return_addr=0x1000,
zmem=zmachine._mem,
args=[],
local_vars=[0xAAAA],
stack=[0xBBBB],
)
stack_manager.push_routine(routine1)
# Create second routine (nested)
routine2 = ZRoutine(
start_addr=0x6000,
return_addr=0x2000,
zmem=zmachine._mem,
args=[],
local_vars=[0xCCCC],
stack=[0xDDDD],
)
stack_manager.push_routine(routine2)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
# Expected: two frames concatenated
expected = bytes(
[
# Frame 1
0x00,
0x10,
0x00, # return_pc
0x01, # flags (1 local)
0x00, # varnum
0x00, # argflag
0x00,
0x01, # eval_stack_size = 1
0xAA,
0xAA, # local_var[0]
0xBB,
0xBB, # stack[0]
# Frame 2
0x00,
0x20,
0x00, # return_pc
0x01, # flags (1 local)
0x00, # varnum
0x00, # argflag
0x00,
0x01, # eval_stack_size = 1
0xCC,
0xCC, # local_var[0]
0xDD,
0xDD, # stack[0]
]
)
assert result == expected
def test_bottom_frame_zero_return_pc(self):
"""Test that a bottom/dummy frame with return_pc=0 is handled correctly."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Create a routine with return_addr=0 (main routine)
routine = ZRoutine(
start_addr=0x5000,
return_addr=0x0000,
zmem=zmachine._mem,
args=[],
local_vars=[0x1234],
stack=[],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
# Expected format with return_pc = 0
expected = bytes(
[
0x00,
0x00,
0x00, # return_pc = 0 (main routine)
0x01, # flags (1 local)
0x00, # varnum
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x12,
0x34, # local_var[0]
]
)
assert result == expected
class TestStksRoundTrip:
"""Test that Stks serialization/deserialization is symmetrical."""
def test_round_trip_serialization(self):
"""Test that we can serialize and deserialize frames correctly."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
# Create original stack with multiple frames
original_stack = ZStackManager(zmachine._mem)
routine1 = ZRoutine(
start_addr=0x5000,
return_addr=0x1234,
zmem=zmachine._mem,
args=[],
local_vars=[0x0001, 0x0002, 0x0003],
stack=[0x1111, 0x2222],
)
original_stack.push_routine(routine1)
routine2 = ZRoutine(
start_addr=0x6000,
return_addr=0x5678,
zmem=zmachine._mem,
args=[],
local_vars=[0xAAAA],
stack=[0xBBBB, 0xCCCC, 0xDDDD],
)
original_stack.push_routine(routine2)
zmachine._stackmanager = original_stack
# Serialize
writer = QuetzalWriter(zmachine)
stks_data = writer._generate_stks_chunk()
# Deserialize
parser = QuetzalParser(zmachine)
parser._parse_stks(stks_data)
# Verify the deserialized stack matches
restored_stack = zmachine._stackmanager
# Should have 2 frames plus the bottom sentinel
assert len(restored_stack._call_stack) == 3
# Check frame 1
frame1 = restored_stack._call_stack[1]
assert frame1.return_addr == 0x1234
assert frame1.local_vars[:3] == [0x0001, 0x0002, 0x0003]
assert frame1.stack == [0x1111, 0x2222]
# Check frame 2
frame2 = restored_stack._call_stack[2]
assert frame2.return_addr == 0x5678
assert frame2.local_vars[:1] == [0xAAAA]
assert frame2.stack == [0xBBBB, 0xCCCC, 0xDDDD]

View file

@ -784,8 +784,26 @@ class ZMachineComplexOpcodeTests(TestCase):
# Should not have branched (test is false) # Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc) self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_restore_v3_branches_false(self): def test_op_restore_v3_branches_false_when_filesystem_returns_none(self):
"""Test restore (V3) branches false (no valid save files).""" """Test restore (V3) branches false when filesystem returns None."""
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
self.cpu._zmachine = zmachine_mock
# Mock filesystem to return None (user cancelled or no file)
self.ui.filesystem.restore_game = Mock(return_value=None)
self.decoder.branch_condition = True self.decoder.branch_condition = True
self.decoder.branch_offset = 100 self.decoder.branch_offset = 100
old_pc = self.cpu._opdecoder.program_counter old_pc = self.cpu._opdecoder.program_counter
@ -795,6 +813,155 @@ class ZMachineComplexOpcodeTests(TestCase):
# Should not have branched (test is false) # Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc) self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_restore_v3_branches_false_when_no_zmachine(self):
"""Test restore (V3) branches false when zmachine is not set."""
self.cpu._zmachine = None
self.decoder.branch_condition = True
self.decoder.branch_offset = 100
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_restore()
# Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_restore_v3_branches_true_on_success(self):
"""Test restore (V3) branches true when restore succeeds."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
# Create a story with some dynamic memory
story = bytearray(1024)
story[0] = 3 # version 3
story[0x0E] = 0x04 # static memory starts at 0x0400
story[0x0F] = 0x00
# Set header values
story[0x02] = 0x12 # release high byte
story[0x03] = 0x34 # release low byte
for i, byte in enumerate(b"860509"):
story[0x12 + i] = byte
story[0x1C] = 0xAB # checksum high
story[0x1D] = 0xCD # checksum low
# Create zmachine with modified memory state
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
# Modify some dynamic memory to create a save state
zmachine_mock._mem[0x100] = 0x42
zmachine_mock._mem[0x200] = 0x99
self.cpu._zmachine = zmachine_mock
# Generate save data using QuetzalWriter
writer = QuetzalWriter(zmachine_mock)
save_data = writer.generate_save_data()
# Mock filesystem to return the save data
self.ui.filesystem.restore_game = Mock(return_value=save_data)
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_restore()
# Should have branched (test is true)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc + 8)
def test_op_restore_v3_restores_memory_state(self):
"""Test restore (V3) correctly restores dynamic memory."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
# Create a story
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
story[0x02] = 0x12
story[0x03] = 0x34
for i, byte in enumerate(b"860509"):
story[0x12 + i] = byte
story[0x1C] = 0xAB
story[0x1D] = 0xCD
# Create zmachine with saved state
saved_zmachine = Mock()
saved_zmachine._mem = ZMemory(bytes(story))
saved_zmachine._pristine_mem = ZMemory(bytes(story))
saved_zmachine._cpu = self.cpu
saved_zmachine._stackmanager = self.stack
# Modify memory to create unique save state
saved_zmachine._mem[0x50] = 0xAA
saved_zmachine._mem[0x150] = 0xBB
saved_zmachine._mem[0x250] = 0xCC
# Generate save data
writer = QuetzalWriter(saved_zmachine)
save_data = writer.generate_save_data()
# Create fresh zmachine with different state
current_zmachine = Mock()
current_zmachine._mem = ZMemory(bytes(story))
current_zmachine._pristine_mem = ZMemory(bytes(story))
current_zmachine._cpu = self.cpu
current_zmachine._stackmanager = self.stack
# Different values in memory
current_zmachine._mem[0x50] = 0x11
current_zmachine._mem[0x150] = 0x22
current_zmachine._mem[0x250] = 0x33
self.cpu._zmachine = current_zmachine
# Mock filesystem to return save data
self.ui.filesystem.restore_game = Mock(return_value=save_data)
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
self.cpu.op_restore()
# Memory should now match saved state
self.assertEqual(current_zmachine._mem[0x50], 0xAA)
self.assertEqual(current_zmachine._mem[0x150], 0xBB)
self.assertEqual(current_zmachine._mem[0x250], 0xCC)
def test_op_restore_v3_branches_false_on_malformed_data(self):
"""Test restore (V3) branches false when save data is malformed."""
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
self.cpu._zmachine = zmachine_mock
# Mock filesystem to return invalid data
self.ui.filesystem.restore_game = Mock(return_value=b"invalid data")
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_restore()
# Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_save_v3_branches_true_on_success(self): def test_op_save_v3_branches_true_on_success(self):
"""Test save (V3) branches true when filesystem succeeds.""" """Test save (V3) branches true when filesystem succeeds."""
# Create minimal zmachine mock # Create minimal zmachine mock