Compare commits

..

19 commits

Author SHA1 Message Date
f4b7d0548b
Update if-journey.rst with save/restore bug fix details 2026-02-10 13:15:16 -05:00
c52e59c5d4
Process V3 save branch on restore to advance PC past branch data
The Quetzal spec stores the PC pointing at the save instruction's
branch data. On restore, this branch must be processed as "save
succeeded" to advance the PC to the actual next instruction. Without
this, the branch bytes were decoded as an opcode, corrupting execution.

Detect the save opcode (0xB5) immediately before the restored PC to
distinguish in-game saves from out-of-band saves (which don't need
branch processing). Also improve error diagnostics: pop_stack now
raises ZStackPopError with frame context, and the instruction trace
dumps on all exceptions.
2026-02-10 13:15:16 -05:00
8526e48247
Fix Quetzal Stks field mapping: return_pc to caller, varnum to frame
return_pc for each frame belongs on the caller's program_counter (the
resume address when this routine exits). varnum is the store variable
that receives the return value, stored as return_addr on the frame
itself. Also handle flags bit 4 (discard result → return_addr=None).
2026-02-10 12:39:40 -05:00
776cfba021
Pad restored local vars to 15 slots
The Quetzal save format only stores the number of local vars the
routine declared, but the runtime indexes local_vars[0..14] for any
variable access. Restored routines had short lists, causing IndexError
on the first command after restore.
2026-02-10 12:00:15 -05:00
1ee89e5e3c
Log full traceback on interpreter crash
The error handler only captured str(e), losing the stack trace.
Now logs the full traceback so crashes are actually debuggable.
2026-02-10 12:00:10 -05:00
3627ce8245
Add z-machine save file inspection script
Offline diagnostic tool that loads a story + save file and shows
PC, stack frames, player location, objects, and can disassemble
at arbitrary addresses.
2026-02-10 11:51:55 -05:00
65a080608a
Fix stack manager references after Quetzal restore
QuetzalParser._parse_stks() creates a new ZStackManager and sets it on
zmachine._stackmanager, but ZCpu._stackmanager and ZOpDecoder._stack
still pointed to the old empty stack. After restore, all stack ops
(local var reads, routine returns, stack pops) used the wrong stack,
causing the interpreter to crash on the first command.
2026-02-10 11:51:50 -05:00
15e1d807aa
Move z-machine restore before interpreter thread start
Replaces the async _do_restore() (called after thread launch) with a
synchronous _try_restore() called before the thread starts. This
eliminates the race condition where restore mutates z-machine state
while the interpreter thread is running.

The restore prefix message is now part of start()'s return value
instead of being sent separately in play.py.
2026-02-10 11:51:45 -05:00
224c1f0645
Add reconnect tintin command 2026-02-10 11:26:41 -05:00
8b4493ea39
Update if-journey docs with Level 2 integration milestone 2026-02-10 11:18:22 -05:00
b6d933acc5
Add tests for embedded z-machine MUD integration
Unit tests for MUD UI components (screen, input stream, filesystem)
and integration tests with real zork1.z3 (session lifecycle, escape
commands, save/restore round-trip, state inspection).
2026-02-10 11:18:19 -05:00
7c1d1efcdb
Wire embedded z-machine interpreter into MUD mode stack
EmbeddedIFSession runs the hybrid interpreter in a daemon thread,
bridged to the async MUD loop via threading.Event synchronization.
.z3 files use the embedded path; other formats fall back to dfrotz.

- MUD ZUI components: MudScreen (buffered output), MudInputStream
  (thread-safe input), MudFilesystem (quetzal saves), NullAudio
- save/restore via QuetzalWriter/QuetzalParser and :: escape commands
- state inspection: get_location_name(), get_room_objects()
- error reporting for interpreter crashes
- fix quetzal parser bit slice bug: _parse_stks used [0:3] (3 bits,
  max 7 locals) instead of [0:4] (4 bits, max 15) — Zork uses 15
2026-02-10 11:18:16 -05:00
5b7cb252b5
Update if-journey docs with save/restore completion
Save/restore is now fully implemented in the hybrid interpreter. Updated
open question 7 to reflect completion, marked the what-to-do-next item as
done, and updated the milestone section to include save/restore in the
"what works" list.

Also noted the QuetzalParser off-by-one bug fix (return_pc parsing).
2026-02-10 10:13:45 -05:00
1ffc4e14c2
Add round-trip save/restore integration test
Verifies complete save/restore pipeline by generating save data with
QuetzalWriter and restoring it with QuetzalParser. Tests cover:
- Basic round-trip with memory, stack, and PC restoration
- Multiple nested call frames
- Preservation of unchanged memory bytes (run-length encoding)
- Empty stack (no routine frames)

Each test confirms that state modified after save is correctly
restored to original values from the save data.
2026-02-10 10:13:45 -05:00
b0fb9b5e2c
Wire op_restore to QuetzalParser and filesystem
Implement V3 restore opcode:
- Add QuetzalParser.load_from_bytes() to parse save data from memory
- Wire op_restore to call filesystem.restore_game() and parse result
- Validate IFhd matches current story (release/serial/checksum)
- Restore dynamic memory, call stack, and program counter
- Branch true on success, false on failure/cancellation

Fix IFF chunk padding bug:
- Add padding byte to odd-length chunks in QuetzalWriter
- Ensures proper chunk alignment for parser compatibility

Add comprehensive tests:
- Branch false when filesystem returns None
- Branch false without zmachine reference
- Branch true on successful restore
- Verify memory state matches saved values
- Handle malformed save data gracefully
2026-02-10 10:13:45 -05:00
a5053e10f2
Wire op_save to QuetzalWriter and filesystem
Implement full save functionality for V3 z-machine:
- Fixed QuetzalWriter._generate_anno_chunk() to return bytes
- Added QuetzalWriter.generate_save_data() to produce IFF container
- Updated QuetzalWriter.write() to use new method and binary mode
- Added zmachine reference to ZCpu for QuetzalWriter access
- Added _program_counter property to ZCpu for Quetzal access
- Implemented op_save to call QuetzalWriter and filesystem
- Updated tests for op_save (success, failure, IFF validation)
- Added filesystem mock to MockUI for testing
- Added _call_stack to MockStackManager for QuetzalWriter

All tests pass. Save now generates valid IFF/FORM/IFZS data with
IFhd, CMem, Stks, and ANNO chunks.
2026-02-10 10:13:45 -05:00
69b1ef8a59
Implement QuetzalWriter CMem and Stks chunk generators
Adds comprehensive test coverage for CMem chunk generation:
- test_cmem_all_unchanged: Empty output when memory unchanged
- test_cmem_single_byte_change: Single byte modification
- test_cmem_multiple_scattered_changes: Multiple changes
- test_cmem_roundtrip_with_parser: Writer/parser integration
- test_cmem_consecutive_zeros: Run-length encoding validation
2026-02-10 10:13:33 -05:00
2b8c177977
Fix off-by-one in QuetzalParser return_pc parsing 2026-02-10 10:13:33 -05:00
0c6eadb0da
Implement QuetzalWriter._generate_ifhd_chunk()
The IFhd chunk contains 13 bytes of metadata identifying the story
and current execution state:
- Release number (2 bytes) from header
- Serial number (6 bytes) from header
- Checksum (2 bytes) from header
- Program counter (3 bytes) from CPU state

This allows save files to be validated against the story file.
2026-02-10 09:47:24 -05:00
18 changed files with 3032 additions and 123 deletions

View file

@ -205,6 +205,8 @@ All V3 gaps have been resolved. sread tokenization works correctly. save/restore
Do they represent z-machine memory the same way? Both use bytearrays, but header parsing, object table offsets, string encoding — are these compatible enough that porting opcodes is translation, or is it a deeper rewrite? Do they represent z-machine memory the same way? Both use bytearrays, but header parsing, object table offsets, string encoding — are these compatible enough that porting opcodes is translation, or is it a deeper rewrite?
UPDATE: Less urgent now that the hybrid interpreter works end-to-end for V3. The layout question mainly matters for V5 opcode porting (Lost Pig, Wizard Sniffer). The hybrid already handles all V3 memory operations correctly.
3. Async model 3. Async model
~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~
@ -234,12 +236,16 @@ How hard is it to add words to a z-machine dictionary at runtime? The dictionary
7. Save/restore in the hybrid interpreter 7. Save/restore in the hybrid interpreter
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
``op_save`` is a stub that always branches false (game prints "Failed."). The infrastructure is mostly there — ``TrivialFilesystem.save_game()`` prompts for a filename and writes to disk, ``QuetzalParser`` can read save files — but two pieces are missing: RESOLVED: save/restore is now fully implemented and working. Key pieces:
- ``QuetzalWriter`` chunk generators (``ifhd``, ``cmem``, ``stks``) are all stubs returning ``"0"`` - ``QuetzalWriter`` chunk generators implemented (``IFhd`` for header, ``CMem`` for XOR-compressed dynamic memory, ``Stks`` for stack frame serialization)
- ``op_save`` doesn't collect game state or call the filesystem - ``op_save`` and ``op_restore`` wired to filesystem layer via ``TrivialFilesystem``
- round-trip tested: save game state, restore it, continue playing
- fixed Quetzal ``Stks`` field mapping: ``return_pc`` belongs on the caller frame's ``program_counter``, not the current frame. ``varnum`` is the store variable on the current frame. round-trip tests masked this because writer and parser had the same bug symmetrically
- fixed V3 save branch processing on restore: in-game saves store PC pointing at branch data after the save opcode (0xB5). ``_try_restore()`` detects this and calls ``_branch(True)`` to advance past it. without this, branch bytes were decoded as instructions
- fixed restored local var padding: save files store only declared locals, runtime expects 15 slots. now zero-pads on restore
To make save work: implement ``QuetzalWriter`` (XOR-compress dynamic memory against original story, serialize stack frames into Quetzal format), then wire ``op_save`` to generate the bytes and call ``self._ui.filesystem.save_game(data)``. Restore should be simpler since ``QuetzalParser`` already works — just need to wire ``op_restore`` to call ``filesystem.restore_game()`` and apply the parsed state. Quetzal format is now fully supported for both reading and writing saves. Diagnostic tooling: ``scripts/zmachine_inspect.py`` for offline state inspection, instruction trace deque (last 20) auto-dumps on crash.
what to do next what to do next
--------------- ---------------
@ -254,7 +260,9 @@ Concrete next steps, roughly ordered. Update as items get done.
- [x] build level 1 prototype: regardless of interpreter choice, implement the terminal object, IF mode, and subprocess dfrotz path. this proves the MUD-side architecture (mode stack, spectators, save/restore) independently of the interpreter question. (done — see ``docs/how/if-terminal.txt``) - [x] build level 1 prototype: regardless of interpreter choice, implement the terminal object, IF mode, and subprocess dfrotz path. this proves the MUD-side architecture (mode stack, spectators, save/restore) independently of the interpreter question. (done — see ``docs/how/if-terminal.txt``)
- [ ] implement save/restore: finish ``QuetzalWriter`` chunk generators and wire ``op_save``/``op_restore`` to the filesystem layer. restore should be easier since ``QuetzalParser`` already works. - [x] implement save/restore: finished ``QuetzalWriter`` chunk generators (IFhd, CMem, Stks) and wired ``op_save``/``op_restore`` to filesystem. quetzal round-trip now works — can save during gameplay, restore, and continue. also fixed parser off-by-one bug in return_pc.
- [x] wire embedded interpreter to MUD: connected the hybrid interpreter to the MUD's mode stack via ``EmbeddedIFSession``. .z3 files use the embedded interpreter; other formats fall back to dfrotz. save/restore works via QuetzalWriter/QuetzalParser. state inspection (room name, objects) enables level 2. found and fixed a quetzal parser bug (bit slice for local vars was 3 bits, needed 4). (done — see ``src/mudlib/embedded_if_session.py``, ``src/mudlib/zmachine/mud_ui.py``)
- [ ] study MojoZork's multiplayer model: read the MultiZork source for how it handles multiple players in one z-machine. document the pattern for our eventual level 4. - [ ] study MojoZork's multiplayer model: read the MultiZork source for how it handles multiple players in one z-machine. document the pattern for our eventual level 4.
@ -273,7 +281,8 @@ What works:
- instruction trace deque (last 20 instructions) for debugging state errors - instruction trace deque (last 20 instructions) for debugging state errors
- smoke test: ``scripts/run_zork1.py`` runs the game headless, exercises core opcode paths - smoke test: ``scripts/run_zork1.py`` runs the game headless, exercises core opcode paths
- parser and lexer: all Zork 1 commands work (look, open mailbox, read leaflet, inventory, take, drop, navigation) - parser and lexer: all Zork 1 commands work (look, open mailbox, read leaflet, inventory, take, drop, navigation)
- the interpreter is fully playable for Zork 1 (save/restore not yet wired — see open question 7) - save/restore: full quetzal format support for persisting and restoring game state
- the interpreter is fully playable for Zork 1
What this enables: What this enables:
@ -284,6 +293,29 @@ What this enables:
The step-based execution model means IF sessions can run in the async MUD game loop without blocking. Each player command advances their z-machine instance by N instructions (until output or a stopping condition). The trace deque captures the last 20 instructions for debugging unexpected state. The step-based execution model means IF sessions can run in the async MUD game loop without blocking. Each player command advances their z-machine instance by N instructions (until output or a stopping condition). The trace deque captures the last 20 instructions for debugging unexpected state.
milestone — Level 2: embedded interpreter wired to MUD
-------------------------------------------------------
The embedded z-machine interpreter is now connected to the MUD engine. Players can ``play zork1`` and the game runs inside the MUD process — no dfrotz subprocess needed for .z3 files.
What works:
- ``EmbeddedIFSession`` wraps the hybrid interpreter with the same interface as the dfrotz-based ``IFSession``
- MUD ZUI components: ``MudScreen`` (buffered output), ``MudInputStream`` (thread-safe input with events), ``MudFilesystem`` (quetzal saves to disk), ``NullAudio``
- interpreter runs in a daemon thread; ``MudInputStream`` uses ``threading.Event`` for async bridge — interpreter blocks on ``read_line()``, async side feeds input and waits for next prompt
- save/restore via ``::save`` and ``::quit`` escape commands (QuetzalWriter), auto-restore on session start (QuetzalParser)
- state inspection: ``get_location_name()`` reads global variable 0 (player location object), ``get_room_objects()`` walks the object tree
- .z3 files use embedded interpreter, other formats fall back to dfrotz
- fixed quetzal parser bug: ``_parse_stks`` bit slice was ``[0:3]`` (3 bits, max 7 locals), should be ``[0:4]`` (4 bits, max 15 locals) — Zork uses 15
- 558 tests pass including unit tests for MUD UI components and integration tests with real zork1.z3
What this enables:
- spectators can see what room the IF player is in (``get_location_name()``)
- MUD code can read the object tree, variables, and attributes
- foundation for level 3 (moldable world — write z-machine state from MUD)
- no external dependency on dfrotz for V3 games
related documents related documents
----------------- -----------------

View file

@ -12,7 +12,10 @@
#alias {fse} {fly southeast} #alias {fse} {fly southeast}
#alias {fsw} {fly southwest} #alias {fsw} {fly southwest}
#NOP combat aliases (pr/pl/dr/dl/f/v are built into the MUD)
#NOP these are extras for single-key convenience
#alias {o} {sweep} #alias {o} {sweep}
#alias {r} {roundhouse} #alias {r} {roundhouse}
#alias {reconnect} {
#zap mud;
#session mud localhost 6789;
}

399
scripts/zmachine_inspect.py Executable file
View file

@ -0,0 +1,399 @@
#!/usr/bin/env -S uv run --script
"""Z-Machine state inspection and debugging tool.
Loads a story file and optionally applies a Quetzal save, then displays
machine state for debugging z-machine interpreter issues.
"""
import argparse
import sys
from pathlib import Path
# Add src to path so we can import mudlib
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from mudlib.zmachine.quetzal import ( # noqa: E402
QuetzalError,
QuetzalMismatchedFile,
QuetzalParser,
)
from mudlib.zmachine.trivialzui import create_zui # noqa: E402
from mudlib.zmachine.zmachine import ZMachine # noqa: E402
# Opcode name tables for disassembly
OP2_NAMES = {
1: "je",
2: "jl",
3: "jg",
4: "dec_chk",
5: "inc_chk",
6: "jin",
7: "test",
8: "or",
9: "and",
10: "test_attr",
11: "set_attr",
12: "clear_attr",
13: "store",
14: "insert_obj",
15: "loadw",
16: "loadb",
17: "get_prop",
18: "get_prop_addr",
19: "get_next_prop",
20: "add",
21: "sub",
22: "mul",
23: "div",
24: "mod",
}
OP1_NAMES = {
0: "jz",
1: "get_sibling",
2: "get_child",
3: "get_parent",
4: "get_prop_len",
5: "inc",
6: "dec",
7: "print_addr",
8: "call_1s",
9: "remove_obj",
10: "print_obj",
11: "ret",
12: "jump",
13: "print_paddr",
14: "load",
15: "not/call_1n",
}
OP0_NAMES = {
0: "rtrue",
1: "rfalse",
2: "print",
3: "print_ret",
4: "nop",
5: "save",
6: "restore",
7: "restart",
8: "ret_popped",
9: "pop/catch",
10: "quit",
11: "new_line",
12: "show_status",
13: "verify",
15: "piracy",
}
VAR_NAMES = {
0: "call",
1: "storew",
2: "storeb",
3: "put_prop",
4: "sread",
5: "print_char",
6: "print_num",
7: "random",
8: "push",
9: "pull",
10: "split_window",
11: "set_window",
19: "output_stream",
20: "input_stream",
21: "sound_effect",
}
def decode_opcode_class(opcode_byte):
"""Determine opcode class from the opcode byte."""
if opcode_byte < 0x80:
return "2OP"
elif opcode_byte < 0xC0:
op_type = (opcode_byte >> 4) & 3
if op_type == 3:
return "0OP"
else:
return "1OP"
elif opcode_byte < 0xE0:
return "2OP"
else:
return "VAR"
def get_opcode_name(opcode_class, opcode_num):
"""Get the name of an opcode."""
if opcode_class == "2OP":
return OP2_NAMES.get(opcode_num, f"unknown_{opcode_num}")
elif opcode_class == "1OP":
return OP1_NAMES.get(opcode_num, f"unknown_{opcode_num}")
elif opcode_class == "0OP":
return OP0_NAMES.get(opcode_num, f"unknown_{opcode_num}")
elif opcode_class == "VAR":
return VAR_NAMES.get(opcode_num, f"unknown_{opcode_num}")
else:
return "unknown"
def parse_operand_types(mem, pc, opcode_byte):
"""Parse operand types without evaluating them. Returns (types, bytes_consumed)."""
types = []
pos = pc
if opcode_byte < 0x80:
# Long form 2OP
types.append("var" if (opcode_byte & 0x40) else "small")
types.append("var" if (opcode_byte & 0x20) else "small")
return types, pos - pc
elif opcode_byte < 0xC0:
# Short form
op_type = (opcode_byte >> 4) & 3
if op_type == 0:
types.append("large")
elif op_type == 1:
types.append("small")
elif op_type == 2:
types.append("var")
# op_type == 3 means 0OP, no operands
return types, pos - pc
else:
# Variable form - read types byte
types_byte = mem[pos]
pos += 1
for i in range(4):
t = (types_byte >> (6 - i * 2)) & 3
if t == 3:
break
elif t == 0:
types.append("large")
elif t == 1:
types.append("small")
elif t == 2:
types.append("var")
return types, pos - pc
def disassemble_at(zmachine, addr, count):
"""Disassemble count instructions starting at addr."""
mem = zmachine._mem
pc = addr
print(f"\n--- disassembly at 0x{addr:06x} ({count} instructions) ---")
for _ in range(count):
if pc >= len(mem._memory):
print(f" {pc:06x} (out of bounds)")
break
start_pc = pc
opcode_byte = mem[pc]
pc += 1
# Decode opcode class and number
if opcode_byte < 0x80:
# Long form 2OP
op_class = "2OP"
op_num = opcode_byte & 0x1F
elif opcode_byte < 0xC0:
# Short form
op_type = (opcode_byte >> 4) & 3
if op_type == 3:
op_class = "0OP"
op_num = opcode_byte & 0x0F
else:
op_class = "1OP"
op_num = opcode_byte & 0x0F
elif opcode_byte < 0xE0:
# Variable form 2OP
op_class = "2OP"
op_num = opcode_byte & 0x1F
else:
# Variable form VAR
op_class = "VAR"
op_num = opcode_byte & 0x1F
op_name = get_opcode_name(op_class, op_num)
# Parse operand types
operand_types, type_bytes = parse_operand_types(mem, pc, opcode_byte)
pc += type_bytes
# Skip past operand values (without reading them)
operand_str_parts = []
for ot in operand_types:
if ot == "large":
if pc + 1 < len(mem._memory):
val = (mem[pc] << 8) | mem[pc + 1]
operand_str_parts.append(f"#{val}")
pc += 2
elif ot == "small":
if pc < len(mem._memory):
operand_str_parts.append(f"#{mem[pc]}")
pc += 1
elif ot == "var" and pc < len(mem._memory):
operand_str_parts.append(f"V{mem[pc]:02x}")
pc += 1
operands_str = ", ".join(operand_str_parts)
print(f" {start_pc:06x} {op_class}:{op_num:02d} {op_name}({operands_str})")
def get_location_info(zmachine):
"""Get current location object and its contents."""
try:
# Global variable 0 (opcode variable 0x10) is the player location
location_obj = zmachine._mem.read_global(0x10)
if location_obj == 0:
return None, []
obj_parser = zmachine._objectparser
# Try to get the location name
try:
location_name = obj_parser.get_shortname(location_obj)
except Exception:
# Invalid object number - return None
return None, []
# Get objects in this location (children)
objects = []
child = obj_parser.get_child(location_obj)
while child != 0:
try:
child_name = obj_parser.get_shortname(child)
objects.append(child_name)
except Exception:
objects.append(f"object #{child}")
child = obj_parser.get_sibling(child)
return (location_obj, location_name), objects
except Exception:
return None, []
def validate_save(zmachine, save_path):
"""Validate a save file and print diagnostic info."""
print("\n--- save file validation ---")
# Read the save file
try:
with open(save_path, "rb") as f:
save_data = f.read()
except OSError as e:
print(f"ERROR: Cannot read save file: {e}")
return False
# Parse it
parser = QuetzalParser(zmachine)
try:
parser.load_from_bytes(save_data)
except QuetzalMismatchedFile:
print("ERROR: Save file does not match story file")
print(" (release number, serial, or checksum mismatch)")
return False
except QuetzalError as e:
print(f"ERROR: Invalid Quetzal file: {e}")
return False
except Exception as e:
print(f"ERROR: Failed to parse save file: {e}")
return False
# Get metadata
metadata = parser.get_last_loaded()
print("Save file loaded successfully")
print(f" release: {metadata.get('release number', 'unknown')}")
print(f" serial: {metadata.get('serial number', 'unknown')}")
print(f" checksum: 0x{metadata.get('checksum', 0):04x}")
print(f" PC: 0x{metadata.get('program counter', 0):06x}")
# Check PC is in bounds
pc = metadata.get("program counter", 0)
mem_size = len(zmachine._mem._memory)
if pc >= mem_size:
print(f" WARNING: PC 0x{pc:06x} is out of bounds (size: {mem_size})")
return False
print(" PC is within story file bounds")
return True
def main():
parser = argparse.ArgumentParser(
description="Inspect z-machine state for debugging"
)
parser.add_argument("story", help="Path to story file (.z3/.z5/.z8)")
parser.add_argument("--save", help="Path to Quetzal save file (.qzl)")
parser.add_argument(
"--disasm", type=lambda x: int(x, 0), help="Address to disassemble (hex)"
)
parser.add_argument(
"--count", type=int, default=5, help="Number of instructions to disassemble"
)
parser.add_argument(
"--globals", action="store_true", help="Show first 16 global variables"
)
args = parser.parse_args()
story_path = Path(args.story)
if not story_path.exists():
print(f"ERROR: Story file not found: {story_path}")
sys.exit(1)
# Load story
with open(story_path, "rb") as f:
story_bytes = f.read()
# Create ZMachine
ui = create_zui()
zmachine = ZMachine(story_bytes, ui)
# Load save if provided
if args.save:
save_path = Path(args.save)
if not save_path.exists():
print(f"ERROR: Save file not found: {save_path}")
sys.exit(1)
if not validate_save(zmachine, save_path):
sys.exit(1)
# Display state
print("\nzmachine state")
print("==============")
print(f"version: {zmachine._mem.version}")
print(f"story size: {len(story_bytes)} bytes")
print(f"PC: 0x{zmachine._opdecoder.program_counter:06x}")
print(f"stack: {len(zmachine._stackmanager._call_stack) - 1} frames")
# Location info
location, objects = get_location_info(zmachine)
if location:
loc_obj, loc_name = location
print(f"\nlocation: {loc_name} (#{loc_obj})")
if objects:
print("objects: " + ", ".join(objects))
else:
print("objects: (none)")
else:
print("\nlocation: (unknown)")
# Global variables
if args.globals:
print("\nglobals (first 16):")
for i in range(16):
global_val = zmachine._mem.read_global(0x10 + i)
print(f" G{i:02d} (V{0x10 + i:02x}): 0x{global_val:04x} ({global_val})")
# Disassembly
if args.disasm is not None:
disassemble_at(zmachine, args.disasm, args.count)
else:
# Disassemble instructions at PC
disassemble_at(zmachine, zmachine._opdecoder.program_counter, args.count)
if __name__ == "__main__":
main()

View file

@ -3,6 +3,7 @@
import pathlib import pathlib
from mudlib.commands import CommandDefinition, register from mudlib.commands import CommandDefinition, register
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.if_session import IFSession, broadcast_to_spectators from mudlib.if_session import IFSession, broadcast_to_spectators
from mudlib.player import Player from mudlib.player import Player
@ -60,8 +61,20 @@ async def cmd_play(player: Player, args: str) -> None:
await player.send(msg) await player.send(msg)
return return
# Create and start IF session # Ensure story_path is a Path object (for mocking compatibility)
session = IFSession(player, str(story_path), game_name) if not isinstance(story_path, pathlib.Path):
story_path = pathlib.Path(story_path)
# Use embedded interpreter for z3 files, dfrotz for others
if story_path.suffix == ".z3":
try:
session = EmbeddedIFSession(player, str(story_path), game_name)
except (FileNotFoundError, OSError) as e:
await player.send(f"error starting game: {e}\r\n")
return
else:
session = IFSession(player, str(story_path), game_name)
try: try:
intro = await session.start() intro = await session.start()
except FileNotFoundError: except FileNotFoundError:
@ -78,18 +91,9 @@ async def cmd_play(player: Player, args: str) -> None:
await player.send("(type ::help for escape commands)\r\n") await player.send("(type ::help for escape commands)\r\n")
# Check for saved game if intro:
if session.save_path.exists():
await player.send("restoring saved game...\r\n")
restored_text = await session._do_restore()
if restored_text:
await player.send(restored_text + "\r\n")
# Broadcast restored text to spectators
spectator_msg = f"[{player.name}'s terminal]\r\n{restored_text}\r\n"
await broadcast_to_spectators(player, spectator_msg)
elif intro:
await player.send(intro + "\r\n") await player.send(intro + "\r\n")
# Broadcast intro to spectators # Broadcast to spectators
spectator_msg = f"[{player.name}'s terminal]\r\n{intro}\r\n" spectator_msg = f"[{player.name}'s terminal]\r\n{intro}\r\n"
await broadcast_to_spectators(player, spectator_msg) await broadcast_to_spectators(player, spectator_msg)

View file

@ -0,0 +1,174 @@
import asyncio
import logging
import re
import threading
import traceback
from pathlib import Path
from typing import TYPE_CHECKING
from mudlib.if_session import IFResponse
from mudlib.zmachine.mud_ui import create_mud_ui
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zcpu import ZCpuQuit, ZCpuRestart
from mudlib.zmachine.zmachine import ZMachine
if TYPE_CHECKING:
from mudlib.player import Player
logger = logging.getLogger(__name__)
class EmbeddedIFSession:
"""Wraps z-machine interpreter for MUD integration."""
def __init__(self, player: "Player", story_path: str, game_name: str = ""):
self.player = player
self.story_path = story_path
self.game_name = game_name or Path(story_path).stem
self._data_dir = Path(__file__).resolve().parents[2] / "data"
self._thread: threading.Thread | None = None
self._done = False
self._error: str | None = None
story_bytes = Path(story_path).read_bytes()
save_path = self.save_path
self._ui, self._screen, self._keyboard = create_mud_ui(save_path)
self._zmachine = ZMachine(story_bytes, self._ui)
self._filesystem = self._ui.filesystem
@property
def save_path(self) -> Path:
safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", self.player.name)
return self._data_dir / "if_saves" / safe_name / f"{self.game_name}.qzl"
def _try_restore(self) -> bool:
"""Try to restore from save file before interpreter starts.
Must be called before the interpreter thread is launched.
Returns True if state was restored successfully.
"""
if not self.save_path.exists():
return False
try:
save_data = self.save_path.read_bytes()
parser = QuetzalParser(self._zmachine)
parser.load_from_bytes(save_data)
# In V1-3, the saved PC points to branch data after the save
# instruction. Process the branch as "save succeeded" so the
# PC advances past it. Detect by checking for save opcode (0xB5)
# immediately before the restored PC.
pc = self._zmachine._opdecoder.program_counter
if (
self._zmachine._mem.version <= 3
and pc > 0
and self._zmachine._mem[pc - 1] == 0xB5
):
self._zmachine._cpu._branch(True)
return True
except Exception as e:
logger.debug(f"Restore failed: {e}")
return False
async def start(self) -> str:
"""Start the z-machine interpreter, restoring from save if available."""
restored = self._try_restore()
self._thread = threading.Thread(target=self._run_interpreter, daemon=True)
self._thread.start()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._keyboard._waiting.wait)
output = self._screen.flush()
if restored:
prefix = "restoring saved game...\r\nrestored."
return f"{prefix}\r\n\r\n{output}" if output else prefix
return output
async def handle_input(self, text: str) -> IFResponse:
if text.lower() == "::quit":
await self._do_save()
return IFResponse(output="game saved.", done=True)
if text.lower() == "::help":
help_text = """escape commands:
::quit - exit the game
::save - save game progress
::help - show this help"""
return IFResponse(output=help_text, done=False)
if text.lower() == "::save":
confirmation = await self._do_save()
return IFResponse(output=confirmation, done=False)
self._keyboard._waiting.clear()
self._keyboard.feed(text)
loop = asyncio.get_running_loop()
def wait_for_next_input():
while not self._done and not self._keyboard._waiting.is_set():
self._keyboard._waiting.wait(timeout=0.1)
await loop.run_in_executor(None, wait_for_next_input)
output = self._screen.flush()
if self._done and self._error:
output = f"{output}\r\n{self._error}" if output else self._error
return IFResponse(output=output, done=self._done)
async def stop(self):
self._done = True
if self._keyboard._waiting.is_set():
self._keyboard.feed("")
def _run_interpreter(self):
try:
self._zmachine.run()
except ZCpuQuit:
logger.debug("Interpreter quit normally")
except ZCpuRestart:
logger.debug("Interpreter restart requested")
except Exception as e:
tb = traceback.format_exc()
logger.error(f"Interpreter crashed:\n{tb}")
self._error = f"interpreter error: {e}"
finally:
self._done = True
self._keyboard._waiting.set()
async def _do_save(self) -> str:
try:
writer = QuetzalWriter(self._zmachine)
save_data = writer.generate_save_data()
success = self._filesystem.save_game(save_data)
if success:
return "saved."
return "error: save failed"
except Exception as e:
return f"error: save failed ({e})"
def get_location_name(self) -> str | None:
try:
location_obj = self._zmachine._mem.read_global(0)
if location_obj == 0:
return None
return self._zmachine._objectparser.get_shortname(location_obj)
except Exception:
return None
def get_room_objects(self) -> list[str]:
try:
location_obj = self._zmachine._mem.read_global(0)
if location_obj == 0:
return []
objects = []
child = self._zmachine._objectparser.get_child(location_obj)
while child != 0:
name = self._zmachine._objectparser.get_shortname(child)
objects.append(name)
child = self._zmachine._objectparser.get_sibling(child)
return objects
except Exception:
return []

View file

@ -10,6 +10,7 @@ from mudlib.entity import Entity
if TYPE_CHECKING: if TYPE_CHECKING:
from mudlib.editor import Editor from mudlib.editor import Editor
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.if_session import IFSession from mudlib.if_session import IFSession
@ -23,7 +24,7 @@ class Player(Entity):
mode_stack: list[str] = field(default_factory=lambda: ["normal"]) mode_stack: list[str] = field(default_factory=lambda: ["normal"])
caps: ClientCaps = field(default_factory=lambda: ClientCaps(ansi=True)) caps: ClientCaps = field(default_factory=lambda: ClientCaps(ansi=True))
editor: Editor | None = None editor: Editor | None = None
if_session: IFSession | None = None if_session: IFSession | EmbeddedIFSession | None = None
@property @property
def mode(self) -> str: def mode(self) -> str:

View file

@ -0,0 +1,150 @@
import logging
import queue
import threading
from pathlib import Path
from . import zaudio, zfilesystem, zscreen, zstream, zui
logger = logging.getLogger(__name__)
class NullAudio(zaudio.ZAudio):
def __init__(self):
super().__init__()
self.features = {"has_more_than_a_bleep": False}
def play_bleep(self, bleep_type):
pass
class MudScreen(zscreen.ZScreen):
def __init__(self):
super().__init__()
self._buffer = []
self._columns = 80
self._rows = zscreen.INFINITE_ROWS
self.features = {
"has_status_line": False,
"has_upper_window": False,
"has_graphics_font": False,
"has_text_colors": False,
}
def write(self, string):
self._buffer.append(string)
def flush(self) -> str:
result = "".join(self._buffer)
self._buffer.clear()
return result
def split_window(self, height):
logger.debug(f"split_window({height}) - no-op")
def select_window(self, window):
logger.debug(f"select_window({window}) - no-op")
def set_cursor_position(self, x, y):
logger.debug(f"set_cursor_position({x}, {y}) - no-op")
def erase_window(self, window=zscreen.WINDOW_LOWER, color=zscreen.COLOR_CURRENT):
logger.debug(f"erase_window({window}, {color}) - no-op")
def erase_line(self):
logger.debug("erase_line() - no-op")
def print_status_score_turns(self, text, score, turns):
pass
def print_status_time(self, hours, minutes):
pass
def set_font(self, font_number):
if font_number == zscreen.FONT_NORMAL:
return font_number
return None
def set_text_style(self, style):
pass
def set_text_color(self, foreground_color, background_color):
pass
class MudInputStream(zstream.ZInputStream):
def __init__(self):
super().__init__()
self._input_queue = queue.Queue()
self._waiting = threading.Event()
self._ready = threading.Event()
self._done = False
self.features = {"has_timed_input": False}
def read_line(
self,
original_text=None,
max_length=0,
terminating_characters=None,
timed_input_routine=None,
timed_input_interval=0,
):
self._waiting.set()
self._ready.wait()
self._ready.clear()
self._waiting.clear()
text = self._input_queue.get()
if max_length > 0:
text = text[:max_length]
return text
def read_char(self, timed_input_routine=None, timed_input_interval=0):
self._waiting.set()
self._ready.wait()
self._ready.clear()
self._waiting.clear()
text = self._input_queue.get()
if text:
return ord(text[0])
return 0
def feed(self, text: str):
self._input_queue.put(text)
self._ready.set()
class MudFilesystem(zfilesystem.ZFilesystem):
def __init__(self, save_path: Path):
self.save_path = save_path
def save_game(self, data, suggested_filename=None):
try:
self.save_path.parent.mkdir(parents=True, exist_ok=True)
self.save_path.write_bytes(data)
return True
except Exception as e:
logger.error(f"Failed to save game: {e}")
return False
def restore_game(self):
if self.save_path.exists():
try:
return self.save_path.read_bytes()
except Exception as e:
logger.error(f"Failed to restore game: {e}")
return None
return None
def open_transcript_file_for_writing(self):
return None
def open_transcript_file_for_reading(self):
return None
def create_mud_ui(save_path: Path) -> tuple[zui.ZUI, MudScreen, MudInputStream]:
audio = NullAudio()
screen = MudScreen()
keyboard = MudInputStream()
filesystem = MudFilesystem(save_path)
ui = zui.ZUI(audio, screen, keyboard, filesystem)
return ui, screen, keyboard

View file

@ -203,24 +203,31 @@ class QuetzalParser:
# Read successive stack frames: # Read successive stack frames:
while ptr < total_len: while ptr < total_len:
log(" Parsing stack frame...") log(" Parsing stack frame...")
return_pc = (bytes[ptr] << 16) + (bytes[ptr + 1] << 8) + bytes[ptr + 3] return_pc = (bytes[ptr] << 16) + (bytes[ptr + 1] << 8) + bytes[ptr + 2]
ptr += 3 ptr += 3
flags_bitfield = bitfield.BitField(bytes[ptr]) flags_bitfield = bitfield.BitField(bytes[ptr])
ptr += 1 ptr += 1
_varnum = bytes[ptr] ### TODO: tells us which variable gets the result varnum = bytes[ptr]
ptr += 1 ptr += 1
_argflag = bytes[ptr] _argflag = bytes[ptr]
ptr += 1 ptr += 1
evalstack_size = (bytes[ptr] << 8) + bytes[ptr + 1] evalstack_size = (bytes[ptr] << 8) + bytes[ptr + 1]
ptr += 2 ptr += 2
# read anywhere from 0 to 15 local vars # Quetzal flags bit 4: if set, routine discards its return value
discard_result = flags_bitfield[4]
store_var = None if discard_result else varnum
# read anywhere from 0 to 15 local vars, pad to 15
num_locals = flags_bitfield[0:4]
local_vars = [] local_vars = []
for _i in range(flags_bitfield[0:3]): for _i in range(num_locals):
var = (bytes[ptr] << 8) + bytes[ptr + 1] var = (bytes[ptr] << 8) + bytes[ptr + 1]
ptr += 2 ptr += 2
local_vars.append(var) local_vars.append(var)
log(f" Found {len(local_vars)} local vars") # runtime expects 15 slots, save only stores declared count
local_vars.extend([0] * (15 - num_locals))
log(f" Found {num_locals} local vars (padded to 15)")
# least recent to most recent stack values: # least recent to most recent stack values:
stack_values = [] stack_values = []
@ -230,16 +237,16 @@ class QuetzalParser:
stack_values.append(val) stack_values.append(val)
log(f" Found {len(stack_values)} local stack values") log(f" Found {len(stack_values)} local stack values")
### Interesting... the reconstructed stack frames have no 'start # return_pc belongs on the CALLER frame (previous in the stack).
### address'. I guess it doesn't matter, since we only need to # When this routine finishes, finish_routine() returns
### pop back to particular return addresses to resume each # caller.program_counter as the resume address.
### routine. prev_frame = stackmanager._call_stack[-1]
prev_frame.program_counter = return_pc
### TODO: I can exactly which of the 7 args is "supplied", but I
### don't understand where the args *are*??
# store_var (varnum) is the variable that receives the return
# value when this routine finishes — NOT the return PC.
routine = zstackmanager.ZRoutine( routine = zstackmanager.ZRoutine(
0, return_pc, self._zmachine._mem, [], local_vars, stack_values 0, store_var, self._zmachine._mem, [], local_vars, stack_values
) )
stackmanager.push_routine(routine) stackmanager.push_routine(routine)
log(" Added new frame to stack.") log(" Added new frame to stack.")
@ -248,6 +255,12 @@ class QuetzalParser:
raise QuetzalStackFrameOverflow raise QuetzalStackFrameOverflow
self._zmachine._stackmanager = stackmanager self._zmachine._stackmanager = stackmanager
# Update cached references in subsystems that store the stack manager
# (they cache the stack at init time and won't see the replacement)
if hasattr(self._zmachine, "_cpu"):
self._zmachine._cpu._stackmanager = stackmanager
if hasattr(self._zmachine, "_opdecoder"):
self._zmachine._opdecoder._stack = stackmanager
log(" Successfully installed all stack frames.") log(" Successfully installed all stack frames.")
def _parse_intd(self, data): def _parse_intd(self, data):
@ -294,6 +307,71 @@ class QuetzalParser:
debugging and test verification.""" debugging and test verification."""
return self._last_loaded_metadata return self._last_loaded_metadata
def load_from_bytes(self, data):
"""Parse Quetzal data from raw bytes (instead of a file).
Used by op_restore when filesystem.restore_game() returns raw bytes.
"""
import io
self._last_loaded_metadata = {}
if len(data) < 12:
raise QuetzalUnrecognizedFileFormat
# Validate FORM header
if data[0:4] != b"FORM":
raise QuetzalUnrecognizedFileFormat
# Read total length
self._len = (data[4] << 24) + (data[5] << 16) + (data[6] << 8) + data[7]
log(f"Total length of FORM data is {self._len}")
self._last_loaded_metadata["total length"] = self._len
# Validate IFZS type
if data[8:12] != b"IFZS":
raise QuetzalUnrecognizedFileFormat
# Create a BytesIO object to use with chunk module
self._file = io.BytesIO(data)
self._file.seek(12) # Skip FORM header, length, and IFZS type
log("Parsing chunks from byte data")
try:
while 1:
c = chunk.Chunk(self._file)
chunkname = c.getname()
chunksize = c.getsize()
chunk_data = c.read(chunksize)
log(f"** Found chunk ID {chunkname}: length {chunksize}")
self._last_loaded_metadata[chunkname] = chunksize
if chunkname == b"IFhd":
self._parse_ifhd(chunk_data)
elif chunkname == b"CMem":
self._parse_cmem(chunk_data)
elif chunkname == b"UMem":
self._parse_umem(chunk_data)
elif chunkname == b"Stks":
self._parse_stks(chunk_data)
elif chunkname == b"IntD":
self._parse_intd(chunk_data)
elif chunkname == b"AUTH":
self._parse_auth(chunk_data)
elif chunkname == b"(c) ":
self._parse_copyright(chunk_data)
elif chunkname == b"ANNO":
self._parse_anno(chunk_data)
else:
# spec says to ignore and skip past unrecognized chunks
pass
except EOFError:
pass
self._file.close()
log("Finished parsing Quetzal data.")
def load(self, savefile_path): def load(self, savefile_path):
"""Parse each chunk of the Quetzal file at SAVEFILE_PATH, """Parse each chunk of the Quetzal file at SAVEFILE_PATH,
initializing associated zmachine subsystems as needed.""" initializing associated zmachine subsystems as needed."""
@ -377,86 +455,211 @@ class QuetzalWriter:
"""Return a chunk of type IFhd, containing metadata about the """Return a chunk of type IFhd, containing metadata about the
zmachine and story being played.""" zmachine and story being played."""
### TODO: write this. payload must be *exactly* 13 bytes, even if mem = self._zmachine._mem
### it means padding the program counter.
### Some old infocom games don't have checksums stored in header. # Release number (2 bytes, big-endian) from header bytes 2-3
### If not, generate it from the *original* story file memory release = mem.read_word(2)
### image and put it into this chunk. See ZMemory.generate_checksum().
pass
return "0" # Serial number (6 bytes) from header bytes 0x12-0x17
serial = bytes(mem[0x12:0x18])
# Checksum (2 bytes, big-endian) from header bytes 0x1C-0x1D
checksum = mem.read_word(0x1C)
# Program counter (3 bytes, big-endian) - current PC
pc = self._zmachine._cpu._program_counter
# Build the 13-byte chunk
chunk_data = bytearray(13)
# Bytes 0-1: Release number
chunk_data[0] = (release >> 8) & 0xFF
chunk_data[1] = release & 0xFF
# Bytes 2-7: Serial number
chunk_data[2:8] = serial
# Bytes 8-9: Checksum
chunk_data[8] = (checksum >> 8) & 0xFF
chunk_data[9] = checksum & 0xFF
# Bytes 10-12: Program counter (24-bit)
chunk_data[10] = (pc >> 16) & 0xFF
chunk_data[11] = (pc >> 8) & 0xFF
chunk_data[12] = pc & 0xFF
return bytes(chunk_data)
def _generate_cmem_chunk(self): def _generate_cmem_chunk(self):
"""Return a compressed chunk of data representing the compressed """Return a compressed chunk of data representing the compressed
image of the zmachine's main memory.""" image of the zmachine's main memory."""
### TODO: debug this when ready pmem = self._zmachine._pristine_mem
return "0" cmem = self._zmachine._mem
# XOR the original game image with the current one # XOR current dynamic memory with pristine dynamic memory
diffarray = list(self._zmachine._pristine_mem) dynamic_start = pmem._dynamic_start
for index in range(len(self._zmachine._pristine_mem._total_size)): dynamic_end = pmem._dynamic_end
diffarray[index] = ( diffarray = []
self._zmachine._pristine_mem[index] ^ self._zmachine._mem[index]
)
log(f"XOR array is {diffarray}")
# Run-length encode the resulting list of 0's and 1's. for index in range(dynamic_start, dynamic_end + 1):
xor_value = pmem[index] ^ cmem[index]
diffarray.append(xor_value)
log(f"Generated XOR array of {len(diffarray)} bytes")
# Run-length encode the XOR result
result = [] result = []
zerocounter = 0 zerocounter = 0
for index in range(len(diffarray)):
if diffarray[index] == 0: for byte in diffarray:
if byte == 0:
zerocounter += 1 zerocounter += 1
continue
else: else:
if zerocounter > 0: # Flush any pending zeros
result.append(0) while zerocounter > 0:
result.append(zerocounter) # Encode: 0x00 followed by count of ADDITIONAL zeros
zerocounter = 0 # Maximum count in one byte is 255, meaning 256 zeros total (1+255)
result.append(diffarray[index]) if zerocounter > 256:
return result result.append(0x00)
result.append(0xFF) # 1 + 255 = 256 zeros
zerocounter -= 256
else:
result.append(0x00)
result.append(zerocounter - 1) # count of additional zeros
zerocounter = 0
# Output non-zero byte
result.append(byte)
# Don't encode trailing zeros - parser can leave them as-is
# (per spec: "If memcounter finishes less than memlen, that's totally fine")
log(f"Compressed to {len(result)} bytes")
return bytes(result)
def _generate_stks_chunk(self): def _generate_stks_chunk(self):
"""Return a stacks chunk, describing the stack state of the """Return a stacks chunk, describing the stack state of the
zmachine at this moment.""" zmachine at this moment."""
### TODO: write this result = bytearray()
return "0" stackmanager = self._zmachine._stackmanager
call_stack = stackmanager._call_stack
# Skip the ZStackBottom sentinel (first element)
for i, frame in enumerate(call_stack[1:], start=1):
num_local_vars = len(frame.local_vars)
# Quetzal return_pc = caller's saved program counter.
# The previous frame (caller) stores the resume PC that
# finish_routine() returns when this frame exits.
prev_frame = call_stack[i - 1]
return_pc = prev_frame.program_counter or 0
result.append((return_pc >> 16) & 0xFF)
result.append((return_pc >> 8) & 0xFF)
result.append(return_pc & 0xFF)
# Write flags byte (bits 0-3 = num local vars,
# bit 4 = discard return value)
flags = num_local_vars & 0x0F
if frame.return_addr is None:
flags |= 0x10
result.append(flags)
# Write varnum (which variable gets the return value)
varnum = frame.return_addr if frame.return_addr is not None else 0
result.append(varnum & 0xFF)
# Write argflag (bitmask of supplied arguments)
# TODO: track this properly, for now use 0
result.append(0)
# Write eval_stack_size as 16-bit big-endian
eval_stack_size = len(frame.stack)
result.append((eval_stack_size >> 8) & 0xFF)
result.append(eval_stack_size & 0xFF)
# Write local variables (16-bit big-endian each)
for local_var in frame.local_vars:
result.append((local_var >> 8) & 0xFF)
result.append(local_var & 0xFF)
# Write evaluation stack values (16-bit big-endian each)
for stack_val in frame.stack:
result.append((stack_val >> 8) & 0xFF)
result.append(stack_val & 0xFF)
return bytes(result)
def _generate_anno_chunk(self): def _generate_anno_chunk(self):
"""Return an annotation chunk, containing metadata about the ZVM """Return an annotation chunk, containing metadata about the ZVM
interpreter which created the savefile.""" interpreter which created the savefile."""
### TODO: write this ### TODO: write this
return "0" return b"0"
# --------- Public APIs ----------- # --------- Public APIs -----------
def generate_save_data(self):
"""Generate complete Quetzal save data as bytes (IFF/FORM/IFZS container).
Returns bytes representing the complete save file in IFF format.
"""
log("Generating Quetzal save data")
# Generate all chunks
ifhd_chunk = self._generate_ifhd_chunk()
cmem_chunk = self._generate_cmem_chunk()
stks_chunk = self._generate_stks_chunk()
anno_chunk = self._generate_anno_chunk()
# Build IFF container with proper chunk headers
result = bytearray()
# Helper to write a chunk with its header
def write_chunk(chunk_id, chunk_data):
result.extend(chunk_id.encode("ascii"))
size = len(chunk_data)
result.append((size >> 24) & 0xFF)
result.append((size >> 16) & 0xFF)
result.append((size >> 8) & 0xFF)
result.append(size & 0xFF)
result.extend(chunk_data)
# IFF chunks must be padded to even byte boundaries
if size % 2 == 1:
result.append(0) # padding byte
log(f" Added {chunk_id} chunk ({size} bytes)")
# Write nested chunks
write_chunk("IFhd", ifhd_chunk)
write_chunk("CMem", cmem_chunk)
write_chunk("Stks", stks_chunk)
write_chunk("ANNO", anno_chunk)
# Calculate total size (everything after FORM header + size field)
total_size = len(result) + 4 # +4 for "IFZS"
# Build final FORM container
container = bytearray()
container.extend(b"FORM")
container.append((total_size >> 24) & 0xFF)
container.append((total_size >> 16) & 0xFF)
container.append((total_size >> 8) & 0xFF)
container.append(total_size & 0xFF)
container.extend(b"IFZS")
container.extend(result)
log(f"Generated {len(container)} bytes of save data")
return bytes(container)
def write(self, savefile_path): def write(self, savefile_path):
"""Write the current zmachine state to a new Quetzal-file at """Write the current zmachine state to a new Quetzal-file at
SAVEFILE_PATH.""" SAVEFILE_PATH."""
log(f"Attempting to write game-state to '{savefile_path}'") log(f"Attempting to write game-state to '{savefile_path}'")
self._file = open(savefile_path, "w") # noqa: SIM115 data = self.generate_save_data()
ifhd_chunk = self._generate_ifhd_chunk() with open(savefile_path, "wb") as f:
cmem_chunk = self._generate_cmem_chunk() f.write(data)
stks_chunk = self._generate_stks_chunk()
anno_chunk = self._generate_anno_chunk()
_total_chunk_size = (
len(ifhd_chunk) + len(cmem_chunk) + len(stks_chunk) + len(anno_chunk)
)
# Write main FORM chunk to hold other chunks
self._file.write("FORM")
### TODO: self._file_write(total_chunk_size) -- spread it over 4 bytes
self._file.write("IFZS")
# Write nested chunks.
for chunk_data in (ifhd_chunk, cmem_chunk, stks_chunk, anno_chunk):
self._file.write(chunk_data)
log("Wrote a chunk.")
self._file.close()
log("Done writing game-state to savefile.") log("Done writing game-state to savefile.")

View file

@ -44,7 +44,16 @@ class ZCpuRestart(ZCpuError):
class ZCpu: class ZCpu:
def __init__( def __init__(
self, zmem, zopdecoder, zstack, zobjects, zstring, zstreammanager, zui, zlexer self,
zmem,
zopdecoder,
zstack,
zobjects,
zstring,
zstreammanager,
zui,
zlexer,
zmachine=None,
): ):
self._memory = zmem self._memory = zmem
self._opdecoder = zopdecoder self._opdecoder = zopdecoder
@ -54,8 +63,14 @@ class ZCpu:
self._streammanager = zstreammanager self._streammanager = zstreammanager
self._ui = zui self._ui = zui
self._lexer = zlexer self._lexer = zlexer
self._zmachine = zmachine
self._trace = deque(maxlen=20) self._trace = deque(maxlen=20)
@property
def _program_counter(self):
"""Return the current program counter value."""
return self._opdecoder.program_counter
def _get_handler(self, opcode_class, opcode_number): def _get_handler(self, opcode_class, opcode_number):
try: try:
opcode_decl = self.opcodes[opcode_class][opcode_number] opcode_decl = self.opcodes[opcode_class][opcode_number]
@ -221,8 +236,7 @@ class ZCpu:
except (ZCpuQuit, ZCpuRestart): except (ZCpuQuit, ZCpuRestart):
# Normal control flow - don't dump trace # Normal control flow - don't dump trace
raise raise
except ZCpuError: except Exception:
# All other ZCpu errors - dump trace for debugging
self._dump_trace() self._dump_trace()
raise raise
return True return True
@ -543,9 +557,26 @@ class ZCpu:
def op_save(self, *args): def op_save(self, *args):
"""Save game state to file (V3 - branch on success). """Save game state to file (V3 - branch on success).
Currently always fails because QuetzalWriter is not yet functional. Uses QuetzalWriter to generate save data in IFF/FORM/IFZS format,
then calls the filesystem to write it. Branches true on success,
false on failure.
""" """
self._branch(False) if self._zmachine is None:
# Can't save without zmachine reference
self._branch(False)
return
from .quetzal import QuetzalWriter
try:
writer = QuetzalWriter(self._zmachine)
save_data = writer.generate_save_data()
success = self._ui.filesystem.save_game(save_data)
self._branch(success)
except Exception as e:
# Any error during save process = failure
log(f"Save failed with exception: {e}")
self._branch(False)
def op_save_v4(self, *args): def op_save_v4(self, *args):
"""TODO: Write docstring here.""" """TODO: Write docstring here."""
@ -554,10 +585,41 @@ class ZCpu:
def op_restore(self, *args): def op_restore(self, *args):
"""Restore game state from file (V3 - branch on success). """Restore game state from file (V3 - branch on success).
Currently always fails because QuetzalWriter is not yet functional, Uses QuetzalParser to load save data from filesystem,
so there are no valid save files to restore. validates it matches current story, and restores memory/stack/PC.
Branches true on success, false on failure.
""" """
self._branch(False) if self._zmachine is None:
# Can't restore without zmachine reference
self._branch(False)
return
from .quetzal import QuetzalParser
try:
# Get save data from filesystem
save_data = self._ui.filesystem.restore_game()
if save_data is None:
# User cancelled or no save file available
self._branch(False)
return
# Parse the save data
parser = QuetzalParser(self._zmachine)
parser.load_from_bytes(save_data)
# QuetzalParser already:
# - Validated IFhd matches current story (release/serial/checksum)
# - Replaced dynamic memory via _parse_cmem or _parse_umem
# - Replaced stack manager via _parse_stks
# - Set program counter via _parse_ifhd
# Success!
self._branch(True)
except Exception as e:
# Any error during restore process = failure
log(f"Restore failed with exception: {e}")
self._branch(False)
def op_restore_v4(self, *args): def op_restore_v4(self, *args):
"""TODO: Write docstring here.""" """TODO: Write docstring here."""

View file

@ -44,6 +44,7 @@ class ZMachine:
self._stream_manager, self._stream_manager,
self._ui, self._ui,
self._lexer, self._lexer,
zmachine=self,
) )
# --------- Public APIs ----------- # --------- Public APIs -----------

View file

@ -184,10 +184,15 @@ class ZMemory:
def _check_bounds(self, index): def _check_bounds(self, index):
if isinstance(index, slice): if isinstance(index, slice):
start, stop = index.start, index.stop start, stop = index.start, index.stop
# For slices, stop can be _total_size since slicing is exclusive
if not (
(0 <= start < self._total_size) and (0 <= stop <= self._total_size)
):
raise ZMemoryOutOfBounds
else: else:
start, stop = index, index start, stop = index, index
if not ((0 <= start < self._total_size) and (0 <= stop < self._total_size)): if not ((0 <= start < self._total_size) and (0 <= stop < self._total_size)):
raise ZMemoryOutOfBounds raise ZMemoryOutOfBounds
def _check_static(self, index): def _check_static(self, index):
"""Throw error if INDEX is within the static-memory area.""" """Throw error if INDEX is within the static-memory area."""

View file

@ -153,6 +153,11 @@ class ZStackManager:
"Remove and return value from the top of the data stack." "Remove and return value from the top of the data stack."
current_routine = self._call_stack[-1] current_routine = self._call_stack[-1]
if not current_routine.stack:
frame_idx = len(self._call_stack) - 1
ra = getattr(current_routine, "return_addr", "N/A")
pc = getattr(current_routine, "program_counter", 0)
raise ZStackPopError(f"frame {frame_idx}, return_addr={ra}, pc=0x{pc:06x}")
return current_routine.stack.pop() return current_routine.stack.pop()
def get_stack_frame_index(self): def get_stack_frame_index(self):

329
tests/test_embedded_if.py Normal file
View file

@ -0,0 +1,329 @@
"""Tests for embedded z-machine MUD integration.
Tests the MUD UI components and EmbeddedIFSession integration with real zork1.z3.
"""
import threading
from dataclasses import dataclass
from pathlib import Path
import pytest
from mudlib.zmachine.mud_ui import MudFilesystem, MudInputStream, MudScreen
ZORK_PATH = Path(__file__).parent.parent / "content" / "stories" / "zork1.z3"
requires_zork = pytest.mark.skipif(not ZORK_PATH.exists(), reason="zork1.z3 not found")
@dataclass
class MockWriter:
def write(self, data):
pass
async def drain(self):
pass
# Unit tests for MUD UI components
def test_mud_screen_captures_output():
"""MudScreen captures written text and flush returns it."""
screen = MudScreen()
screen.write("Hello ")
screen.write("world!")
output = screen.flush()
assert output == "Hello world!"
def test_mud_screen_flush_clears_buffer():
"""MudScreen flush clears buffer, second flush returns empty."""
screen = MudScreen()
screen.write("test")
first = screen.flush()
assert first == "test"
second = screen.flush()
assert second == ""
def test_mud_input_stream_feed_and_read():
"""MudInputStream feed and read_line work with threading."""
stream = MudInputStream()
result = []
def reader():
result.append(stream.read_line())
t = threading.Thread(target=reader)
t.start()
# Wait for stream to signal it's waiting
stream._waiting.wait(timeout=2)
stream.feed("hello")
t.join(timeout=2)
assert result == ["hello"]
def test_mud_filesystem_save_restore(tmp_path):
"""MudFilesystem save and restore bytes correctly."""
save_path = tmp_path / "test.qzl"
filesystem = MudFilesystem(save_path)
test_data = b"\x01\x02\x03\x04\x05"
success = filesystem.save_game(test_data)
assert success
assert save_path.exists()
restored = filesystem.restore_game()
assert restored == test_data
# Integration tests with real zork1.z3
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_start():
"""EmbeddedIFSession starts and returns intro containing game info."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
# Clean up any existing save to get a fresh start
if session.save_path.exists():
session.save_path.unlink()
intro = await session.start()
assert intro is not None
assert len(intro) > 0
# Intro should contain game title or location
assert "ZORK" in intro or "West of House" in intro
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_handle_input():
"""EmbeddedIFSession handles input and returns response."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
# Clean up any existing save to get a fresh start
if session.save_path.exists():
session.save_path.unlink()
await session.start()
response = await session.handle_input("look")
assert response is not None
assert response.done is False
assert len(response.output) > 0
# Looking should describe the starting location
assert "West of House" in response.output or "house" in response.output
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_escape_help():
"""EmbeddedIFSession ::help returns help text."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
await session.start()
response = await session.handle_input("::help")
assert response.done is False
assert "::quit" in response.output
assert "::save" in response.output
assert "::help" in response.output
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_escape_quit():
"""EmbeddedIFSession ::quit returns done=True."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
await session.start()
response = await session.handle_input("::quit")
assert response.done is True
assert "saved" in response.output.lower()
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_location_name():
"""EmbeddedIFSession get_location_name returns location after input."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
await session.start()
# Send a command to advance game state
await session.handle_input("look")
location = session.get_location_name()
# Location may be None or a string depending on game state
assert location is None or isinstance(location, str)
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_room_objects():
"""EmbeddedIFSession get_room_objects returns a list after start."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
await session.start()
objects = session.get_room_objects()
assert isinstance(objects, list)
# Zork1 starting location usually has some objects
assert len(objects) >= 0 # May or may not have visible objects initially
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_try_restore_before_thread():
"""_try_restore() is called synchronously before interpreter thread starts."""
from unittest.mock import patch
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
# Create a save file
if session.save_path.exists():
session.save_path.unlink()
session.save_path.parent.mkdir(parents=True, exist_ok=True)
# Write a minimal valid save (header only, won't actually restore correctly)
session.save_path.write_bytes(b"FORM\x00\x00\x00\x08IFZSQUTZ\x00\x00\x00\x00")
call_order = []
original_try_restore = session._try_restore
original_run_interpreter = session._run_interpreter
def track_try_restore():
call_order.append("try_restore")
return original_try_restore()
def track_run_interpreter():
call_order.append("run_interpreter")
original_run_interpreter()
with (
patch.object(session, "_try_restore", side_effect=track_try_restore),
patch.object(session, "_run_interpreter", side_effect=track_run_interpreter),
):
await session.start()
# Verify _try_restore was called before _run_interpreter
assert call_order[0] == "try_restore"
assert call_order[1] == "run_interpreter"
await session.stop()
if session.save_path.exists():
session.save_path.unlink()
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_no_restore_without_save():
"""start() does not restore when no save file exists."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="nosaveplayer", writer=mock_writer, x=0, y=0)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
# Ensure no save file exists
if session.save_path.exists():
session.save_path.unlink()
intro = await session.start()
# Should NOT contain restore message
assert "restoring" not in intro.lower()
# Should contain normal game intro
assert "ZORK" in intro or "West of House" in intro
await session.stop()
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_save_and_restore():
"""Save a game, create new session, restore it via start()."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
# Start first session
player = Player(name="testplayer", writer=mock_writer, x=0, y=0)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
# Clean up any existing save to get a fresh start
if session.save_path.exists():
session.save_path.unlink()
await session.start()
# Do something to change state
await session.handle_input("open mailbox")
# Save
save_result = await session.handle_input("::save")
assert "saved" in save_result.output.lower()
await session.stop()
# Start new session - should auto-restore via start()
# start() calls _try_restore() BEFORE launching the interpreter thread
session2 = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
intro = await session2.start()
# Should contain restore message prefixed to output
assert "restoring saved game" in intro.lower()
assert "restored" in intro.lower()
# The game state should reflect the restored state
# (location may differ after restore, just verify it works)
response = await session2.handle_input("look")
assert response.output # Should get some output
await session2.stop()
# Clean up save file
if session2.save_path.exists():
session2.save_path.unlink()

View file

@ -70,9 +70,9 @@ async def test_play_enters_if_mode(player):
with patch("mudlib.commands.play.IFSession") as MockIFSession: with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session MockIFSession.return_value = mock_session
# Ensure story file exists check passes # Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find: with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z3" mock_find.return_value = "/fake/path/zork1.z5"
await cmd_play(player, "zork1") await cmd_play(player, "zork1")
@ -108,8 +108,9 @@ async def test_play_handles_dfrotz_missing(player):
with patch("mudlib.commands.play.IFSession") as MockIFSession: with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session MockIFSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find: with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z3" mock_find.return_value = "/fake/path/zork1.z5"
await cmd_play(player, "zork1") await cmd_play(player, "zork1")
@ -130,41 +131,35 @@ async def test_play_handles_dfrotz_missing(player):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_play_restores_save_if_exists(player): async def test_play_restores_save_if_exists(player):
"""Playing restores saved game if save file exists.""" """Playing restores saved game if save file exists (via start())."""
from pathlib import Path
from mudlib.commands.play import cmd_play from mudlib.commands.play import cmd_play
# Mock IFSession # Mock IFSession - restore now happens in start() before thread launches
mock_session = Mock() mock_session = Mock()
mock_session.start = AsyncMock(return_value="Welcome to Zork!") restored_output = (
mock_session._do_restore = AsyncMock( "restoring saved game...\r\nrestored.\r\n\r\n"
return_value="West of House\nYou are standing in an open field." "West of House\nYou are standing in an open field."
) )
mock_session.save_path = Mock(spec=Path) mock_session.start = AsyncMock(return_value=restored_output)
mock_session.save_path.exists = Mock(return_value=True)
with patch("mudlib.commands.play.IFSession") as MockIFSession: with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session MockIFSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find: with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z3" mock_find.return_value = "/fake/path/zork1.z5"
await cmd_play(player, "zork1") await cmd_play(player, "zork1")
# Verify restore was called
mock_session._do_restore.assert_called_once()
# Verify session was created and started # Verify session was created and started
mock_session.start.assert_called_once() mock_session.start.assert_called_once()
# Verify mode was pushed # Verify mode was pushed
assert "if" in player.mode_stack assert "if" in player.mode_stack
# Verify restored text was sent # Verify restored text was sent (start() returns full output with restore)
calls = [call[0][0] for call in player.writer.write.call_args_list] calls = [call[0][0] for call in player.writer.write.call_args_list]
full_output = "".join(calls) full_output = "".join(calls)
assert "restoring" in full_output.lower()
assert "West of House" in full_output assert "West of House" in full_output
assert "open field" in full_output assert "open field" in full_output
@ -172,28 +167,22 @@ async def test_play_restores_save_if_exists(player):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_play_no_restore_if_no_save(player): async def test_play_no_restore_if_no_save(player):
"""Playing does not restore if no save file exists.""" """Playing does not restore if no save file exists."""
from pathlib import Path
from mudlib.commands.play import cmd_play from mudlib.commands.play import cmd_play
# Mock IFSession # Mock IFSession
mock_session = Mock() mock_session = Mock()
mock_session.start = AsyncMock(return_value="Welcome to Zork!") mock_session.start = AsyncMock(return_value="Welcome to Zork!")
mock_session._do_restore = AsyncMock(return_value="")
mock_session.save_path = Mock(spec=Path)
mock_session.save_path.exists = Mock(return_value=False)
with patch("mudlib.commands.play.IFSession") as MockIFSession: with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session MockIFSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find: with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z3" mock_find.return_value = "/fake/path/zork1.z5"
await cmd_play(player, "zork1") await cmd_play(player, "zork1")
# Verify restore was NOT called
mock_session._do_restore.assert_not_called()
# Verify session was created and started # Verify session was created and started
mock_session.start.assert_called_once() mock_session.start.assert_called_once()

View file

@ -0,0 +1,297 @@
"""Integration tests for Quetzal save/restore round-trip.
Tests that verify the complete save/restore pipeline works end-to-end by
generating save data with QuetzalWriter and restoring it with QuetzalParser.
Field mapping reminder:
- Quetzal return_pc for frame N caller (frame N-1) program_counter
- Quetzal varnum frame.return_addr (store variable for return value)
"""
class TestQuetzalRoundTrip:
"""Test complete save/restore cycle with real zmachine state."""
def test_basic_round_trip(self):
"""Test saving and restoring basic zmachine state."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
# Create a minimal z-machine story file (V3)
story_data = bytearray(8192)
story_data[0] = 3 # Version 3
story_data[0x02:0x04] = [0x12, 0x34] # Release number
story_data[0x04:0x06] = [0x10, 0x00] # High memory start
story_data[0x06:0x08] = [0x08, 0x00] # Initial PC
story_data[0x0E:0x10] = [0x04, 0x00] # Static memory start
story_data[0x0C:0x0E] = [0x02, 0x00] # Global variables start
story_data[0x12:0x18] = b"860101" # Serial number
story_data[0x1C:0x1E] = [0xAB, 0xCD] # Checksum
pristine_mem = ZMemory(bytes(story_data))
current_mem = ZMemory(bytes(story_data))
# Modify some bytes in dynamic memory (before static start at 0x400)
current_mem[0x100] = 0x42
current_mem[0x101] = 0x43
current_mem[0x200] = 0xFF
# Create a stack with one frame
# return_addr=5 means "store return value in local var 4"
stack_manager = ZStackManager(current_mem)
routine = ZRoutine(
start_addr=0x5000,
return_addr=5,
zmem=current_mem,
args=[],
local_vars=[0x0001, 0x0002, 0x0003],
stack=[0x1111, 0x2222],
)
stack_manager.push_routine(routine)
# Set up mock zmachine
zmachine = Mock()
zmachine._pristine_mem = pristine_mem
zmachine._mem = current_mem
zmachine._stackmanager = stack_manager
zmachine._cpu = Mock()
zmachine._cpu._program_counter = 0x0850
zmachine._opdecoder = Mock()
# SAVE
writer = QuetzalWriter(zmachine)
save_data = writer.generate_save_data()
assert save_data[:4] == b"FORM"
assert save_data[8:12] == b"IFZS"
# CORRUPT: Change the zmachine state
current_mem[0x100] = 0x99
current_mem[0x101] = 0x99
current_mem[0x200] = 0x00
stack_manager._call_stack.clear()
from mudlib.zmachine.zstackmanager import ZStackBottom
stack_manager._call_stack.append(ZStackBottom())
zmachine._cpu._program_counter = 0x9999
assert current_mem[0x100] == 0x99
assert len(stack_manager._call_stack) == 1
assert zmachine._cpu._program_counter == 0x9999
# RESTORE
parser = QuetzalParser(zmachine)
parser.load_from_bytes(save_data)
# VERIFY: Memory was restored
assert current_mem[0x100] == 0x42
assert current_mem[0x101] == 0x43
assert current_mem[0x200] == 0xFF
# VERIFY: Stack was restored
restored_stack = zmachine._stackmanager
assert len(restored_stack._call_stack) == 2 # Bottom + one frame
restored_frame = restored_stack._call_stack[1]
# return_addr is the store variable (varnum), not a PC
assert restored_frame.return_addr == 5
assert restored_frame.local_vars[:3] == [0x0001, 0x0002, 0x0003]
assert restored_frame.stack == [0x1111, 0x2222]
# VERIFY: Program counter was restored
assert zmachine._opdecoder.program_counter == 0x0850
def test_round_trip_with_multiple_frames(self):
"""Test save/restore with nested call frames."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
story_data = bytearray(8192)
story_data[0] = 3
story_data[0x02:0x04] = [0x10, 0x00]
story_data[0x04:0x06] = [0x10, 0x00]
story_data[0x06:0x08] = [0x08, 0x00]
story_data[0x0E:0x10] = [0x04, 0x00]
story_data[0x0C:0x0E] = [0x02, 0x00]
story_data[0x12:0x18] = b"860101"
story_data[0x1C:0x1E] = [0x00, 0x00]
pristine_mem = ZMemory(bytes(story_data))
current_mem = ZMemory(bytes(story_data))
# Create nested call frames with proper semantics:
# return_addr = store variable (varnum)
# program_counter = resume PC for when the next frame returns
stack_manager = ZStackManager(current_mem)
routine1 = ZRoutine(
start_addr=0x5000,
return_addr=0, # varnum 0 = push to stack
zmem=current_mem,
args=[],
local_vars=[0xAAAA, 0xBBBB],
stack=[0x1111],
)
stack_manager.push_routine(routine1)
# resume PC in routine1 after routine2 returns
routine1.program_counter = 0x5123
routine2 = ZRoutine(
start_addr=0x6000,
return_addr=3, # varnum 3 = local var 2
zmem=current_mem,
args=[],
local_vars=[0xCCCC],
stack=[0x2222, 0x3333],
)
stack_manager.push_routine(routine2)
zmachine = Mock()
zmachine._pristine_mem = pristine_mem
zmachine._mem = current_mem
zmachine._stackmanager = stack_manager
zmachine._cpu = Mock()
zmachine._cpu._program_counter = 0x0900
zmachine._opdecoder = Mock()
# Save
writer = QuetzalWriter(zmachine)
save_data = writer.generate_save_data()
# Clear stack
stack_manager._call_stack.clear()
from mudlib.zmachine.zstackmanager import ZStackBottom
stack_manager._call_stack.append(ZStackBottom())
# Restore
parser = QuetzalParser(zmachine)
parser.load_from_bytes(save_data)
# Verify both frames restored
restored_stack = zmachine._stackmanager
assert len(restored_stack._call_stack) == 3 # Bottom + two frames
frame1 = restored_stack._call_stack[1]
assert frame1.return_addr == 0 # varnum
assert frame1.local_vars[:2] == [0xAAAA, 0xBBBB]
assert frame1.stack == [0x1111]
# frame1 should have the resume PC for after frame2 returns
assert frame1.program_counter == 0x5123
frame2 = restored_stack._call_stack[2]
assert frame2.return_addr == 3 # varnum
assert frame2.local_vars[:1] == [0xCCCC]
assert frame2.stack == [0x2222, 0x3333]
def test_round_trip_preserves_unchanged_memory(self):
"""Test that unchanged memory bytes are preserved correctly."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
from mudlib.zmachine.zstackmanager import ZStackManager
story_data = bytearray(8192)
story_data[0] = 3
story_data[0x02:0x04] = [0x10, 0x00]
story_data[0x04:0x06] = [0x10, 0x00]
story_data[0x06:0x08] = [0x08, 0x00]
story_data[0x0E:0x10] = [0x04, 0x00]
story_data[0x0C:0x0E] = [0x02, 0x00]
story_data[0x12:0x18] = b"860101"
story_data[0x1C:0x1E] = [0x00, 0x00]
for i in range(0x100, 0x200):
story_data[i] = i & 0xFF
pristine_mem = ZMemory(bytes(story_data))
current_mem = ZMemory(bytes(story_data))
current_mem[0x150] = 0xFF
current_mem[0x180] = 0xAA
stack_manager = ZStackManager(current_mem)
zmachine = Mock()
zmachine._pristine_mem = pristine_mem
zmachine._mem = current_mem
zmachine._stackmanager = stack_manager
zmachine._cpu = Mock()
zmachine._cpu._program_counter = 0x0800
zmachine._opdecoder = Mock()
writer = QuetzalWriter(zmachine)
save_data = writer.generate_save_data()
for i in range(0x100, 0x200):
current_mem[i] = 0x00
parser = QuetzalParser(zmachine)
parser.load_from_bytes(save_data)
for i in range(0x100, 0x200):
if i == 0x150:
assert current_mem[i] == 0xFF
elif i == 0x180:
assert current_mem[i] == 0xAA
else:
assert current_mem[i] == (i & 0xFF)
def test_round_trip_empty_stack(self):
"""Test save/restore with no routine frames (just bottom sentinel)."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
story_data = bytearray(8192)
story_data[0] = 3
story_data[0x02:0x04] = [0x10, 0x00]
story_data[0x04:0x06] = [0x10, 0x00]
story_data[0x06:0x08] = [0x08, 0x00]
story_data[0x0E:0x10] = [0x04, 0x00]
story_data[0x0C:0x0E] = [0x02, 0x00]
story_data[0x12:0x18] = b"860101"
story_data[0x1C:0x1E] = [0x00, 0x00]
pristine_mem = ZMemory(bytes(story_data))
current_mem = ZMemory(bytes(story_data))
current_mem[0x100] = 0x42
stack_manager = ZStackManager(current_mem)
zmachine = Mock()
zmachine._pristine_mem = pristine_mem
zmachine._mem = current_mem
zmachine._stackmanager = stack_manager
zmachine._cpu = Mock()
zmachine._cpu._program_counter = 0x0800
zmachine._opdecoder = Mock()
writer = QuetzalWriter(zmachine)
save_data = writer.generate_save_data()
# Add a dummy frame to verify it gets cleared
dummy = ZRoutine(
start_addr=0x5000,
return_addr=1,
zmem=current_mem,
args=[],
local_vars=[0x9999],
stack=[],
)
stack_manager.push_routine(dummy)
parser = QuetzalParser(zmachine)
parser.load_from_bytes(save_data)
restored_stack = zmachine._stackmanager
assert len(restored_stack._call_stack) == 1
assert current_mem[0x100] == 0x42

356
tests/test_quetzal_stks.py Normal file
View file

@ -0,0 +1,356 @@
"""
Tests for QuetzalWriter._generate_stks_chunk() serialization.
The Stks chunk serializes the Z-machine call stack. Each frame has:
- Bytes 0-2: return_pc (24-bit big-endian) caller's resume PC
- Byte 3: flags (bits 0-3 = num local vars, bit 4 = discard result)
- Byte 4: varnum (which variable gets return value)
- Byte 5: argflag (bitmask of supplied arguments)
- Bytes 6-7: eval_stack_size (16-bit big-endian)
- Next (num_local_vars * 2) bytes: local variables
- Next (eval_stack_size * 2) bytes: evaluation stack values
All multi-byte values are big-endian. Bottom frame has return_pc=0.
Field mapping to runtime:
- return_pc for frame N stored as program_counter on frame N-1 (the caller)
- varnum stored as return_addr on the frame (the store variable)
"""
from unittest import TestCase
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
class MockMemory:
"""Mock memory for testing."""
def __init__(self):
self.version = 5
class MockZMachine:
"""Mock z-machine with stack manager."""
def __init__(self):
self._mem = MockMemory()
self._stackmanager = ZStackManager(self._mem)
class QuetzalStksTests(TestCase):
"""Test suite for Stks chunk generation."""
def setUp(self):
self.zmachine = MockZMachine()
self.writer = QuetzalWriter(self.zmachine)
def test_empty_call_stack_generates_empty_chunk(self):
"""With only the sentinel bottom, should generate empty bytes."""
# Call stack has only ZStackBottom sentinel
chunk = self.writer._generate_stks_chunk()
self.assertEqual(chunk, b"")
def test_single_frame_serialization(self):
"""Single routine frame should serialize correctly."""
# Set up caller resume PC on ZStackBottom
sentinel = self.zmachine._stackmanager._call_stack[0]
sentinel.program_counter = 0x4200
# Create a routine with return_addr = store variable (varnum 5)
routine = ZRoutine(
start_addr=0x5000,
return_addr=5,
zmem=self._mem,
args=[],
local_vars=[0x1234, 0x5678, 0xABCD],
stack=[0x1111, 0x2222],
)
self.zmachine._stackmanager.push_routine(routine)
chunk = self.writer._generate_stks_chunk()
# return_pc comes from ZStackBottom.program_counter (0x4200)
# varnum is frame.return_addr (5)
expected = bytes(
[
0x00,
0x42,
0x00, # return_pc (from caller's program_counter)
0x03, # flags (3 local vars)
0x05, # varnum (store variable)
0x00, # argflag
0x00,
0x02, # eval_stack_size = 2
0x12,
0x34, # local_vars[0]
0x56,
0x78, # local_vars[1]
0xAB,
0xCD, # local_vars[2]
0x11,
0x11, # stack[0]
0x22,
0x22, # stack[1]
]
)
self.assertEqual(chunk, expected)
def test_multiple_frames_serialization(self):
"""Multiple nested frames should serialize in order."""
sentinel = self.zmachine._stackmanager._call_stack[0]
sentinel.program_counter = 0 # main routine has no caller
# Frame 1: outer routine (varnum=0 means push result to stack)
routine1 = ZRoutine(
start_addr=0x1000,
return_addr=0,
zmem=self._mem,
args=[],
local_vars=[0x0001],
stack=[],
)
self.zmachine._stackmanager.push_routine(routine1)
# Set routine1's resume PC (where to go after frame2 returns)
routine1.program_counter = 0x1050
# Frame 2: inner routine (varnum=3)
routine2 = ZRoutine(
start_addr=0x2000,
return_addr=3,
zmem=self._mem,
args=[],
local_vars=[0x0002, 0x0003],
stack=[0xAAAA],
)
self.zmachine._stackmanager.push_routine(routine2)
chunk = self.writer._generate_stks_chunk()
# Frame 1: return_pc from sentinel.pc (0), varnum=0
frame1 = bytes(
[
0x00,
0x00,
0x00, # return_pc = 0 (from sentinel)
0x01, # flags (1 local var)
0x00, # varnum = 0 (push to stack)
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x00,
0x01, # local_vars[0]
]
)
# Frame 2: return_pc from routine1.pc (0x1050), varnum=3
frame2 = bytes(
[
0x00,
0x10,
0x50, # return_pc (from routine1.program_counter)
0x02, # flags (2 local vars)
0x03, # varnum = 3
0x00, # argflag
0x00,
0x01, # eval_stack_size = 1
0x00,
0x02, # local_vars[0]
0x00,
0x03, # local_vars[1]
0xAA,
0xAA, # stack[0]
]
)
expected = frame1 + frame2
self.assertEqual(chunk, expected)
def test_frame_with_no_locals_or_stack(self):
"""Frame with no local vars or stack values should serialize correctly."""
sentinel = self.zmachine._stackmanager._call_stack[0]
sentinel.program_counter = 0x2500
routine = ZRoutine(
start_addr=0x3000,
return_addr=1,
zmem=self._mem,
args=[],
local_vars=[],
stack=[],
)
self.zmachine._stackmanager.push_routine(routine)
chunk = self.writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x25,
0x00, # return_pc (from sentinel.program_counter)
0x00, # flags (0 local vars)
0x01, # varnum = 1
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
]
)
self.assertEqual(chunk, expected)
def test_discard_result_sets_flags_bit4(self):
"""Frame with return_addr=None should set bit 4 in flags byte."""
sentinel = self.zmachine._stackmanager._call_stack[0]
sentinel.program_counter = 0
routine = ZRoutine(
start_addr=0x1000,
return_addr=None,
zmem=self._mem,
args=[],
local_vars=[0x0001, 0x0002],
stack=[],
)
self.zmachine._stackmanager.push_routine(routine)
chunk = self.writer._generate_stks_chunk()
# flags = 0x02 (2 locals) | 0x10 (discard) = 0x12
expected = bytes(
[
0x00,
0x00,
0x00, # return_pc
0x12, # flags (2 locals + discard bit)
0x00, # varnum (0 when discarding)
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x00,
0x01, # local_vars[0]
0x00,
0x02, # local_vars[1]
]
)
self.assertEqual(chunk, expected)
def test_round_trip_with_parser(self):
"""Generated stks bytes should parse back identically."""
from mudlib.zmachine.quetzal import QuetzalParser
sentinel = self.zmachine._stackmanager._call_stack[0]
sentinel.program_counter = 0 # main routine caller
# Create a complex stack state
routine1 = ZRoutine(
start_addr=0x1000,
return_addr=0,
zmem=self._mem,
args=[],
local_vars=[0x1111, 0x2222, 0x3333],
stack=[0xAAAA, 0xBBBB],
)
self.zmachine._stackmanager.push_routine(routine1)
routine1.program_counter = 0x1234 # resume PC for after routine2
routine2 = ZRoutine(
start_addr=0x2000,
return_addr=5,
zmem=self._mem,
args=[],
local_vars=[0x4444],
stack=[0xCCCC, 0xDDDD, 0xEEEE],
)
self.zmachine._stackmanager.push_routine(routine2)
# Generate the stks chunk
stks_bytes = self.writer._generate_stks_chunk()
# Parse it back
parser = QuetzalParser(self.zmachine)
parser._parse_stks(stks_bytes)
# Verify the stack was reconstructed correctly
# Parser creates a new stack manager, skip bottom sentinel
call_stack = self.zmachine._stackmanager._call_stack
frames = call_stack[1:]
self.assertEqual(len(frames), 2)
# Check frame 1: return_addr = varnum, caller PC on sentinel
assert isinstance(frames[0], ZRoutine)
self.assertEqual(frames[0].return_addr, 0)
self.assertEqual(frames[0].local_vars[:3], [0x1111, 0x2222, 0x3333])
self.assertEqual(frames[0].stack, [0xAAAA, 0xBBBB])
# Sentinel should have frame1's return_pc (0)
self.assertEqual(call_stack[0].program_counter, 0)
# Check frame 2: return_addr = varnum, caller PC on frame1
assert isinstance(frames[1], ZRoutine)
self.assertEqual(frames[1].return_addr, 5)
self.assertEqual(frames[1].local_vars[:1], [0x4444])
self.assertEqual(frames[1].stack, [0xCCCC, 0xDDDD, 0xEEEE])
# Frame1 should have frame2's return_pc (0x1234)
self.assertEqual(frames[0].program_counter, 0x1234)
def test_parse_return_pc_goes_to_caller(self):
"""Parser should put return_pc on the caller frame's program_counter."""
from mudlib.zmachine.quetzal import QuetzalParser
# Construct a minimal stack frame with return_pc=0x123456
stks_bytes = bytes(
[
0x12,
0x34,
0x56, # return_pc = 0x123456
0x00, # flags (0 local vars)
0x07, # varnum = 7
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
]
)
parser = QuetzalParser(self.zmachine)
parser._parse_stks(stks_bytes)
call_stack = self.zmachine._stackmanager._call_stack
frames = call_stack[1:] # skip sentinel
self.assertEqual(len(frames), 1)
# return_pc goes to caller (sentinel) program_counter
self.assertEqual(call_stack[0].program_counter, 0x123456)
# varnum goes to frame's return_addr
assert isinstance(frames[0], ZRoutine)
self.assertEqual(frames[0].return_addr, 7)
def test_parse_discard_bit_restores_none(self):
"""Parser should set return_addr=None when flags bit 4 is set."""
from mudlib.zmachine.quetzal import QuetzalParser
# flags = 0x12 = 2 locals + discard bit
stks_bytes = bytes(
[
0x00,
0x00,
0x00, # return_pc = 0
0x12, # flags (2 locals + discard)
0x00, # varnum (ignored when discarding)
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x00,
0x01, # local_vars[0]
0x00,
0x02, # local_vars[1]
]
)
parser = QuetzalParser(self.zmachine)
parser._parse_stks(stks_bytes)
frames = self.zmachine._stackmanager._call_stack[1:]
self.assertEqual(len(frames), 1)
self.assertIsNone(frames[0].return_addr)
self.assertEqual(frames[0].local_vars[:2], [1, 2])
@property
def _mem(self):
"""Helper to get mock memory."""
return self.zmachine._mem

View file

@ -0,0 +1,406 @@
"""Tests for QuetzalWriter Stks chunk generation.
Field mapping:
- Quetzal return_pc previous frame's program_counter (caller resume PC)
- Quetzal varnum frame.return_addr (store variable for return value)
"""
class TestStksChunkGeneration:
"""Test Stks chunk generation and serialization."""
def test_empty_stack_serialization(self):
"""Test serializing an empty stack (just the bottom sentinel)."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
assert result == b""
def test_single_frame_no_locals_no_stack(self):
"""Test serializing a single routine frame with no locals or stack values."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Set caller resume PC on sentinel
stack_manager._call_stack[0].program_counter = 0x1234
routine = ZRoutine(
start_addr=0x5000,
return_addr=7, # varnum: store to local var 6
zmem=zmachine._mem,
args=[],
local_vars=[],
stack=[],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x12,
0x34, # return_pc (from sentinel.program_counter)
0x00, # flags (0 locals)
0x07, # varnum (frame.return_addr)
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
]
)
assert result == expected
def test_single_frame_with_locals(self):
"""Test serializing a frame with local variables."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
stack_manager._call_stack[0].program_counter = 0x2000
routine = ZRoutine(
start_addr=0x5000,
return_addr=0x10, # varnum: store to global var 0x10
zmem=zmachine._mem,
args=[],
local_vars=[0x1111, 0x2222, 0x3333],
stack=[],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x20,
0x00, # return_pc (from sentinel.program_counter)
0x03, # flags (3 locals)
0x10, # varnum (frame.return_addr)
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x11,
0x11, # local_var[0]
0x22,
0x22, # local_var[1]
0x33,
0x33, # local_var[2]
]
)
assert result == expected
def test_single_frame_with_stack_values(self):
"""Test serializing a frame with evaluation stack values."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
stack_manager._call_stack[0].program_counter = 0x3000
routine = ZRoutine(
start_addr=0x5000,
return_addr=0, # varnum 0: push result to eval stack
zmem=zmachine._mem,
args=[],
local_vars=[],
stack=[0xABCD, 0xEF01],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x30,
0x00, # return_pc
0x00, # flags (0 locals)
0x00, # varnum (push to stack)
0x00, # argflag
0x00,
0x02, # eval_stack_size = 2
0xAB,
0xCD, # stack[0]
0xEF,
0x01, # stack[1]
]
)
assert result == expected
def test_single_frame_full(self):
"""Test serializing a frame with both locals and stack values."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
stack_manager._call_stack[0].program_counter = 0x4567
routine = ZRoutine(
start_addr=0x5000,
return_addr=2, # varnum: store to local var 1
zmem=zmachine._mem,
args=[],
local_vars=[0x0001, 0x0002],
stack=[0x1000, 0x2000, 0x3000],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x45,
0x67, # return_pc (from sentinel.program_counter)
0x02, # flags (2 locals)
0x02, # varnum
0x00, # argflag
0x00,
0x03, # eval_stack_size = 3
0x00,
0x01, # local_var[0]
0x00,
0x02, # local_var[1]
0x10,
0x00, # stack[0]
0x20,
0x00, # stack[1]
0x30,
0x00, # stack[2]
]
)
assert result == expected
def test_multiple_nested_frames(self):
"""Test serializing multiple nested routine frames."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Sentinel has return PC for frame 1
stack_manager._call_stack[0].program_counter = 0x1000
routine1 = ZRoutine(
start_addr=0x5000,
return_addr=0, # varnum: push to stack
zmem=zmachine._mem,
args=[],
local_vars=[0xAAAA],
stack=[0xBBBB],
)
stack_manager.push_routine(routine1)
# Frame1 has return PC for frame 2
routine1.program_counter = 0x2000
routine2 = ZRoutine(
start_addr=0x6000,
return_addr=5, # varnum: store to local var 4
zmem=zmachine._mem,
args=[],
local_vars=[0xCCCC],
stack=[0xDDDD],
)
stack_manager.push_routine(routine2)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
# Frame 1: return_pc from sentinel (0x1000), varnum=0
0x00,
0x10,
0x00, # return_pc
0x01, # flags (1 local)
0x00, # varnum
0x00, # argflag
0x00,
0x01, # eval_stack_size = 1
0xAA,
0xAA, # local_var[0]
0xBB,
0xBB, # stack[0]
# Frame 2: return_pc from routine1 (0x2000), varnum=5
0x00,
0x20,
0x00, # return_pc
0x01, # flags (1 local)
0x05, # varnum
0x00, # argflag
0x00,
0x01, # eval_stack_size = 1
0xCC,
0xCC, # local_var[0]
0xDD,
0xDD, # stack[0]
]
)
assert result == expected
def test_bottom_frame_zero_return_pc(self):
"""Test that a bottom/dummy frame with return_pc=0 is handled correctly."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Sentinel PC = 0 (main routine has no caller)
stack_manager._call_stack[0].program_counter = 0
routine = ZRoutine(
start_addr=0x5000,
return_addr=0, # varnum: push to stack
zmem=zmachine._mem,
args=[],
local_vars=[0x1234],
stack=[],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x00,
0x00, # return_pc = 0
0x01, # flags (1 local)
0x00, # varnum
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x12,
0x34, # local_var[0]
]
)
assert result == expected
class TestStksRoundTrip:
"""Test that Stks serialization/deserialization is symmetrical."""
def test_round_trip_serialization(self):
"""Test that we can serialize and deserialize frames correctly."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
original_stack = ZStackManager(zmachine._mem)
# Sentinel has return PC for frame 1
original_stack._call_stack[0].program_counter = 0
routine1 = ZRoutine(
start_addr=0x5000,
return_addr=5, # varnum: store to local var 4
zmem=zmachine._mem,
args=[],
local_vars=[0x0001, 0x0002, 0x0003],
stack=[0x1111, 0x2222],
)
original_stack.push_routine(routine1)
routine1.program_counter = 0x5678 # resume PC for after routine2
routine2 = ZRoutine(
start_addr=0x6000,
return_addr=3, # varnum: store to local var 2
zmem=zmachine._mem,
args=[],
local_vars=[0xAAAA],
stack=[0xBBBB, 0xCCCC, 0xDDDD],
)
original_stack.push_routine(routine2)
zmachine._stackmanager = original_stack
# Serialize
writer = QuetzalWriter(zmachine)
stks_data = writer._generate_stks_chunk()
# Deserialize
parser = QuetzalParser(zmachine)
parser._parse_stks(stks_data)
restored_stack = zmachine._stackmanager
assert len(restored_stack._call_stack) == 3
# Check frame 1: return_addr is varnum
frame1 = restored_stack._call_stack[1]
assert frame1.return_addr == 5
assert frame1.local_vars[:3] == [0x0001, 0x0002, 0x0003]
assert frame1.stack == [0x1111, 0x2222]
# Frame1's program_counter was set from frame2's return_pc
assert frame1.program_counter == 0x5678
# Check frame 2
frame2 = restored_stack._call_stack[2]
assert frame2.return_addr == 3
assert frame2.local_vars[:1] == [0xAAAA]
assert frame2.stack == [0xBBBB, 0xCCCC, 0xDDDD]

View file

@ -50,6 +50,10 @@ class MockStackManager:
def __init__(self): def __init__(self):
self.stack = [] self.stack = []
self.locals = [0] * 15 self.locals = [0] * 15
# For QuetzalWriter support - empty call stack
from mudlib.zmachine.zstackmanager import ZStackBottom
self._call_stack = [ZStackBottom()]
def push_stack(self, value): def push_stack(self, value):
self.stack.append(value) self.stack.append(value)
@ -92,6 +96,7 @@ class MockUI:
self.screen.write = Mock() self.screen.write = Mock()
self.keyboard_input = Mock() self.keyboard_input = Mock()
self.keyboard_input.read_line = Mock() self.keyboard_input.read_line = Mock()
self.filesystem = Mock()
class ZMachineOpcodeTests(TestCase): class ZMachineOpcodeTests(TestCase):
@ -114,6 +119,7 @@ class ZMachineOpcodeTests(TestCase):
Mock(), # stream manager Mock(), # stream manager
self.ui, self.ui,
Mock(), # lexer Mock(), # lexer
zmachine=None,
) )
def test_op_nop(self): def test_op_nop(self):
@ -451,6 +457,7 @@ class ZMachineObjectOpcodeTests(TestCase):
Mock(), # stream manager Mock(), # stream manager
self.ui, self.ui,
Mock(), # lexer Mock(), # lexer
zmachine=None,
) )
def test_op_get_sibling_with_sibling(self): def test_op_get_sibling_with_sibling(self):
@ -661,6 +668,7 @@ class ZMachineComplexOpcodeTests(TestCase):
Mock(), # stream manager Mock(), # stream manager
self.ui, self.ui,
Mock(), # lexer Mock(), # lexer
zmachine=None,
) )
def test_op_sread_v3_basic_input(self): def test_op_sread_v3_basic_input(self):
@ -746,8 +754,27 @@ class ZMachineComplexOpcodeTests(TestCase):
# Should have called show_status once # Should have called show_status once
self.assertEqual(call_count[0], 1) self.assertEqual(call_count[0], 1)
def test_op_save_v3_branches_false(self): def test_op_save_v3_branches_false_when_filesystem_fails(self):
"""Test save (V3) branches false (QuetzalWriter not functional).""" """Test save (V3) branches false when filesystem returns False."""
# Need a valid zmachine for the test to proceed
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
self.cpu._zmachine = zmachine_mock
# Mock filesystem to fail
self.ui.filesystem.save_game = Mock(return_value=False)
self.decoder.branch_condition = True self.decoder.branch_condition = True
self.decoder.branch_offset = 100 self.decoder.branch_offset = 100
old_pc = self.cpu._opdecoder.program_counter old_pc = self.cpu._opdecoder.program_counter
@ -757,8 +784,26 @@ class ZMachineComplexOpcodeTests(TestCase):
# Should not have branched (test is false) # Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc) self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_restore_v3_branches_false(self): def test_op_restore_v3_branches_false_when_filesystem_returns_none(self):
"""Test restore (V3) branches false (no valid save files).""" """Test restore (V3) branches false when filesystem returns None."""
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
self.cpu._zmachine = zmachine_mock
# Mock filesystem to return None (user cancelled or no file)
self.ui.filesystem.restore_game = Mock(return_value=None)
self.decoder.branch_condition = True self.decoder.branch_condition = True
self.decoder.branch_offset = 100 self.decoder.branch_offset = 100
old_pc = self.cpu._opdecoder.program_counter old_pc = self.cpu._opdecoder.program_counter
@ -768,6 +813,256 @@ class ZMachineComplexOpcodeTests(TestCase):
# Should not have branched (test is false) # Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc) self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_restore_v3_branches_false_when_no_zmachine(self):
"""Test restore (V3) branches false when zmachine is not set."""
self.cpu._zmachine = None
self.decoder.branch_condition = True
self.decoder.branch_offset = 100
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_restore()
# Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_restore_v3_branches_true_on_success(self):
"""Test restore (V3) branches true when restore succeeds."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
# Create a story with some dynamic memory
story = bytearray(1024)
story[0] = 3 # version 3
story[0x0E] = 0x04 # static memory starts at 0x0400
story[0x0F] = 0x00
# Set header values
story[0x02] = 0x12 # release high byte
story[0x03] = 0x34 # release low byte
for i, byte in enumerate(b"860509"):
story[0x12 + i] = byte
story[0x1C] = 0xAB # checksum high
story[0x1D] = 0xCD # checksum low
# Create zmachine with modified memory state
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
# Modify some dynamic memory to create a save state
zmachine_mock._mem[0x100] = 0x42
zmachine_mock._mem[0x200] = 0x99
self.cpu._zmachine = zmachine_mock
# Generate save data using QuetzalWriter
writer = QuetzalWriter(zmachine_mock)
save_data = writer.generate_save_data()
# Mock filesystem to return the save data
self.ui.filesystem.restore_game = Mock(return_value=save_data)
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_restore()
# Should have branched (test is true)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc + 8)
def test_op_restore_v3_restores_memory_state(self):
"""Test restore (V3) correctly restores dynamic memory."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
# Create a story
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
story[0x02] = 0x12
story[0x03] = 0x34
for i, byte in enumerate(b"860509"):
story[0x12 + i] = byte
story[0x1C] = 0xAB
story[0x1D] = 0xCD
# Create zmachine with saved state
saved_zmachine = Mock()
saved_zmachine._mem = ZMemory(bytes(story))
saved_zmachine._pristine_mem = ZMemory(bytes(story))
saved_zmachine._cpu = self.cpu
saved_zmachine._stackmanager = self.stack
# Modify memory to create unique save state
saved_zmachine._mem[0x50] = 0xAA
saved_zmachine._mem[0x150] = 0xBB
saved_zmachine._mem[0x250] = 0xCC
# Generate save data
writer = QuetzalWriter(saved_zmachine)
save_data = writer.generate_save_data()
# Create fresh zmachine with different state
current_zmachine = Mock()
current_zmachine._mem = ZMemory(bytes(story))
current_zmachine._pristine_mem = ZMemory(bytes(story))
current_zmachine._cpu = self.cpu
current_zmachine._stackmanager = self.stack
# Different values in memory
current_zmachine._mem[0x50] = 0x11
current_zmachine._mem[0x150] = 0x22
current_zmachine._mem[0x250] = 0x33
self.cpu._zmachine = current_zmachine
# Mock filesystem to return save data
self.ui.filesystem.restore_game = Mock(return_value=save_data)
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
self.cpu.op_restore()
# Memory should now match saved state
self.assertEqual(current_zmachine._mem[0x50], 0xAA)
self.assertEqual(current_zmachine._mem[0x150], 0xBB)
self.assertEqual(current_zmachine._mem[0x250], 0xCC)
def test_op_restore_v3_branches_false_on_malformed_data(self):
"""Test restore (V3) branches false when save data is malformed."""
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
self.cpu._zmachine = zmachine_mock
# Mock filesystem to return invalid data
self.ui.filesystem.restore_game = Mock(return_value=b"invalid data")
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_restore()
# Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_save_v3_branches_true_on_success(self):
"""Test save (V3) branches true when filesystem succeeds."""
# Create minimal zmachine mock
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3 # version 3
story[0x0E] = 0x04 # static memory starts at 0x0400
story[0x0F] = 0x00
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
# Attach zmachine to cpu
self.cpu._zmachine = zmachine_mock
# Mock filesystem to succeed
self.ui.filesystem.save_game = Mock(return_value=True)
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_save()
# Should have branched (test is true)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc + 8)
# Filesystem should have been called with bytes
self.assertTrue(self.ui.filesystem.save_game.called)
call_args = self.ui.filesystem.save_game.call_args[0]
self.assertIsInstance(call_args[0], bytes)
def test_op_save_v3_generates_valid_iff_data(self):
"""Test save generates valid IFF/FORM/IFZS container."""
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
# Set header values
story[0x02] = 0x12 # release high byte
story[0x03] = 0x34 # release low byte
for i, byte in enumerate(b"860509"):
story[0x12 + i] = byte
story[0x1C] = 0xAB # checksum high
story[0x1D] = 0xCD # checksum low
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
self.cpu._zmachine = zmachine_mock
# Mock filesystem to capture data
captured_data = []
def capture_save(data):
captured_data.append(data)
return True
self.ui.filesystem.save_game = Mock(side_effect=capture_save)
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
self.cpu.op_save()
# Verify we got data
self.assertEqual(len(captured_data), 1)
data = captured_data[0]
# Verify IFF structure
self.assertEqual(data[0:4], b"FORM") # FORM header
# Bytes 4-7 are the size (big-endian 32-bit)
self.assertEqual(data[8:12], b"IFZS") # IFZS type
# Verify chunks are present
self.assertIn(b"IFhd", data)
self.assertIn(b"CMem", data)
self.assertIn(b"Stks", data)
self.assertIn(b"ANNO", data)
def test_op_save_v3_without_zmachine_branches_false(self):
"""Test save fails gracefully when zmachine is not set."""
# Don't set zmachine
self.cpu._zmachine = None
self.decoder.branch_condition = True
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_save()
# Should not have branched
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_input_stream_is_noop(self): def test_op_input_stream_is_noop(self):
"""Test input_stream is a no-op stub.""" """Test input_stream is a no-op stub."""
# Should not raise # Should not raise
@ -784,6 +1079,204 @@ class ZMachineComplexOpcodeTests(TestCase):
self.cpu.op_restart() self.cpu.op_restart()
class QuetzalWriterTests(TestCase):
"""Test suite for QuetzalWriter save functionality."""
def test_generate_ifhd_chunk(self):
"""Test _generate_ifhd_chunk() produces correct 13-byte IFhd chunk."""
from mudlib.zmachine.quetzal import QuetzalWriter
# Create a mock zmachine with known header values
mock_zmachine = Mock()
mock_zmachine._mem = MockMemory()
# Set header values in memory:
# Bytes 2-3: Release number (0x1234)
mock_zmachine._mem.write_word(0x02, 0x1234)
# Bytes 0x12-0x17: Serial number (6 bytes: "860509")
serial = b"860509"
for i, byte in enumerate(serial):
mock_zmachine._mem[0x12 + i] = byte
# Bytes 0x1C-0x1D: Checksum (0xABCD)
mock_zmachine._mem.write_word(0x1C, 0xABCD)
# Set program counter
mock_cpu = Mock()
mock_cpu._program_counter = 0x123456 # 24-bit PC
mock_zmachine._cpu = mock_cpu
# Create writer and generate chunk
writer = QuetzalWriter(mock_zmachine)
chunk_data = writer._generate_ifhd_chunk()
# Verify chunk is exactly 13 bytes
self.assertEqual(len(chunk_data), 13)
# Verify release number (bytes 0-1)
self.assertEqual(chunk_data[0], 0x12)
self.assertEqual(chunk_data[1], 0x34)
# Verify serial number (bytes 2-7)
for i, expected in enumerate(serial):
self.assertEqual(chunk_data[2 + i], expected)
# Verify checksum (bytes 8-9)
self.assertEqual(chunk_data[8], 0xAB)
self.assertEqual(chunk_data[9], 0xCD)
# Verify program counter (bytes 10-12)
self.assertEqual(chunk_data[10], 0x12)
self.assertEqual(chunk_data[11], 0x34)
self.assertEqual(chunk_data[12], 0x56)
def test_cmem_all_unchanged(self):
"""Test CMem chunk with no changes (all zeros after XOR)."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
# Create a minimal z3 story file
story = bytearray(1024)
story[0] = 3 # version 3
story[0x0E] = 0x04 # static memory starts at 0x0400
story[0x0F] = 0x00
pristine = ZMemory(bytes(story))
current = ZMemory(bytes(story))
zmachine = Mock()
zmachine._pristine_mem = pristine
zmachine._mem = current
writer = QuetzalWriter(zmachine)
result = writer._generate_cmem_chunk()
# All identical means no output (trailing zeros omitted)
self.assertIsInstance(result, bytes)
self.assertEqual(len(result), 0)
def test_cmem_single_byte_change(self):
"""Test CMem chunk with one byte changed."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
pristine = ZMemory(bytes(story))
current = ZMemory(bytes(story))
current[0x0100] = 0x42
zmachine = Mock()
zmachine._pristine_mem = pristine
zmachine._mem = current
writer = QuetzalWriter(zmachine)
result = writer._generate_cmem_chunk()
self.assertIsInstance(result, bytes)
self.assertIn(0x42, result)
def test_cmem_multiple_scattered_changes(self):
"""Test CMem chunk with multiple changes across memory."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
pristine = ZMemory(bytes(story))
current = ZMemory(bytes(story))
current[0x0010] = 0xAA
current[0x0100] = 0xBB
current[0x0200] = 0xCC
zmachine = Mock()
zmachine._pristine_mem = pristine
zmachine._mem = current
writer = QuetzalWriter(zmachine)
result = writer._generate_cmem_chunk()
self.assertIsInstance(result, bytes)
self.assertIn(0xAA, result)
self.assertIn(0xBB, result)
self.assertIn(0xCC, result)
self.assertLess(len(result), 1024)
def test_cmem_roundtrip_with_parser(self):
"""Test that CMem output can be decoded by QuetzalParser._parse_cmem()."""
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
pristine = ZMemory(bytes(story))
current = ZMemory(bytes(story))
current[0x0050] = 0x12
current[0x0051] = 0x34
current[0x0150] = 0xAB
current[0x0300] = 0xFF
zmachine = Mock()
zmachine._pristine_mem = pristine
zmachine._mem = current
writer = QuetzalWriter(zmachine)
compressed_bytes = writer._generate_cmem_chunk()
# Create fresh memory for parsing into
restored = ZMemory(bytes(story))
restored_zmachine = Mock()
restored_zmachine._pristine_mem = pristine
restored_zmachine._mem = restored
parser = QuetzalParser(restored_zmachine)
parser._parse_cmem(compressed_bytes)
# Verify restored memory matches current memory
for addr in [0x0050, 0x0051, 0x0150, 0x0300]:
self.assertEqual(
restored[addr],
current[addr],
f"Mismatch at address 0x{addr:04X}",
)
def test_cmem_consecutive_zeros(self):
"""Test CMem encoding handles consecutive zero XOR results correctly."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
pristine = ZMemory(bytes(story))
current = ZMemory(bytes(story))
current[0x0040] = 0x11
current[0x0045] = 0x22
zmachine = Mock()
zmachine._pristine_mem = pristine
zmachine._mem = current
writer = QuetzalWriter(zmachine)
result = writer._generate_cmem_chunk()
self.assertIsInstance(result, bytes)
idx_11 = result.index(0x11)
self.assertEqual(result[idx_11 + 1], 0x00)
self.assertEqual(result[idx_11 + 2], 0x03)
self.assertEqual(result[idx_11 + 3], 0x22)
# Note: ZObjectParser methods are tested through integration tests # Note: ZObjectParser methods are tested through integration tests
# with real story files, not unit tests with mock memory, as the # with real story files, not unit tests with mock memory, as the
# interaction with ZStringFactory makes mocking complex. # interaction with ZStringFactory makes mocking complex.