Compare commits

..

No commits in common. "f4b7d0548bcfbfaba38d5a684e015a5403aa9465" and "8097bbcf55ae86c5c957b70ec5979b969e646ad1" have entirely different histories.

18 changed files with 123 additions and 3032 deletions

View file

@ -205,8 +205,6 @@ All V3 gaps have been resolved. sread tokenization works correctly. save/restore
Do they represent z-machine memory the same way? Both use bytearrays, but header parsing, object table offsets, string encoding — are these compatible enough that porting opcodes is translation, or is it a deeper rewrite?
UPDATE: Less urgent now that the hybrid interpreter works end-to-end for V3. The layout question mainly matters for V5 opcode porting (Lost Pig, Wizard Sniffer). The hybrid already handles all V3 memory operations correctly.
3. Async model
~~~~~~~~~~~~~~
@ -236,16 +234,12 @@ How hard is it to add words to a z-machine dictionary at runtime? The dictionary
7. Save/restore in the hybrid interpreter
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
RESOLVED: save/restore is now fully implemented and working. Key pieces:
``op_save`` is a stub that always branches false (game prints "Failed."). The infrastructure is mostly there — ``TrivialFilesystem.save_game()`` prompts for a filename and writes to disk, ``QuetzalParser`` can read save files — but two pieces are missing:
- ``QuetzalWriter`` chunk generators implemented (``IFhd`` for header, ``CMem`` for XOR-compressed dynamic memory, ``Stks`` for stack frame serialization)
- ``op_save`` and ``op_restore`` wired to filesystem layer via ``TrivialFilesystem``
- round-trip tested: save game state, restore it, continue playing
- fixed Quetzal ``Stks`` field mapping: ``return_pc`` belongs on the caller frame's ``program_counter``, not the current frame. ``varnum`` is the store variable on the current frame. round-trip tests masked this because writer and parser had the same bug symmetrically
- fixed V3 save branch processing on restore: in-game saves store PC pointing at branch data after the save opcode (0xB5). ``_try_restore()`` detects this and calls ``_branch(True)`` to advance past it. without this, branch bytes were decoded as instructions
- fixed restored local var padding: save files store only declared locals, runtime expects 15 slots. now zero-pads on restore
- ``QuetzalWriter`` chunk generators (``ifhd``, ``cmem``, ``stks``) are all stubs returning ``"0"``
- ``op_save`` doesn't collect game state or call the filesystem
Quetzal format is now fully supported for both reading and writing saves. Diagnostic tooling: ``scripts/zmachine_inspect.py`` for offline state inspection, instruction trace deque (last 20) auto-dumps on crash.
To make save work: implement ``QuetzalWriter`` (XOR-compress dynamic memory against original story, serialize stack frames into Quetzal format), then wire ``op_save`` to generate the bytes and call ``self._ui.filesystem.save_game(data)``. Restore should be simpler since ``QuetzalParser`` already works — just need to wire ``op_restore`` to call ``filesystem.restore_game()`` and apply the parsed state.
what to do next
---------------
@ -260,9 +254,7 @@ Concrete next steps, roughly ordered. Update as items get done.
- [x] build level 1 prototype: regardless of interpreter choice, implement the terminal object, IF mode, and subprocess dfrotz path. this proves the MUD-side architecture (mode stack, spectators, save/restore) independently of the interpreter question. (done — see ``docs/how/if-terminal.txt``)
- [x] implement save/restore: finished ``QuetzalWriter`` chunk generators (IFhd, CMem, Stks) and wired ``op_save``/``op_restore`` to filesystem. quetzal round-trip now works — can save during gameplay, restore, and continue. also fixed parser off-by-one bug in return_pc.
- [x] wire embedded interpreter to MUD: connected the hybrid interpreter to the MUD's mode stack via ``EmbeddedIFSession``. .z3 files use the embedded interpreter; other formats fall back to dfrotz. save/restore works via QuetzalWriter/QuetzalParser. state inspection (room name, objects) enables level 2. found and fixed a quetzal parser bug (bit slice for local vars was 3 bits, needed 4). (done — see ``src/mudlib/embedded_if_session.py``, ``src/mudlib/zmachine/mud_ui.py``)
- [ ] implement save/restore: finish ``QuetzalWriter`` chunk generators and wire ``op_save``/``op_restore`` to the filesystem layer. restore should be easier since ``QuetzalParser`` already works.
- [ ] study MojoZork's multiplayer model: read the MultiZork source for how it handles multiple players in one z-machine. document the pattern for our eventual level 4.
@ -281,8 +273,7 @@ What works:
- instruction trace deque (last 20 instructions) for debugging state errors
- smoke test: ``scripts/run_zork1.py`` runs the game headless, exercises core opcode paths
- parser and lexer: all Zork 1 commands work (look, open mailbox, read leaflet, inventory, take, drop, navigation)
- save/restore: full quetzal format support for persisting and restoring game state
- the interpreter is fully playable for Zork 1
- the interpreter is fully playable for Zork 1 (save/restore not yet wired — see open question 7)
What this enables:
@ -293,29 +284,6 @@ What this enables:
The step-based execution model means IF sessions can run in the async MUD game loop without blocking. Each player command advances their z-machine instance by N instructions (until output or a stopping condition). The trace deque captures the last 20 instructions for debugging unexpected state.
milestone — Level 2: embedded interpreter wired to MUD
-------------------------------------------------------
The embedded z-machine interpreter is now connected to the MUD engine. Players can ``play zork1`` and the game runs inside the MUD process — no dfrotz subprocess needed for .z3 files.
What works:
- ``EmbeddedIFSession`` wraps the hybrid interpreter with the same interface as the dfrotz-based ``IFSession``
- MUD ZUI components: ``MudScreen`` (buffered output), ``MudInputStream`` (thread-safe input with events), ``MudFilesystem`` (quetzal saves to disk), ``NullAudio``
- interpreter runs in a daemon thread; ``MudInputStream`` uses ``threading.Event`` for async bridge — interpreter blocks on ``read_line()``, async side feeds input and waits for next prompt
- save/restore via ``::save`` and ``::quit`` escape commands (QuetzalWriter), auto-restore on session start (QuetzalParser)
- state inspection: ``get_location_name()`` reads global variable 0 (player location object), ``get_room_objects()`` walks the object tree
- .z3 files use embedded interpreter, other formats fall back to dfrotz
- fixed quetzal parser bug: ``_parse_stks`` bit slice was ``[0:3]`` (3 bits, max 7 locals), should be ``[0:4]`` (4 bits, max 15 locals) — Zork uses 15
- 558 tests pass including unit tests for MUD UI components and integration tests with real zork1.z3
What this enables:
- spectators can see what room the IF player is in (``get_location_name()``)
- MUD code can read the object tree, variables, and attributes
- foundation for level 3 (moldable world — write z-machine state from MUD)
- no external dependency on dfrotz for V3 games
related documents
-----------------

View file

@ -12,10 +12,7 @@
#alias {fse} {fly southeast}
#alias {fsw} {fly southwest}
#NOP combat aliases (pr/pl/dr/dl/f/v are built into the MUD)
#NOP these are extras for single-key convenience
#alias {o} {sweep}
#alias {r} {roundhouse}
#alias {reconnect} {
#zap mud;
#session mud localhost 6789;
}

View file

@ -1,399 +0,0 @@
#!/usr/bin/env -S uv run --script
"""Z-Machine state inspection and debugging tool.
Loads a story file and optionally applies a Quetzal save, then displays
machine state for debugging z-machine interpreter issues.
"""
import argparse
import sys
from pathlib import Path
# Add src to path so we can import mudlib
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from mudlib.zmachine.quetzal import ( # noqa: E402
QuetzalError,
QuetzalMismatchedFile,
QuetzalParser,
)
from mudlib.zmachine.trivialzui import create_zui # noqa: E402
from mudlib.zmachine.zmachine import ZMachine # noqa: E402
# Opcode name tables for disassembly
OP2_NAMES = {
1: "je",
2: "jl",
3: "jg",
4: "dec_chk",
5: "inc_chk",
6: "jin",
7: "test",
8: "or",
9: "and",
10: "test_attr",
11: "set_attr",
12: "clear_attr",
13: "store",
14: "insert_obj",
15: "loadw",
16: "loadb",
17: "get_prop",
18: "get_prop_addr",
19: "get_next_prop",
20: "add",
21: "sub",
22: "mul",
23: "div",
24: "mod",
}
OP1_NAMES = {
0: "jz",
1: "get_sibling",
2: "get_child",
3: "get_parent",
4: "get_prop_len",
5: "inc",
6: "dec",
7: "print_addr",
8: "call_1s",
9: "remove_obj",
10: "print_obj",
11: "ret",
12: "jump",
13: "print_paddr",
14: "load",
15: "not/call_1n",
}
OP0_NAMES = {
0: "rtrue",
1: "rfalse",
2: "print",
3: "print_ret",
4: "nop",
5: "save",
6: "restore",
7: "restart",
8: "ret_popped",
9: "pop/catch",
10: "quit",
11: "new_line",
12: "show_status",
13: "verify",
15: "piracy",
}
VAR_NAMES = {
0: "call",
1: "storew",
2: "storeb",
3: "put_prop",
4: "sread",
5: "print_char",
6: "print_num",
7: "random",
8: "push",
9: "pull",
10: "split_window",
11: "set_window",
19: "output_stream",
20: "input_stream",
21: "sound_effect",
}
def decode_opcode_class(opcode_byte):
"""Determine opcode class from the opcode byte."""
if opcode_byte < 0x80:
return "2OP"
elif opcode_byte < 0xC0:
op_type = (opcode_byte >> 4) & 3
if op_type == 3:
return "0OP"
else:
return "1OP"
elif opcode_byte < 0xE0:
return "2OP"
else:
return "VAR"
def get_opcode_name(opcode_class, opcode_num):
"""Get the name of an opcode."""
if opcode_class == "2OP":
return OP2_NAMES.get(opcode_num, f"unknown_{opcode_num}")
elif opcode_class == "1OP":
return OP1_NAMES.get(opcode_num, f"unknown_{opcode_num}")
elif opcode_class == "0OP":
return OP0_NAMES.get(opcode_num, f"unknown_{opcode_num}")
elif opcode_class == "VAR":
return VAR_NAMES.get(opcode_num, f"unknown_{opcode_num}")
else:
return "unknown"
def parse_operand_types(mem, pc, opcode_byte):
"""Parse operand types without evaluating them. Returns (types, bytes_consumed)."""
types = []
pos = pc
if opcode_byte < 0x80:
# Long form 2OP
types.append("var" if (opcode_byte & 0x40) else "small")
types.append("var" if (opcode_byte & 0x20) else "small")
return types, pos - pc
elif opcode_byte < 0xC0:
# Short form
op_type = (opcode_byte >> 4) & 3
if op_type == 0:
types.append("large")
elif op_type == 1:
types.append("small")
elif op_type == 2:
types.append("var")
# op_type == 3 means 0OP, no operands
return types, pos - pc
else:
# Variable form - read types byte
types_byte = mem[pos]
pos += 1
for i in range(4):
t = (types_byte >> (6 - i * 2)) & 3
if t == 3:
break
elif t == 0:
types.append("large")
elif t == 1:
types.append("small")
elif t == 2:
types.append("var")
return types, pos - pc
def disassemble_at(zmachine, addr, count):
"""Disassemble count instructions starting at addr."""
mem = zmachine._mem
pc = addr
print(f"\n--- disassembly at 0x{addr:06x} ({count} instructions) ---")
for _ in range(count):
if pc >= len(mem._memory):
print(f" {pc:06x} (out of bounds)")
break
start_pc = pc
opcode_byte = mem[pc]
pc += 1
# Decode opcode class and number
if opcode_byte < 0x80:
# Long form 2OP
op_class = "2OP"
op_num = opcode_byte & 0x1F
elif opcode_byte < 0xC0:
# Short form
op_type = (opcode_byte >> 4) & 3
if op_type == 3:
op_class = "0OP"
op_num = opcode_byte & 0x0F
else:
op_class = "1OP"
op_num = opcode_byte & 0x0F
elif opcode_byte < 0xE0:
# Variable form 2OP
op_class = "2OP"
op_num = opcode_byte & 0x1F
else:
# Variable form VAR
op_class = "VAR"
op_num = opcode_byte & 0x1F
op_name = get_opcode_name(op_class, op_num)
# Parse operand types
operand_types, type_bytes = parse_operand_types(mem, pc, opcode_byte)
pc += type_bytes
# Skip past operand values (without reading them)
operand_str_parts = []
for ot in operand_types:
if ot == "large":
if pc + 1 < len(mem._memory):
val = (mem[pc] << 8) | mem[pc + 1]
operand_str_parts.append(f"#{val}")
pc += 2
elif ot == "small":
if pc < len(mem._memory):
operand_str_parts.append(f"#{mem[pc]}")
pc += 1
elif ot == "var" and pc < len(mem._memory):
operand_str_parts.append(f"V{mem[pc]:02x}")
pc += 1
operands_str = ", ".join(operand_str_parts)
print(f" {start_pc:06x} {op_class}:{op_num:02d} {op_name}({operands_str})")
def get_location_info(zmachine):
"""Get current location object and its contents."""
try:
# Global variable 0 (opcode variable 0x10) is the player location
location_obj = zmachine._mem.read_global(0x10)
if location_obj == 0:
return None, []
obj_parser = zmachine._objectparser
# Try to get the location name
try:
location_name = obj_parser.get_shortname(location_obj)
except Exception:
# Invalid object number - return None
return None, []
# Get objects in this location (children)
objects = []
child = obj_parser.get_child(location_obj)
while child != 0:
try:
child_name = obj_parser.get_shortname(child)
objects.append(child_name)
except Exception:
objects.append(f"object #{child}")
child = obj_parser.get_sibling(child)
return (location_obj, location_name), objects
except Exception:
return None, []
def validate_save(zmachine, save_path):
"""Validate a save file and print diagnostic info."""
print("\n--- save file validation ---")
# Read the save file
try:
with open(save_path, "rb") as f:
save_data = f.read()
except OSError as e:
print(f"ERROR: Cannot read save file: {e}")
return False
# Parse it
parser = QuetzalParser(zmachine)
try:
parser.load_from_bytes(save_data)
except QuetzalMismatchedFile:
print("ERROR: Save file does not match story file")
print(" (release number, serial, or checksum mismatch)")
return False
except QuetzalError as e:
print(f"ERROR: Invalid Quetzal file: {e}")
return False
except Exception as e:
print(f"ERROR: Failed to parse save file: {e}")
return False
# Get metadata
metadata = parser.get_last_loaded()
print("Save file loaded successfully")
print(f" release: {metadata.get('release number', 'unknown')}")
print(f" serial: {metadata.get('serial number', 'unknown')}")
print(f" checksum: 0x{metadata.get('checksum', 0):04x}")
print(f" PC: 0x{metadata.get('program counter', 0):06x}")
# Check PC is in bounds
pc = metadata.get("program counter", 0)
mem_size = len(zmachine._mem._memory)
if pc >= mem_size:
print(f" WARNING: PC 0x{pc:06x} is out of bounds (size: {mem_size})")
return False
print(" PC is within story file bounds")
return True
def main():
parser = argparse.ArgumentParser(
description="Inspect z-machine state for debugging"
)
parser.add_argument("story", help="Path to story file (.z3/.z5/.z8)")
parser.add_argument("--save", help="Path to Quetzal save file (.qzl)")
parser.add_argument(
"--disasm", type=lambda x: int(x, 0), help="Address to disassemble (hex)"
)
parser.add_argument(
"--count", type=int, default=5, help="Number of instructions to disassemble"
)
parser.add_argument(
"--globals", action="store_true", help="Show first 16 global variables"
)
args = parser.parse_args()
story_path = Path(args.story)
if not story_path.exists():
print(f"ERROR: Story file not found: {story_path}")
sys.exit(1)
# Load story
with open(story_path, "rb") as f:
story_bytes = f.read()
# Create ZMachine
ui = create_zui()
zmachine = ZMachine(story_bytes, ui)
# Load save if provided
if args.save:
save_path = Path(args.save)
if not save_path.exists():
print(f"ERROR: Save file not found: {save_path}")
sys.exit(1)
if not validate_save(zmachine, save_path):
sys.exit(1)
# Display state
print("\nzmachine state")
print("==============")
print(f"version: {zmachine._mem.version}")
print(f"story size: {len(story_bytes)} bytes")
print(f"PC: 0x{zmachine._opdecoder.program_counter:06x}")
print(f"stack: {len(zmachine._stackmanager._call_stack) - 1} frames")
# Location info
location, objects = get_location_info(zmachine)
if location:
loc_obj, loc_name = location
print(f"\nlocation: {loc_name} (#{loc_obj})")
if objects:
print("objects: " + ", ".join(objects))
else:
print("objects: (none)")
else:
print("\nlocation: (unknown)")
# Global variables
if args.globals:
print("\nglobals (first 16):")
for i in range(16):
global_val = zmachine._mem.read_global(0x10 + i)
print(f" G{i:02d} (V{0x10 + i:02x}): 0x{global_val:04x} ({global_val})")
# Disassembly
if args.disasm is not None:
disassemble_at(zmachine, args.disasm, args.count)
else:
# Disassemble instructions at PC
disassemble_at(zmachine, zmachine._opdecoder.program_counter, args.count)
if __name__ == "__main__":
main()

View file

@ -3,7 +3,6 @@
import pathlib
from mudlib.commands import CommandDefinition, register
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.if_session import IFSession, broadcast_to_spectators
from mudlib.player import Player
@ -61,20 +60,8 @@ async def cmd_play(player: Player, args: str) -> None:
await player.send(msg)
return
# Ensure story_path is a Path object (for mocking compatibility)
if not isinstance(story_path, pathlib.Path):
story_path = pathlib.Path(story_path)
# Use embedded interpreter for z3 files, dfrotz for others
if story_path.suffix == ".z3":
try:
session = EmbeddedIFSession(player, str(story_path), game_name)
except (FileNotFoundError, OSError) as e:
await player.send(f"error starting game: {e}\r\n")
return
else:
session = IFSession(player, str(story_path), game_name)
# Create and start IF session
session = IFSession(player, str(story_path), game_name)
try:
intro = await session.start()
except FileNotFoundError:
@ -91,9 +78,18 @@ async def cmd_play(player: Player, args: str) -> None:
await player.send("(type ::help for escape commands)\r\n")
if intro:
# Check for saved game
if session.save_path.exists():
await player.send("restoring saved game...\r\n")
restored_text = await session._do_restore()
if restored_text:
await player.send(restored_text + "\r\n")
# Broadcast restored text to spectators
spectator_msg = f"[{player.name}'s terminal]\r\n{restored_text}\r\n"
await broadcast_to_spectators(player, spectator_msg)
elif intro:
await player.send(intro + "\r\n")
# Broadcast to spectators
# Broadcast intro to spectators
spectator_msg = f"[{player.name}'s terminal]\r\n{intro}\r\n"
await broadcast_to_spectators(player, spectator_msg)

View file

@ -1,174 +0,0 @@
import asyncio
import logging
import re
import threading
import traceback
from pathlib import Path
from typing import TYPE_CHECKING
from mudlib.if_session import IFResponse
from mudlib.zmachine.mud_ui import create_mud_ui
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zcpu import ZCpuQuit, ZCpuRestart
from mudlib.zmachine.zmachine import ZMachine
if TYPE_CHECKING:
from mudlib.player import Player
logger = logging.getLogger(__name__)
class EmbeddedIFSession:
"""Wraps z-machine interpreter for MUD integration."""
def __init__(self, player: "Player", story_path: str, game_name: str = ""):
self.player = player
self.story_path = story_path
self.game_name = game_name or Path(story_path).stem
self._data_dir = Path(__file__).resolve().parents[2] / "data"
self._thread: threading.Thread | None = None
self._done = False
self._error: str | None = None
story_bytes = Path(story_path).read_bytes()
save_path = self.save_path
self._ui, self._screen, self._keyboard = create_mud_ui(save_path)
self._zmachine = ZMachine(story_bytes, self._ui)
self._filesystem = self._ui.filesystem
@property
def save_path(self) -> Path:
safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", self.player.name)
return self._data_dir / "if_saves" / safe_name / f"{self.game_name}.qzl"
def _try_restore(self) -> bool:
"""Try to restore from save file before interpreter starts.
Must be called before the interpreter thread is launched.
Returns True if state was restored successfully.
"""
if not self.save_path.exists():
return False
try:
save_data = self.save_path.read_bytes()
parser = QuetzalParser(self._zmachine)
parser.load_from_bytes(save_data)
# In V1-3, the saved PC points to branch data after the save
# instruction. Process the branch as "save succeeded" so the
# PC advances past it. Detect by checking for save opcode (0xB5)
# immediately before the restored PC.
pc = self._zmachine._opdecoder.program_counter
if (
self._zmachine._mem.version <= 3
and pc > 0
and self._zmachine._mem[pc - 1] == 0xB5
):
self._zmachine._cpu._branch(True)
return True
except Exception as e:
logger.debug(f"Restore failed: {e}")
return False
async def start(self) -> str:
"""Start the z-machine interpreter, restoring from save if available."""
restored = self._try_restore()
self._thread = threading.Thread(target=self._run_interpreter, daemon=True)
self._thread.start()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._keyboard._waiting.wait)
output = self._screen.flush()
if restored:
prefix = "restoring saved game...\r\nrestored."
return f"{prefix}\r\n\r\n{output}" if output else prefix
return output
async def handle_input(self, text: str) -> IFResponse:
if text.lower() == "::quit":
await self._do_save()
return IFResponse(output="game saved.", done=True)
if text.lower() == "::help":
help_text = """escape commands:
::quit - exit the game
::save - save game progress
::help - show this help"""
return IFResponse(output=help_text, done=False)
if text.lower() == "::save":
confirmation = await self._do_save()
return IFResponse(output=confirmation, done=False)
self._keyboard._waiting.clear()
self._keyboard.feed(text)
loop = asyncio.get_running_loop()
def wait_for_next_input():
while not self._done and not self._keyboard._waiting.is_set():
self._keyboard._waiting.wait(timeout=0.1)
await loop.run_in_executor(None, wait_for_next_input)
output = self._screen.flush()
if self._done and self._error:
output = f"{output}\r\n{self._error}" if output else self._error
return IFResponse(output=output, done=self._done)
async def stop(self):
self._done = True
if self._keyboard._waiting.is_set():
self._keyboard.feed("")
def _run_interpreter(self):
try:
self._zmachine.run()
except ZCpuQuit:
logger.debug("Interpreter quit normally")
except ZCpuRestart:
logger.debug("Interpreter restart requested")
except Exception as e:
tb = traceback.format_exc()
logger.error(f"Interpreter crashed:\n{tb}")
self._error = f"interpreter error: {e}"
finally:
self._done = True
self._keyboard._waiting.set()
async def _do_save(self) -> str:
try:
writer = QuetzalWriter(self._zmachine)
save_data = writer.generate_save_data()
success = self._filesystem.save_game(save_data)
if success:
return "saved."
return "error: save failed"
except Exception as e:
return f"error: save failed ({e})"
def get_location_name(self) -> str | None:
try:
location_obj = self._zmachine._mem.read_global(0)
if location_obj == 0:
return None
return self._zmachine._objectparser.get_shortname(location_obj)
except Exception:
return None
def get_room_objects(self) -> list[str]:
try:
location_obj = self._zmachine._mem.read_global(0)
if location_obj == 0:
return []
objects = []
child = self._zmachine._objectparser.get_child(location_obj)
while child != 0:
name = self._zmachine._objectparser.get_shortname(child)
objects.append(name)
child = self._zmachine._objectparser.get_sibling(child)
return objects
except Exception:
return []

View file

@ -10,7 +10,6 @@ from mudlib.entity import Entity
if TYPE_CHECKING:
from mudlib.editor import Editor
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.if_session import IFSession
@ -24,7 +23,7 @@ class Player(Entity):
mode_stack: list[str] = field(default_factory=lambda: ["normal"])
caps: ClientCaps = field(default_factory=lambda: ClientCaps(ansi=True))
editor: Editor | None = None
if_session: IFSession | EmbeddedIFSession | None = None
if_session: IFSession | None = None
@property
def mode(self) -> str:

View file

@ -1,150 +0,0 @@
import logging
import queue
import threading
from pathlib import Path
from . import zaudio, zfilesystem, zscreen, zstream, zui
logger = logging.getLogger(__name__)
class NullAudio(zaudio.ZAudio):
def __init__(self):
super().__init__()
self.features = {"has_more_than_a_bleep": False}
def play_bleep(self, bleep_type):
pass
class MudScreen(zscreen.ZScreen):
def __init__(self):
super().__init__()
self._buffer = []
self._columns = 80
self._rows = zscreen.INFINITE_ROWS
self.features = {
"has_status_line": False,
"has_upper_window": False,
"has_graphics_font": False,
"has_text_colors": False,
}
def write(self, string):
self._buffer.append(string)
def flush(self) -> str:
result = "".join(self._buffer)
self._buffer.clear()
return result
def split_window(self, height):
logger.debug(f"split_window({height}) - no-op")
def select_window(self, window):
logger.debug(f"select_window({window}) - no-op")
def set_cursor_position(self, x, y):
logger.debug(f"set_cursor_position({x}, {y}) - no-op")
def erase_window(self, window=zscreen.WINDOW_LOWER, color=zscreen.COLOR_CURRENT):
logger.debug(f"erase_window({window}, {color}) - no-op")
def erase_line(self):
logger.debug("erase_line() - no-op")
def print_status_score_turns(self, text, score, turns):
pass
def print_status_time(self, hours, minutes):
pass
def set_font(self, font_number):
if font_number == zscreen.FONT_NORMAL:
return font_number
return None
def set_text_style(self, style):
pass
def set_text_color(self, foreground_color, background_color):
pass
class MudInputStream(zstream.ZInputStream):
def __init__(self):
super().__init__()
self._input_queue = queue.Queue()
self._waiting = threading.Event()
self._ready = threading.Event()
self._done = False
self.features = {"has_timed_input": False}
def read_line(
self,
original_text=None,
max_length=0,
terminating_characters=None,
timed_input_routine=None,
timed_input_interval=0,
):
self._waiting.set()
self._ready.wait()
self._ready.clear()
self._waiting.clear()
text = self._input_queue.get()
if max_length > 0:
text = text[:max_length]
return text
def read_char(self, timed_input_routine=None, timed_input_interval=0):
self._waiting.set()
self._ready.wait()
self._ready.clear()
self._waiting.clear()
text = self._input_queue.get()
if text:
return ord(text[0])
return 0
def feed(self, text: str):
self._input_queue.put(text)
self._ready.set()
class MudFilesystem(zfilesystem.ZFilesystem):
def __init__(self, save_path: Path):
self.save_path = save_path
def save_game(self, data, suggested_filename=None):
try:
self.save_path.parent.mkdir(parents=True, exist_ok=True)
self.save_path.write_bytes(data)
return True
except Exception as e:
logger.error(f"Failed to save game: {e}")
return False
def restore_game(self):
if self.save_path.exists():
try:
return self.save_path.read_bytes()
except Exception as e:
logger.error(f"Failed to restore game: {e}")
return None
return None
def open_transcript_file_for_writing(self):
return None
def open_transcript_file_for_reading(self):
return None
def create_mud_ui(save_path: Path) -> tuple[zui.ZUI, MudScreen, MudInputStream]:
audio = NullAudio()
screen = MudScreen()
keyboard = MudInputStream()
filesystem = MudFilesystem(save_path)
ui = zui.ZUI(audio, screen, keyboard, filesystem)
return ui, screen, keyboard

View file

@ -203,31 +203,24 @@ class QuetzalParser:
# Read successive stack frames:
while ptr < total_len:
log(" Parsing stack frame...")
return_pc = (bytes[ptr] << 16) + (bytes[ptr + 1] << 8) + bytes[ptr + 2]
return_pc = (bytes[ptr] << 16) + (bytes[ptr + 1] << 8) + bytes[ptr + 3]
ptr += 3
flags_bitfield = bitfield.BitField(bytes[ptr])
ptr += 1
varnum = bytes[ptr]
_varnum = bytes[ptr] ### TODO: tells us which variable gets the result
ptr += 1
_argflag = bytes[ptr]
ptr += 1
evalstack_size = (bytes[ptr] << 8) + bytes[ptr + 1]
ptr += 2
# Quetzal flags bit 4: if set, routine discards its return value
discard_result = flags_bitfield[4]
store_var = None if discard_result else varnum
# read anywhere from 0 to 15 local vars, pad to 15
num_locals = flags_bitfield[0:4]
# read anywhere from 0 to 15 local vars
local_vars = []
for _i in range(num_locals):
for _i in range(flags_bitfield[0:3]):
var = (bytes[ptr] << 8) + bytes[ptr + 1]
ptr += 2
local_vars.append(var)
# runtime expects 15 slots, save only stores declared count
local_vars.extend([0] * (15 - num_locals))
log(f" Found {num_locals} local vars (padded to 15)")
log(f" Found {len(local_vars)} local vars")
# least recent to most recent stack values:
stack_values = []
@ -237,16 +230,16 @@ class QuetzalParser:
stack_values.append(val)
log(f" Found {len(stack_values)} local stack values")
# return_pc belongs on the CALLER frame (previous in the stack).
# When this routine finishes, finish_routine() returns
# caller.program_counter as the resume address.
prev_frame = stackmanager._call_stack[-1]
prev_frame.program_counter = return_pc
### Interesting... the reconstructed stack frames have no 'start
### address'. I guess it doesn't matter, since we only need to
### pop back to particular return addresses to resume each
### routine.
### TODO: I can exactly which of the 7 args is "supplied", but I
### don't understand where the args *are*??
# store_var (varnum) is the variable that receives the return
# value when this routine finishes — NOT the return PC.
routine = zstackmanager.ZRoutine(
0, store_var, self._zmachine._mem, [], local_vars, stack_values
0, return_pc, self._zmachine._mem, [], local_vars, stack_values
)
stackmanager.push_routine(routine)
log(" Added new frame to stack.")
@ -255,12 +248,6 @@ class QuetzalParser:
raise QuetzalStackFrameOverflow
self._zmachine._stackmanager = stackmanager
# Update cached references in subsystems that store the stack manager
# (they cache the stack at init time and won't see the replacement)
if hasattr(self._zmachine, "_cpu"):
self._zmachine._cpu._stackmanager = stackmanager
if hasattr(self._zmachine, "_opdecoder"):
self._zmachine._opdecoder._stack = stackmanager
log(" Successfully installed all stack frames.")
def _parse_intd(self, data):
@ -307,71 +294,6 @@ class QuetzalParser:
debugging and test verification."""
return self._last_loaded_metadata
def load_from_bytes(self, data):
"""Parse Quetzal data from raw bytes (instead of a file).
Used by op_restore when filesystem.restore_game() returns raw bytes.
"""
import io
self._last_loaded_metadata = {}
if len(data) < 12:
raise QuetzalUnrecognizedFileFormat
# Validate FORM header
if data[0:4] != b"FORM":
raise QuetzalUnrecognizedFileFormat
# Read total length
self._len = (data[4] << 24) + (data[5] << 16) + (data[6] << 8) + data[7]
log(f"Total length of FORM data is {self._len}")
self._last_loaded_metadata["total length"] = self._len
# Validate IFZS type
if data[8:12] != b"IFZS":
raise QuetzalUnrecognizedFileFormat
# Create a BytesIO object to use with chunk module
self._file = io.BytesIO(data)
self._file.seek(12) # Skip FORM header, length, and IFZS type
log("Parsing chunks from byte data")
try:
while 1:
c = chunk.Chunk(self._file)
chunkname = c.getname()
chunksize = c.getsize()
chunk_data = c.read(chunksize)
log(f"** Found chunk ID {chunkname}: length {chunksize}")
self._last_loaded_metadata[chunkname] = chunksize
if chunkname == b"IFhd":
self._parse_ifhd(chunk_data)
elif chunkname == b"CMem":
self._parse_cmem(chunk_data)
elif chunkname == b"UMem":
self._parse_umem(chunk_data)
elif chunkname == b"Stks":
self._parse_stks(chunk_data)
elif chunkname == b"IntD":
self._parse_intd(chunk_data)
elif chunkname == b"AUTH":
self._parse_auth(chunk_data)
elif chunkname == b"(c) ":
self._parse_copyright(chunk_data)
elif chunkname == b"ANNO":
self._parse_anno(chunk_data)
else:
# spec says to ignore and skip past unrecognized chunks
pass
except EOFError:
pass
self._file.close()
log("Finished parsing Quetzal data.")
def load(self, savefile_path):
"""Parse each chunk of the Quetzal file at SAVEFILE_PATH,
initializing associated zmachine subsystems as needed."""
@ -455,211 +377,86 @@ class QuetzalWriter:
"""Return a chunk of type IFhd, containing metadata about the
zmachine and story being played."""
mem = self._zmachine._mem
### TODO: write this. payload must be *exactly* 13 bytes, even if
### it means padding the program counter.
# Release number (2 bytes, big-endian) from header bytes 2-3
release = mem.read_word(2)
### Some old infocom games don't have checksums stored in header.
### If not, generate it from the *original* story file memory
### image and put it into this chunk. See ZMemory.generate_checksum().
pass
# Serial number (6 bytes) from header bytes 0x12-0x17
serial = bytes(mem[0x12:0x18])
# Checksum (2 bytes, big-endian) from header bytes 0x1C-0x1D
checksum = mem.read_word(0x1C)
# Program counter (3 bytes, big-endian) - current PC
pc = self._zmachine._cpu._program_counter
# Build the 13-byte chunk
chunk_data = bytearray(13)
# Bytes 0-1: Release number
chunk_data[0] = (release >> 8) & 0xFF
chunk_data[1] = release & 0xFF
# Bytes 2-7: Serial number
chunk_data[2:8] = serial
# Bytes 8-9: Checksum
chunk_data[8] = (checksum >> 8) & 0xFF
chunk_data[9] = checksum & 0xFF
# Bytes 10-12: Program counter (24-bit)
chunk_data[10] = (pc >> 16) & 0xFF
chunk_data[11] = (pc >> 8) & 0xFF
chunk_data[12] = pc & 0xFF
return bytes(chunk_data)
return "0"
def _generate_cmem_chunk(self):
"""Return a compressed chunk of data representing the compressed
image of the zmachine's main memory."""
pmem = self._zmachine._pristine_mem
cmem = self._zmachine._mem
### TODO: debug this when ready
return "0"
# XOR current dynamic memory with pristine dynamic memory
dynamic_start = pmem._dynamic_start
dynamic_end = pmem._dynamic_end
diffarray = []
# XOR the original game image with the current one
diffarray = list(self._zmachine._pristine_mem)
for index in range(len(self._zmachine._pristine_mem._total_size)):
diffarray[index] = (
self._zmachine._pristine_mem[index] ^ self._zmachine._mem[index]
)
log(f"XOR array is {diffarray}")
for index in range(dynamic_start, dynamic_end + 1):
xor_value = pmem[index] ^ cmem[index]
diffarray.append(xor_value)
log(f"Generated XOR array of {len(diffarray)} bytes")
# Run-length encode the XOR result
# Run-length encode the resulting list of 0's and 1's.
result = []
zerocounter = 0
for byte in diffarray:
if byte == 0:
for index in range(len(diffarray)):
if diffarray[index] == 0:
zerocounter += 1
continue
else:
# Flush any pending zeros
while zerocounter > 0:
# Encode: 0x00 followed by count of ADDITIONAL zeros
# Maximum count in one byte is 255, meaning 256 zeros total (1+255)
if zerocounter > 256:
result.append(0x00)
result.append(0xFF) # 1 + 255 = 256 zeros
zerocounter -= 256
else:
result.append(0x00)
result.append(zerocounter - 1) # count of additional zeros
zerocounter = 0
# Output non-zero byte
result.append(byte)
# Don't encode trailing zeros - parser can leave them as-is
# (per spec: "If memcounter finishes less than memlen, that's totally fine")
log(f"Compressed to {len(result)} bytes")
return bytes(result)
if zerocounter > 0:
result.append(0)
result.append(zerocounter)
zerocounter = 0
result.append(diffarray[index])
return result
def _generate_stks_chunk(self):
"""Return a stacks chunk, describing the stack state of the
zmachine at this moment."""
result = bytearray()
stackmanager = self._zmachine._stackmanager
call_stack = stackmanager._call_stack
# Skip the ZStackBottom sentinel (first element)
for i, frame in enumerate(call_stack[1:], start=1):
num_local_vars = len(frame.local_vars)
# Quetzal return_pc = caller's saved program counter.
# The previous frame (caller) stores the resume PC that
# finish_routine() returns when this frame exits.
prev_frame = call_stack[i - 1]
return_pc = prev_frame.program_counter or 0
result.append((return_pc >> 16) & 0xFF)
result.append((return_pc >> 8) & 0xFF)
result.append(return_pc & 0xFF)
# Write flags byte (bits 0-3 = num local vars,
# bit 4 = discard return value)
flags = num_local_vars & 0x0F
if frame.return_addr is None:
flags |= 0x10
result.append(flags)
# Write varnum (which variable gets the return value)
varnum = frame.return_addr if frame.return_addr is not None else 0
result.append(varnum & 0xFF)
# Write argflag (bitmask of supplied arguments)
# TODO: track this properly, for now use 0
result.append(0)
# Write eval_stack_size as 16-bit big-endian
eval_stack_size = len(frame.stack)
result.append((eval_stack_size >> 8) & 0xFF)
result.append(eval_stack_size & 0xFF)
# Write local variables (16-bit big-endian each)
for local_var in frame.local_vars:
result.append((local_var >> 8) & 0xFF)
result.append(local_var & 0xFF)
# Write evaluation stack values (16-bit big-endian each)
for stack_val in frame.stack:
result.append((stack_val >> 8) & 0xFF)
result.append(stack_val & 0xFF)
return bytes(result)
### TODO: write this
return "0"
def _generate_anno_chunk(self):
"""Return an annotation chunk, containing metadata about the ZVM
interpreter which created the savefile."""
### TODO: write this
return b"0"
return "0"
# --------- Public APIs -----------
def generate_save_data(self):
"""Generate complete Quetzal save data as bytes (IFF/FORM/IFZS container).
Returns bytes representing the complete save file in IFF format.
"""
log("Generating Quetzal save data")
# Generate all chunks
ifhd_chunk = self._generate_ifhd_chunk()
cmem_chunk = self._generate_cmem_chunk()
stks_chunk = self._generate_stks_chunk()
anno_chunk = self._generate_anno_chunk()
# Build IFF container with proper chunk headers
result = bytearray()
# Helper to write a chunk with its header
def write_chunk(chunk_id, chunk_data):
result.extend(chunk_id.encode("ascii"))
size = len(chunk_data)
result.append((size >> 24) & 0xFF)
result.append((size >> 16) & 0xFF)
result.append((size >> 8) & 0xFF)
result.append(size & 0xFF)
result.extend(chunk_data)
# IFF chunks must be padded to even byte boundaries
if size % 2 == 1:
result.append(0) # padding byte
log(f" Added {chunk_id} chunk ({size} bytes)")
# Write nested chunks
write_chunk("IFhd", ifhd_chunk)
write_chunk("CMem", cmem_chunk)
write_chunk("Stks", stks_chunk)
write_chunk("ANNO", anno_chunk)
# Calculate total size (everything after FORM header + size field)
total_size = len(result) + 4 # +4 for "IFZS"
# Build final FORM container
container = bytearray()
container.extend(b"FORM")
container.append((total_size >> 24) & 0xFF)
container.append((total_size >> 16) & 0xFF)
container.append((total_size >> 8) & 0xFF)
container.append(total_size & 0xFF)
container.extend(b"IFZS")
container.extend(result)
log(f"Generated {len(container)} bytes of save data")
return bytes(container)
def write(self, savefile_path):
"""Write the current zmachine state to a new Quetzal-file at
SAVEFILE_PATH."""
log(f"Attempting to write game-state to '{savefile_path}'")
data = self.generate_save_data()
self._file = open(savefile_path, "w") # noqa: SIM115
with open(savefile_path, "wb") as f:
f.write(data)
ifhd_chunk = self._generate_ifhd_chunk()
cmem_chunk = self._generate_cmem_chunk()
stks_chunk = self._generate_stks_chunk()
anno_chunk = self._generate_anno_chunk()
_total_chunk_size = (
len(ifhd_chunk) + len(cmem_chunk) + len(stks_chunk) + len(anno_chunk)
)
# Write main FORM chunk to hold other chunks
self._file.write("FORM")
### TODO: self._file_write(total_chunk_size) -- spread it over 4 bytes
self._file.write("IFZS")
# Write nested chunks.
for chunk_data in (ifhd_chunk, cmem_chunk, stks_chunk, anno_chunk):
self._file.write(chunk_data)
log("Wrote a chunk.")
self._file.close()
log("Done writing game-state to savefile.")

View file

@ -44,16 +44,7 @@ class ZCpuRestart(ZCpuError):
class ZCpu:
def __init__(
self,
zmem,
zopdecoder,
zstack,
zobjects,
zstring,
zstreammanager,
zui,
zlexer,
zmachine=None,
self, zmem, zopdecoder, zstack, zobjects, zstring, zstreammanager, zui, zlexer
):
self._memory = zmem
self._opdecoder = zopdecoder
@ -63,14 +54,8 @@ class ZCpu:
self._streammanager = zstreammanager
self._ui = zui
self._lexer = zlexer
self._zmachine = zmachine
self._trace = deque(maxlen=20)
@property
def _program_counter(self):
"""Return the current program counter value."""
return self._opdecoder.program_counter
def _get_handler(self, opcode_class, opcode_number):
try:
opcode_decl = self.opcodes[opcode_class][opcode_number]
@ -236,7 +221,8 @@ class ZCpu:
except (ZCpuQuit, ZCpuRestart):
# Normal control flow - don't dump trace
raise
except Exception:
except ZCpuError:
# All other ZCpu errors - dump trace for debugging
self._dump_trace()
raise
return True
@ -557,26 +543,9 @@ class ZCpu:
def op_save(self, *args):
"""Save game state to file (V3 - branch on success).
Uses QuetzalWriter to generate save data in IFF/FORM/IFZS format,
then calls the filesystem to write it. Branches true on success,
false on failure.
Currently always fails because QuetzalWriter is not yet functional.
"""
if self._zmachine is None:
# Can't save without zmachine reference
self._branch(False)
return
from .quetzal import QuetzalWriter
try:
writer = QuetzalWriter(self._zmachine)
save_data = writer.generate_save_data()
success = self._ui.filesystem.save_game(save_data)
self._branch(success)
except Exception as e:
# Any error during save process = failure
log(f"Save failed with exception: {e}")
self._branch(False)
self._branch(False)
def op_save_v4(self, *args):
"""TODO: Write docstring here."""
@ -585,41 +554,10 @@ class ZCpu:
def op_restore(self, *args):
"""Restore game state from file (V3 - branch on success).
Uses QuetzalParser to load save data from filesystem,
validates it matches current story, and restores memory/stack/PC.
Branches true on success, false on failure.
Currently always fails because QuetzalWriter is not yet functional,
so there are no valid save files to restore.
"""
if self._zmachine is None:
# Can't restore without zmachine reference
self._branch(False)
return
from .quetzal import QuetzalParser
try:
# Get save data from filesystem
save_data = self._ui.filesystem.restore_game()
if save_data is None:
# User cancelled or no save file available
self._branch(False)
return
# Parse the save data
parser = QuetzalParser(self._zmachine)
parser.load_from_bytes(save_data)
# QuetzalParser already:
# - Validated IFhd matches current story (release/serial/checksum)
# - Replaced dynamic memory via _parse_cmem or _parse_umem
# - Replaced stack manager via _parse_stks
# - Set program counter via _parse_ifhd
# Success!
self._branch(True)
except Exception as e:
# Any error during restore process = failure
log(f"Restore failed with exception: {e}")
self._branch(False)
self._branch(False)
def op_restore_v4(self, *args):
"""TODO: Write docstring here."""

View file

@ -44,7 +44,6 @@ class ZMachine:
self._stream_manager,
self._ui,
self._lexer,
zmachine=self,
)
# --------- Public APIs -----------

View file

@ -184,15 +184,10 @@ class ZMemory:
def _check_bounds(self, index):
if isinstance(index, slice):
start, stop = index.start, index.stop
# For slices, stop can be _total_size since slicing is exclusive
if not (
(0 <= start < self._total_size) and (0 <= stop <= self._total_size)
):
raise ZMemoryOutOfBounds
else:
start, stop = index, index
if not ((0 <= start < self._total_size) and (0 <= stop < self._total_size)):
raise ZMemoryOutOfBounds
if not ((0 <= start < self._total_size) and (0 <= stop < self._total_size)):
raise ZMemoryOutOfBounds
def _check_static(self, index):
"""Throw error if INDEX is within the static-memory area."""

View file

@ -153,11 +153,6 @@ class ZStackManager:
"Remove and return value from the top of the data stack."
current_routine = self._call_stack[-1]
if not current_routine.stack:
frame_idx = len(self._call_stack) - 1
ra = getattr(current_routine, "return_addr", "N/A")
pc = getattr(current_routine, "program_counter", 0)
raise ZStackPopError(f"frame {frame_idx}, return_addr={ra}, pc=0x{pc:06x}")
return current_routine.stack.pop()
def get_stack_frame_index(self):

View file

@ -1,329 +0,0 @@
"""Tests for embedded z-machine MUD integration.
Tests the MUD UI components and EmbeddedIFSession integration with real zork1.z3.
"""
import threading
from dataclasses import dataclass
from pathlib import Path
import pytest
from mudlib.zmachine.mud_ui import MudFilesystem, MudInputStream, MudScreen
ZORK_PATH = Path(__file__).parent.parent / "content" / "stories" / "zork1.z3"
requires_zork = pytest.mark.skipif(not ZORK_PATH.exists(), reason="zork1.z3 not found")
@dataclass
class MockWriter:
def write(self, data):
pass
async def drain(self):
pass
# Unit tests for MUD UI components
def test_mud_screen_captures_output():
"""MudScreen captures written text and flush returns it."""
screen = MudScreen()
screen.write("Hello ")
screen.write("world!")
output = screen.flush()
assert output == "Hello world!"
def test_mud_screen_flush_clears_buffer():
"""MudScreen flush clears buffer, second flush returns empty."""
screen = MudScreen()
screen.write("test")
first = screen.flush()
assert first == "test"
second = screen.flush()
assert second == ""
def test_mud_input_stream_feed_and_read():
"""MudInputStream feed and read_line work with threading."""
stream = MudInputStream()
result = []
def reader():
result.append(stream.read_line())
t = threading.Thread(target=reader)
t.start()
# Wait for stream to signal it's waiting
stream._waiting.wait(timeout=2)
stream.feed("hello")
t.join(timeout=2)
assert result == ["hello"]
def test_mud_filesystem_save_restore(tmp_path):
"""MudFilesystem save and restore bytes correctly."""
save_path = tmp_path / "test.qzl"
filesystem = MudFilesystem(save_path)
test_data = b"\x01\x02\x03\x04\x05"
success = filesystem.save_game(test_data)
assert success
assert save_path.exists()
restored = filesystem.restore_game()
assert restored == test_data
# Integration tests with real zork1.z3
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_start():
"""EmbeddedIFSession starts and returns intro containing game info."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
# Clean up any existing save to get a fresh start
if session.save_path.exists():
session.save_path.unlink()
intro = await session.start()
assert intro is not None
assert len(intro) > 0
# Intro should contain game title or location
assert "ZORK" in intro or "West of House" in intro
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_handle_input():
"""EmbeddedIFSession handles input and returns response."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
# Clean up any existing save to get a fresh start
if session.save_path.exists():
session.save_path.unlink()
await session.start()
response = await session.handle_input("look")
assert response is not None
assert response.done is False
assert len(response.output) > 0
# Looking should describe the starting location
assert "West of House" in response.output or "house" in response.output
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_escape_help():
"""EmbeddedIFSession ::help returns help text."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
await session.start()
response = await session.handle_input("::help")
assert response.done is False
assert "::quit" in response.output
assert "::save" in response.output
assert "::help" in response.output
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_escape_quit():
"""EmbeddedIFSession ::quit returns done=True."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
await session.start()
response = await session.handle_input("::quit")
assert response.done is True
assert "saved" in response.output.lower()
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_location_name():
"""EmbeddedIFSession get_location_name returns location after input."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
await session.start()
# Send a command to advance game state
await session.handle_input("look")
location = session.get_location_name()
# Location may be None or a string depending on game state
assert location is None or isinstance(location, str)
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_room_objects():
"""EmbeddedIFSession get_room_objects returns a list after start."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
await session.start()
objects = session.get_room_objects()
assert isinstance(objects, list)
# Zork1 starting location usually has some objects
assert len(objects) >= 0 # May or may not have visible objects initially
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_try_restore_before_thread():
"""_try_restore() is called synchronously before interpreter thread starts."""
from unittest.mock import patch
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="tester", x=5, y=5, writer=mock_writer)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
# Create a save file
if session.save_path.exists():
session.save_path.unlink()
session.save_path.parent.mkdir(parents=True, exist_ok=True)
# Write a minimal valid save (header only, won't actually restore correctly)
session.save_path.write_bytes(b"FORM\x00\x00\x00\x08IFZSQUTZ\x00\x00\x00\x00")
call_order = []
original_try_restore = session._try_restore
original_run_interpreter = session._run_interpreter
def track_try_restore():
call_order.append("try_restore")
return original_try_restore()
def track_run_interpreter():
call_order.append("run_interpreter")
original_run_interpreter()
with (
patch.object(session, "_try_restore", side_effect=track_try_restore),
patch.object(session, "_run_interpreter", side_effect=track_run_interpreter),
):
await session.start()
# Verify _try_restore was called before _run_interpreter
assert call_order[0] == "try_restore"
assert call_order[1] == "run_interpreter"
await session.stop()
if session.save_path.exists():
session.save_path.unlink()
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_no_restore_without_save():
"""start() does not restore when no save file exists."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
player = Player(name="nosaveplayer", writer=mock_writer, x=0, y=0)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
# Ensure no save file exists
if session.save_path.exists():
session.save_path.unlink()
intro = await session.start()
# Should NOT contain restore message
assert "restoring" not in intro.lower()
# Should contain normal game intro
assert "ZORK" in intro or "West of House" in intro
await session.stop()
@requires_zork
@pytest.mark.asyncio
async def test_embedded_session_save_and_restore():
"""Save a game, create new session, restore it via start()."""
from mudlib.embedded_if_session import EmbeddedIFSession
from mudlib.player import Player
mock_writer = MockWriter()
# Start first session
player = Player(name="testplayer", writer=mock_writer, x=0, y=0)
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
# Clean up any existing save to get a fresh start
if session.save_path.exists():
session.save_path.unlink()
await session.start()
# Do something to change state
await session.handle_input("open mailbox")
# Save
save_result = await session.handle_input("::save")
assert "saved" in save_result.output.lower()
await session.stop()
# Start new session - should auto-restore via start()
# start() calls _try_restore() BEFORE launching the interpreter thread
session2 = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
intro = await session2.start()
# Should contain restore message prefixed to output
assert "restoring saved game" in intro.lower()
assert "restored" in intro.lower()
# The game state should reflect the restored state
# (location may differ after restore, just verify it works)
response = await session2.handle_input("look")
assert response.output # Should get some output
await session2.stop()
# Clean up save file
if session2.save_path.exists():
session2.save_path.unlink()

View file

@ -70,9 +70,9 @@ async def test_play_enters_if_mode(player):
with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session
# Use .z5 to test dfrotz path
# Ensure story file exists check passes
with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5"
mock_find.return_value = "/fake/path/zork1.z3"
await cmd_play(player, "zork1")
@ -108,9 +108,8 @@ async def test_play_handles_dfrotz_missing(player):
with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5"
mock_find.return_value = "/fake/path/zork1.z3"
await cmd_play(player, "zork1")
@ -131,35 +130,41 @@ async def test_play_handles_dfrotz_missing(player):
@pytest.mark.asyncio
async def test_play_restores_save_if_exists(player):
"""Playing restores saved game if save file exists (via start())."""
"""Playing restores saved game if save file exists."""
from pathlib import Path
from mudlib.commands.play import cmd_play
# Mock IFSession - restore now happens in start() before thread launches
# Mock IFSession
mock_session = Mock()
restored_output = (
"restoring saved game...\r\nrestored.\r\n\r\n"
"West of House\nYou are standing in an open field."
mock_session.start = AsyncMock(return_value="Welcome to Zork!")
mock_session._do_restore = AsyncMock(
return_value="West of House\nYou are standing in an open field."
)
mock_session.start = AsyncMock(return_value=restored_output)
mock_session.save_path = Mock(spec=Path)
mock_session.save_path.exists = Mock(return_value=True)
with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5"
mock_find.return_value = "/fake/path/zork1.z3"
await cmd_play(player, "zork1")
# Verify restore was called
mock_session._do_restore.assert_called_once()
# Verify session was created and started
mock_session.start.assert_called_once()
# Verify mode was pushed
assert "if" in player.mode_stack
# Verify restored text was sent (start() returns full output with restore)
# Verify restored text was sent
calls = [call[0][0] for call in player.writer.write.call_args_list]
full_output = "".join(calls)
assert "restoring" in full_output.lower()
assert "West of House" in full_output
assert "open field" in full_output
@ -167,22 +172,28 @@ async def test_play_restores_save_if_exists(player):
@pytest.mark.asyncio
async def test_play_no_restore_if_no_save(player):
"""Playing does not restore if no save file exists."""
from pathlib import Path
from mudlib.commands.play import cmd_play
# Mock IFSession
mock_session = Mock()
mock_session.start = AsyncMock(return_value="Welcome to Zork!")
mock_session._do_restore = AsyncMock(return_value="")
mock_session.save_path = Mock(spec=Path)
mock_session.save_path.exists = Mock(return_value=False)
with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5"
mock_find.return_value = "/fake/path/zork1.z3"
await cmd_play(player, "zork1")
# Verify restore was NOT called
mock_session._do_restore.assert_not_called()
# Verify session was created and started
mock_session.start.assert_called_once()

View file

@ -1,297 +0,0 @@
"""Integration tests for Quetzal save/restore round-trip.
Tests that verify the complete save/restore pipeline works end-to-end by
generating save data with QuetzalWriter and restoring it with QuetzalParser.
Field mapping reminder:
- Quetzal return_pc for frame N caller (frame N-1) program_counter
- Quetzal varnum frame.return_addr (store variable for return value)
"""
class TestQuetzalRoundTrip:
"""Test complete save/restore cycle with real zmachine state."""
def test_basic_round_trip(self):
"""Test saving and restoring basic zmachine state."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
# Create a minimal z-machine story file (V3)
story_data = bytearray(8192)
story_data[0] = 3 # Version 3
story_data[0x02:0x04] = [0x12, 0x34] # Release number
story_data[0x04:0x06] = [0x10, 0x00] # High memory start
story_data[0x06:0x08] = [0x08, 0x00] # Initial PC
story_data[0x0E:0x10] = [0x04, 0x00] # Static memory start
story_data[0x0C:0x0E] = [0x02, 0x00] # Global variables start
story_data[0x12:0x18] = b"860101" # Serial number
story_data[0x1C:0x1E] = [0xAB, 0xCD] # Checksum
pristine_mem = ZMemory(bytes(story_data))
current_mem = ZMemory(bytes(story_data))
# Modify some bytes in dynamic memory (before static start at 0x400)
current_mem[0x100] = 0x42
current_mem[0x101] = 0x43
current_mem[0x200] = 0xFF
# Create a stack with one frame
# return_addr=5 means "store return value in local var 4"
stack_manager = ZStackManager(current_mem)
routine = ZRoutine(
start_addr=0x5000,
return_addr=5,
zmem=current_mem,
args=[],
local_vars=[0x0001, 0x0002, 0x0003],
stack=[0x1111, 0x2222],
)
stack_manager.push_routine(routine)
# Set up mock zmachine
zmachine = Mock()
zmachine._pristine_mem = pristine_mem
zmachine._mem = current_mem
zmachine._stackmanager = stack_manager
zmachine._cpu = Mock()
zmachine._cpu._program_counter = 0x0850
zmachine._opdecoder = Mock()
# SAVE
writer = QuetzalWriter(zmachine)
save_data = writer.generate_save_data()
assert save_data[:4] == b"FORM"
assert save_data[8:12] == b"IFZS"
# CORRUPT: Change the zmachine state
current_mem[0x100] = 0x99
current_mem[0x101] = 0x99
current_mem[0x200] = 0x00
stack_manager._call_stack.clear()
from mudlib.zmachine.zstackmanager import ZStackBottom
stack_manager._call_stack.append(ZStackBottom())
zmachine._cpu._program_counter = 0x9999
assert current_mem[0x100] == 0x99
assert len(stack_manager._call_stack) == 1
assert zmachine._cpu._program_counter == 0x9999
# RESTORE
parser = QuetzalParser(zmachine)
parser.load_from_bytes(save_data)
# VERIFY: Memory was restored
assert current_mem[0x100] == 0x42
assert current_mem[0x101] == 0x43
assert current_mem[0x200] == 0xFF
# VERIFY: Stack was restored
restored_stack = zmachine._stackmanager
assert len(restored_stack._call_stack) == 2 # Bottom + one frame
restored_frame = restored_stack._call_stack[1]
# return_addr is the store variable (varnum), not a PC
assert restored_frame.return_addr == 5
assert restored_frame.local_vars[:3] == [0x0001, 0x0002, 0x0003]
assert restored_frame.stack == [0x1111, 0x2222]
# VERIFY: Program counter was restored
assert zmachine._opdecoder.program_counter == 0x0850
def test_round_trip_with_multiple_frames(self):
"""Test save/restore with nested call frames."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
story_data = bytearray(8192)
story_data[0] = 3
story_data[0x02:0x04] = [0x10, 0x00]
story_data[0x04:0x06] = [0x10, 0x00]
story_data[0x06:0x08] = [0x08, 0x00]
story_data[0x0E:0x10] = [0x04, 0x00]
story_data[0x0C:0x0E] = [0x02, 0x00]
story_data[0x12:0x18] = b"860101"
story_data[0x1C:0x1E] = [0x00, 0x00]
pristine_mem = ZMemory(bytes(story_data))
current_mem = ZMemory(bytes(story_data))
# Create nested call frames with proper semantics:
# return_addr = store variable (varnum)
# program_counter = resume PC for when the next frame returns
stack_manager = ZStackManager(current_mem)
routine1 = ZRoutine(
start_addr=0x5000,
return_addr=0, # varnum 0 = push to stack
zmem=current_mem,
args=[],
local_vars=[0xAAAA, 0xBBBB],
stack=[0x1111],
)
stack_manager.push_routine(routine1)
# resume PC in routine1 after routine2 returns
routine1.program_counter = 0x5123
routine2 = ZRoutine(
start_addr=0x6000,
return_addr=3, # varnum 3 = local var 2
zmem=current_mem,
args=[],
local_vars=[0xCCCC],
stack=[0x2222, 0x3333],
)
stack_manager.push_routine(routine2)
zmachine = Mock()
zmachine._pristine_mem = pristine_mem
zmachine._mem = current_mem
zmachine._stackmanager = stack_manager
zmachine._cpu = Mock()
zmachine._cpu._program_counter = 0x0900
zmachine._opdecoder = Mock()
# Save
writer = QuetzalWriter(zmachine)
save_data = writer.generate_save_data()
# Clear stack
stack_manager._call_stack.clear()
from mudlib.zmachine.zstackmanager import ZStackBottom
stack_manager._call_stack.append(ZStackBottom())
# Restore
parser = QuetzalParser(zmachine)
parser.load_from_bytes(save_data)
# Verify both frames restored
restored_stack = zmachine._stackmanager
assert len(restored_stack._call_stack) == 3 # Bottom + two frames
frame1 = restored_stack._call_stack[1]
assert frame1.return_addr == 0 # varnum
assert frame1.local_vars[:2] == [0xAAAA, 0xBBBB]
assert frame1.stack == [0x1111]
# frame1 should have the resume PC for after frame2 returns
assert frame1.program_counter == 0x5123
frame2 = restored_stack._call_stack[2]
assert frame2.return_addr == 3 # varnum
assert frame2.local_vars[:1] == [0xCCCC]
assert frame2.stack == [0x2222, 0x3333]
def test_round_trip_preserves_unchanged_memory(self):
"""Test that unchanged memory bytes are preserved correctly."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
from mudlib.zmachine.zstackmanager import ZStackManager
story_data = bytearray(8192)
story_data[0] = 3
story_data[0x02:0x04] = [0x10, 0x00]
story_data[0x04:0x06] = [0x10, 0x00]
story_data[0x06:0x08] = [0x08, 0x00]
story_data[0x0E:0x10] = [0x04, 0x00]
story_data[0x0C:0x0E] = [0x02, 0x00]
story_data[0x12:0x18] = b"860101"
story_data[0x1C:0x1E] = [0x00, 0x00]
for i in range(0x100, 0x200):
story_data[i] = i & 0xFF
pristine_mem = ZMemory(bytes(story_data))
current_mem = ZMemory(bytes(story_data))
current_mem[0x150] = 0xFF
current_mem[0x180] = 0xAA
stack_manager = ZStackManager(current_mem)
zmachine = Mock()
zmachine._pristine_mem = pristine_mem
zmachine._mem = current_mem
zmachine._stackmanager = stack_manager
zmachine._cpu = Mock()
zmachine._cpu._program_counter = 0x0800
zmachine._opdecoder = Mock()
writer = QuetzalWriter(zmachine)
save_data = writer.generate_save_data()
for i in range(0x100, 0x200):
current_mem[i] = 0x00
parser = QuetzalParser(zmachine)
parser.load_from_bytes(save_data)
for i in range(0x100, 0x200):
if i == 0x150:
assert current_mem[i] == 0xFF
elif i == 0x180:
assert current_mem[i] == 0xAA
else:
assert current_mem[i] == (i & 0xFF)
def test_round_trip_empty_stack(self):
"""Test save/restore with no routine frames (just bottom sentinel)."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
story_data = bytearray(8192)
story_data[0] = 3
story_data[0x02:0x04] = [0x10, 0x00]
story_data[0x04:0x06] = [0x10, 0x00]
story_data[0x06:0x08] = [0x08, 0x00]
story_data[0x0E:0x10] = [0x04, 0x00]
story_data[0x0C:0x0E] = [0x02, 0x00]
story_data[0x12:0x18] = b"860101"
story_data[0x1C:0x1E] = [0x00, 0x00]
pristine_mem = ZMemory(bytes(story_data))
current_mem = ZMemory(bytes(story_data))
current_mem[0x100] = 0x42
stack_manager = ZStackManager(current_mem)
zmachine = Mock()
zmachine._pristine_mem = pristine_mem
zmachine._mem = current_mem
zmachine._stackmanager = stack_manager
zmachine._cpu = Mock()
zmachine._cpu._program_counter = 0x0800
zmachine._opdecoder = Mock()
writer = QuetzalWriter(zmachine)
save_data = writer.generate_save_data()
# Add a dummy frame to verify it gets cleared
dummy = ZRoutine(
start_addr=0x5000,
return_addr=1,
zmem=current_mem,
args=[],
local_vars=[0x9999],
stack=[],
)
stack_manager.push_routine(dummy)
parser = QuetzalParser(zmachine)
parser.load_from_bytes(save_data)
restored_stack = zmachine._stackmanager
assert len(restored_stack._call_stack) == 1
assert current_mem[0x100] == 0x42

View file

@ -1,356 +0,0 @@
"""
Tests for QuetzalWriter._generate_stks_chunk() serialization.
The Stks chunk serializes the Z-machine call stack. Each frame has:
- Bytes 0-2: return_pc (24-bit big-endian) caller's resume PC
- Byte 3: flags (bits 0-3 = num local vars, bit 4 = discard result)
- Byte 4: varnum (which variable gets return value)
- Byte 5: argflag (bitmask of supplied arguments)
- Bytes 6-7: eval_stack_size (16-bit big-endian)
- Next (num_local_vars * 2) bytes: local variables
- Next (eval_stack_size * 2) bytes: evaluation stack values
All multi-byte values are big-endian. Bottom frame has return_pc=0.
Field mapping to runtime:
- return_pc for frame N stored as program_counter on frame N-1 (the caller)
- varnum stored as return_addr on the frame (the store variable)
"""
from unittest import TestCase
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
class MockMemory:
"""Mock memory for testing."""
def __init__(self):
self.version = 5
class MockZMachine:
"""Mock z-machine with stack manager."""
def __init__(self):
self._mem = MockMemory()
self._stackmanager = ZStackManager(self._mem)
class QuetzalStksTests(TestCase):
"""Test suite for Stks chunk generation."""
def setUp(self):
self.zmachine = MockZMachine()
self.writer = QuetzalWriter(self.zmachine)
def test_empty_call_stack_generates_empty_chunk(self):
"""With only the sentinel bottom, should generate empty bytes."""
# Call stack has only ZStackBottom sentinel
chunk = self.writer._generate_stks_chunk()
self.assertEqual(chunk, b"")
def test_single_frame_serialization(self):
"""Single routine frame should serialize correctly."""
# Set up caller resume PC on ZStackBottom
sentinel = self.zmachine._stackmanager._call_stack[0]
sentinel.program_counter = 0x4200
# Create a routine with return_addr = store variable (varnum 5)
routine = ZRoutine(
start_addr=0x5000,
return_addr=5,
zmem=self._mem,
args=[],
local_vars=[0x1234, 0x5678, 0xABCD],
stack=[0x1111, 0x2222],
)
self.zmachine._stackmanager.push_routine(routine)
chunk = self.writer._generate_stks_chunk()
# return_pc comes from ZStackBottom.program_counter (0x4200)
# varnum is frame.return_addr (5)
expected = bytes(
[
0x00,
0x42,
0x00, # return_pc (from caller's program_counter)
0x03, # flags (3 local vars)
0x05, # varnum (store variable)
0x00, # argflag
0x00,
0x02, # eval_stack_size = 2
0x12,
0x34, # local_vars[0]
0x56,
0x78, # local_vars[1]
0xAB,
0xCD, # local_vars[2]
0x11,
0x11, # stack[0]
0x22,
0x22, # stack[1]
]
)
self.assertEqual(chunk, expected)
def test_multiple_frames_serialization(self):
"""Multiple nested frames should serialize in order."""
sentinel = self.zmachine._stackmanager._call_stack[0]
sentinel.program_counter = 0 # main routine has no caller
# Frame 1: outer routine (varnum=0 means push result to stack)
routine1 = ZRoutine(
start_addr=0x1000,
return_addr=0,
zmem=self._mem,
args=[],
local_vars=[0x0001],
stack=[],
)
self.zmachine._stackmanager.push_routine(routine1)
# Set routine1's resume PC (where to go after frame2 returns)
routine1.program_counter = 0x1050
# Frame 2: inner routine (varnum=3)
routine2 = ZRoutine(
start_addr=0x2000,
return_addr=3,
zmem=self._mem,
args=[],
local_vars=[0x0002, 0x0003],
stack=[0xAAAA],
)
self.zmachine._stackmanager.push_routine(routine2)
chunk = self.writer._generate_stks_chunk()
# Frame 1: return_pc from sentinel.pc (0), varnum=0
frame1 = bytes(
[
0x00,
0x00,
0x00, # return_pc = 0 (from sentinel)
0x01, # flags (1 local var)
0x00, # varnum = 0 (push to stack)
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x00,
0x01, # local_vars[0]
]
)
# Frame 2: return_pc from routine1.pc (0x1050), varnum=3
frame2 = bytes(
[
0x00,
0x10,
0x50, # return_pc (from routine1.program_counter)
0x02, # flags (2 local vars)
0x03, # varnum = 3
0x00, # argflag
0x00,
0x01, # eval_stack_size = 1
0x00,
0x02, # local_vars[0]
0x00,
0x03, # local_vars[1]
0xAA,
0xAA, # stack[0]
]
)
expected = frame1 + frame2
self.assertEqual(chunk, expected)
def test_frame_with_no_locals_or_stack(self):
"""Frame with no local vars or stack values should serialize correctly."""
sentinel = self.zmachine._stackmanager._call_stack[0]
sentinel.program_counter = 0x2500
routine = ZRoutine(
start_addr=0x3000,
return_addr=1,
zmem=self._mem,
args=[],
local_vars=[],
stack=[],
)
self.zmachine._stackmanager.push_routine(routine)
chunk = self.writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x25,
0x00, # return_pc (from sentinel.program_counter)
0x00, # flags (0 local vars)
0x01, # varnum = 1
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
]
)
self.assertEqual(chunk, expected)
def test_discard_result_sets_flags_bit4(self):
"""Frame with return_addr=None should set bit 4 in flags byte."""
sentinel = self.zmachine._stackmanager._call_stack[0]
sentinel.program_counter = 0
routine = ZRoutine(
start_addr=0x1000,
return_addr=None,
zmem=self._mem,
args=[],
local_vars=[0x0001, 0x0002],
stack=[],
)
self.zmachine._stackmanager.push_routine(routine)
chunk = self.writer._generate_stks_chunk()
# flags = 0x02 (2 locals) | 0x10 (discard) = 0x12
expected = bytes(
[
0x00,
0x00,
0x00, # return_pc
0x12, # flags (2 locals + discard bit)
0x00, # varnum (0 when discarding)
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x00,
0x01, # local_vars[0]
0x00,
0x02, # local_vars[1]
]
)
self.assertEqual(chunk, expected)
def test_round_trip_with_parser(self):
"""Generated stks bytes should parse back identically."""
from mudlib.zmachine.quetzal import QuetzalParser
sentinel = self.zmachine._stackmanager._call_stack[0]
sentinel.program_counter = 0 # main routine caller
# Create a complex stack state
routine1 = ZRoutine(
start_addr=0x1000,
return_addr=0,
zmem=self._mem,
args=[],
local_vars=[0x1111, 0x2222, 0x3333],
stack=[0xAAAA, 0xBBBB],
)
self.zmachine._stackmanager.push_routine(routine1)
routine1.program_counter = 0x1234 # resume PC for after routine2
routine2 = ZRoutine(
start_addr=0x2000,
return_addr=5,
zmem=self._mem,
args=[],
local_vars=[0x4444],
stack=[0xCCCC, 0xDDDD, 0xEEEE],
)
self.zmachine._stackmanager.push_routine(routine2)
# Generate the stks chunk
stks_bytes = self.writer._generate_stks_chunk()
# Parse it back
parser = QuetzalParser(self.zmachine)
parser._parse_stks(stks_bytes)
# Verify the stack was reconstructed correctly
# Parser creates a new stack manager, skip bottom sentinel
call_stack = self.zmachine._stackmanager._call_stack
frames = call_stack[1:]
self.assertEqual(len(frames), 2)
# Check frame 1: return_addr = varnum, caller PC on sentinel
assert isinstance(frames[0], ZRoutine)
self.assertEqual(frames[0].return_addr, 0)
self.assertEqual(frames[0].local_vars[:3], [0x1111, 0x2222, 0x3333])
self.assertEqual(frames[0].stack, [0xAAAA, 0xBBBB])
# Sentinel should have frame1's return_pc (0)
self.assertEqual(call_stack[0].program_counter, 0)
# Check frame 2: return_addr = varnum, caller PC on frame1
assert isinstance(frames[1], ZRoutine)
self.assertEqual(frames[1].return_addr, 5)
self.assertEqual(frames[1].local_vars[:1], [0x4444])
self.assertEqual(frames[1].stack, [0xCCCC, 0xDDDD, 0xEEEE])
# Frame1 should have frame2's return_pc (0x1234)
self.assertEqual(frames[0].program_counter, 0x1234)
def test_parse_return_pc_goes_to_caller(self):
"""Parser should put return_pc on the caller frame's program_counter."""
from mudlib.zmachine.quetzal import QuetzalParser
# Construct a minimal stack frame with return_pc=0x123456
stks_bytes = bytes(
[
0x12,
0x34,
0x56, # return_pc = 0x123456
0x00, # flags (0 local vars)
0x07, # varnum = 7
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
]
)
parser = QuetzalParser(self.zmachine)
parser._parse_stks(stks_bytes)
call_stack = self.zmachine._stackmanager._call_stack
frames = call_stack[1:] # skip sentinel
self.assertEqual(len(frames), 1)
# return_pc goes to caller (sentinel) program_counter
self.assertEqual(call_stack[0].program_counter, 0x123456)
# varnum goes to frame's return_addr
assert isinstance(frames[0], ZRoutine)
self.assertEqual(frames[0].return_addr, 7)
def test_parse_discard_bit_restores_none(self):
"""Parser should set return_addr=None when flags bit 4 is set."""
from mudlib.zmachine.quetzal import QuetzalParser
# flags = 0x12 = 2 locals + discard bit
stks_bytes = bytes(
[
0x00,
0x00,
0x00, # return_pc = 0
0x12, # flags (2 locals + discard)
0x00, # varnum (ignored when discarding)
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x00,
0x01, # local_vars[0]
0x00,
0x02, # local_vars[1]
]
)
parser = QuetzalParser(self.zmachine)
parser._parse_stks(stks_bytes)
frames = self.zmachine._stackmanager._call_stack[1:]
self.assertEqual(len(frames), 1)
self.assertIsNone(frames[0].return_addr)
self.assertEqual(frames[0].local_vars[:2], [1, 2])
@property
def _mem(self):
"""Helper to get mock memory."""
return self.zmachine._mem

View file

@ -1,406 +0,0 @@
"""Tests for QuetzalWriter Stks chunk generation.
Field mapping:
- Quetzal return_pc previous frame's program_counter (caller resume PC)
- Quetzal varnum frame.return_addr (store variable for return value)
"""
class TestStksChunkGeneration:
"""Test Stks chunk generation and serialization."""
def test_empty_stack_serialization(self):
"""Test serializing an empty stack (just the bottom sentinel)."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
assert result == b""
def test_single_frame_no_locals_no_stack(self):
"""Test serializing a single routine frame with no locals or stack values."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Set caller resume PC on sentinel
stack_manager._call_stack[0].program_counter = 0x1234
routine = ZRoutine(
start_addr=0x5000,
return_addr=7, # varnum: store to local var 6
zmem=zmachine._mem,
args=[],
local_vars=[],
stack=[],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x12,
0x34, # return_pc (from sentinel.program_counter)
0x00, # flags (0 locals)
0x07, # varnum (frame.return_addr)
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
]
)
assert result == expected
def test_single_frame_with_locals(self):
"""Test serializing a frame with local variables."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
stack_manager._call_stack[0].program_counter = 0x2000
routine = ZRoutine(
start_addr=0x5000,
return_addr=0x10, # varnum: store to global var 0x10
zmem=zmachine._mem,
args=[],
local_vars=[0x1111, 0x2222, 0x3333],
stack=[],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x20,
0x00, # return_pc (from sentinel.program_counter)
0x03, # flags (3 locals)
0x10, # varnum (frame.return_addr)
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x11,
0x11, # local_var[0]
0x22,
0x22, # local_var[1]
0x33,
0x33, # local_var[2]
]
)
assert result == expected
def test_single_frame_with_stack_values(self):
"""Test serializing a frame with evaluation stack values."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
stack_manager._call_stack[0].program_counter = 0x3000
routine = ZRoutine(
start_addr=0x5000,
return_addr=0, # varnum 0: push result to eval stack
zmem=zmachine._mem,
args=[],
local_vars=[],
stack=[0xABCD, 0xEF01],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x30,
0x00, # return_pc
0x00, # flags (0 locals)
0x00, # varnum (push to stack)
0x00, # argflag
0x00,
0x02, # eval_stack_size = 2
0xAB,
0xCD, # stack[0]
0xEF,
0x01, # stack[1]
]
)
assert result == expected
def test_single_frame_full(self):
"""Test serializing a frame with both locals and stack values."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
stack_manager._call_stack[0].program_counter = 0x4567
routine = ZRoutine(
start_addr=0x5000,
return_addr=2, # varnum: store to local var 1
zmem=zmachine._mem,
args=[],
local_vars=[0x0001, 0x0002],
stack=[0x1000, 0x2000, 0x3000],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x45,
0x67, # return_pc (from sentinel.program_counter)
0x02, # flags (2 locals)
0x02, # varnum
0x00, # argflag
0x00,
0x03, # eval_stack_size = 3
0x00,
0x01, # local_var[0]
0x00,
0x02, # local_var[1]
0x10,
0x00, # stack[0]
0x20,
0x00, # stack[1]
0x30,
0x00, # stack[2]
]
)
assert result == expected
def test_multiple_nested_frames(self):
"""Test serializing multiple nested routine frames."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Sentinel has return PC for frame 1
stack_manager._call_stack[0].program_counter = 0x1000
routine1 = ZRoutine(
start_addr=0x5000,
return_addr=0, # varnum: push to stack
zmem=zmachine._mem,
args=[],
local_vars=[0xAAAA],
stack=[0xBBBB],
)
stack_manager.push_routine(routine1)
# Frame1 has return PC for frame 2
routine1.program_counter = 0x2000
routine2 = ZRoutine(
start_addr=0x6000,
return_addr=5, # varnum: store to local var 4
zmem=zmachine._mem,
args=[],
local_vars=[0xCCCC],
stack=[0xDDDD],
)
stack_manager.push_routine(routine2)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
# Frame 1: return_pc from sentinel (0x1000), varnum=0
0x00,
0x10,
0x00, # return_pc
0x01, # flags (1 local)
0x00, # varnum
0x00, # argflag
0x00,
0x01, # eval_stack_size = 1
0xAA,
0xAA, # local_var[0]
0xBB,
0xBB, # stack[0]
# Frame 2: return_pc from routine1 (0x2000), varnum=5
0x00,
0x20,
0x00, # return_pc
0x01, # flags (1 local)
0x05, # varnum
0x00, # argflag
0x00,
0x01, # eval_stack_size = 1
0xCC,
0xCC, # local_var[0]
0xDD,
0xDD, # stack[0]
]
)
assert result == expected
def test_bottom_frame_zero_return_pc(self):
"""Test that a bottom/dummy frame with return_pc=0 is handled correctly."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
stack_manager = ZStackManager(zmachine._mem)
# Sentinel PC = 0 (main routine has no caller)
stack_manager._call_stack[0].program_counter = 0
routine = ZRoutine(
start_addr=0x5000,
return_addr=0, # varnum: push to stack
zmem=zmachine._mem,
args=[],
local_vars=[0x1234],
stack=[],
)
stack_manager.push_routine(routine)
zmachine._stackmanager = stack_manager
writer = QuetzalWriter(zmachine)
result = writer._generate_stks_chunk()
expected = bytes(
[
0x00,
0x00,
0x00, # return_pc = 0
0x01, # flags (1 local)
0x00, # varnum
0x00, # argflag
0x00,
0x00, # eval_stack_size = 0
0x12,
0x34, # local_var[0]
]
)
assert result == expected
class TestStksRoundTrip:
"""Test that Stks serialization/deserialization is symmetrical."""
def test_round_trip_serialization(self):
"""Test that we can serialize and deserialize frames correctly."""
from unittest.mock import Mock
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
zmachine = Mock()
zmachine._mem = Mock()
zmachine._mem.version = 5
original_stack = ZStackManager(zmachine._mem)
# Sentinel has return PC for frame 1
original_stack._call_stack[0].program_counter = 0
routine1 = ZRoutine(
start_addr=0x5000,
return_addr=5, # varnum: store to local var 4
zmem=zmachine._mem,
args=[],
local_vars=[0x0001, 0x0002, 0x0003],
stack=[0x1111, 0x2222],
)
original_stack.push_routine(routine1)
routine1.program_counter = 0x5678 # resume PC for after routine2
routine2 = ZRoutine(
start_addr=0x6000,
return_addr=3, # varnum: store to local var 2
zmem=zmachine._mem,
args=[],
local_vars=[0xAAAA],
stack=[0xBBBB, 0xCCCC, 0xDDDD],
)
original_stack.push_routine(routine2)
zmachine._stackmanager = original_stack
# Serialize
writer = QuetzalWriter(zmachine)
stks_data = writer._generate_stks_chunk()
# Deserialize
parser = QuetzalParser(zmachine)
parser._parse_stks(stks_data)
restored_stack = zmachine._stackmanager
assert len(restored_stack._call_stack) == 3
# Check frame 1: return_addr is varnum
frame1 = restored_stack._call_stack[1]
assert frame1.return_addr == 5
assert frame1.local_vars[:3] == [0x0001, 0x0002, 0x0003]
assert frame1.stack == [0x1111, 0x2222]
# Frame1's program_counter was set from frame2's return_pc
assert frame1.program_counter == 0x5678
# Check frame 2
frame2 = restored_stack._call_stack[2]
assert frame2.return_addr == 3
assert frame2.local_vars[:1] == [0xAAAA]
assert frame2.stack == [0xBBBB, 0xCCCC, 0xDDDD]

View file

@ -50,10 +50,6 @@ class MockStackManager:
def __init__(self):
self.stack = []
self.locals = [0] * 15
# For QuetzalWriter support - empty call stack
from mudlib.zmachine.zstackmanager import ZStackBottom
self._call_stack = [ZStackBottom()]
def push_stack(self, value):
self.stack.append(value)
@ -96,7 +92,6 @@ class MockUI:
self.screen.write = Mock()
self.keyboard_input = Mock()
self.keyboard_input.read_line = Mock()
self.filesystem = Mock()
class ZMachineOpcodeTests(TestCase):
@ -119,7 +114,6 @@ class ZMachineOpcodeTests(TestCase):
Mock(), # stream manager
self.ui,
Mock(), # lexer
zmachine=None,
)
def test_op_nop(self):
@ -457,7 +451,6 @@ class ZMachineObjectOpcodeTests(TestCase):
Mock(), # stream manager
self.ui,
Mock(), # lexer
zmachine=None,
)
def test_op_get_sibling_with_sibling(self):
@ -668,7 +661,6 @@ class ZMachineComplexOpcodeTests(TestCase):
Mock(), # stream manager
self.ui,
Mock(), # lexer
zmachine=None,
)
def test_op_sread_v3_basic_input(self):
@ -754,27 +746,8 @@ class ZMachineComplexOpcodeTests(TestCase):
# Should have called show_status once
self.assertEqual(call_count[0], 1)
def test_op_save_v3_branches_false_when_filesystem_fails(self):
"""Test save (V3) branches false when filesystem returns False."""
# Need a valid zmachine for the test to proceed
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
self.cpu._zmachine = zmachine_mock
# Mock filesystem to fail
self.ui.filesystem.save_game = Mock(return_value=False)
def test_op_save_v3_branches_false(self):
"""Test save (V3) branches false (QuetzalWriter not functional)."""
self.decoder.branch_condition = True
self.decoder.branch_offset = 100
old_pc = self.cpu._opdecoder.program_counter
@ -784,26 +757,8 @@ class ZMachineComplexOpcodeTests(TestCase):
# Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_restore_v3_branches_false_when_filesystem_returns_none(self):
"""Test restore (V3) branches false when filesystem returns None."""
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
self.cpu._zmachine = zmachine_mock
# Mock filesystem to return None (user cancelled or no file)
self.ui.filesystem.restore_game = Mock(return_value=None)
def test_op_restore_v3_branches_false(self):
"""Test restore (V3) branches false (no valid save files)."""
self.decoder.branch_condition = True
self.decoder.branch_offset = 100
old_pc = self.cpu._opdecoder.program_counter
@ -813,256 +768,6 @@ class ZMachineComplexOpcodeTests(TestCase):
# Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_restore_v3_branches_false_when_no_zmachine(self):
"""Test restore (V3) branches false when zmachine is not set."""
self.cpu._zmachine = None
self.decoder.branch_condition = True
self.decoder.branch_offset = 100
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_restore()
# Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_restore_v3_branches_true_on_success(self):
"""Test restore (V3) branches true when restore succeeds."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
# Create a story with some dynamic memory
story = bytearray(1024)
story[0] = 3 # version 3
story[0x0E] = 0x04 # static memory starts at 0x0400
story[0x0F] = 0x00
# Set header values
story[0x02] = 0x12 # release high byte
story[0x03] = 0x34 # release low byte
for i, byte in enumerate(b"860509"):
story[0x12 + i] = byte
story[0x1C] = 0xAB # checksum high
story[0x1D] = 0xCD # checksum low
# Create zmachine with modified memory state
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
# Modify some dynamic memory to create a save state
zmachine_mock._mem[0x100] = 0x42
zmachine_mock._mem[0x200] = 0x99
self.cpu._zmachine = zmachine_mock
# Generate save data using QuetzalWriter
writer = QuetzalWriter(zmachine_mock)
save_data = writer.generate_save_data()
# Mock filesystem to return the save data
self.ui.filesystem.restore_game = Mock(return_value=save_data)
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_restore()
# Should have branched (test is true)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc + 8)
def test_op_restore_v3_restores_memory_state(self):
"""Test restore (V3) correctly restores dynamic memory."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
# Create a story
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
story[0x02] = 0x12
story[0x03] = 0x34
for i, byte in enumerate(b"860509"):
story[0x12 + i] = byte
story[0x1C] = 0xAB
story[0x1D] = 0xCD
# Create zmachine with saved state
saved_zmachine = Mock()
saved_zmachine._mem = ZMemory(bytes(story))
saved_zmachine._pristine_mem = ZMemory(bytes(story))
saved_zmachine._cpu = self.cpu
saved_zmachine._stackmanager = self.stack
# Modify memory to create unique save state
saved_zmachine._mem[0x50] = 0xAA
saved_zmachine._mem[0x150] = 0xBB
saved_zmachine._mem[0x250] = 0xCC
# Generate save data
writer = QuetzalWriter(saved_zmachine)
save_data = writer.generate_save_data()
# Create fresh zmachine with different state
current_zmachine = Mock()
current_zmachine._mem = ZMemory(bytes(story))
current_zmachine._pristine_mem = ZMemory(bytes(story))
current_zmachine._cpu = self.cpu
current_zmachine._stackmanager = self.stack
# Different values in memory
current_zmachine._mem[0x50] = 0x11
current_zmachine._mem[0x150] = 0x22
current_zmachine._mem[0x250] = 0x33
self.cpu._zmachine = current_zmachine
# Mock filesystem to return save data
self.ui.filesystem.restore_game = Mock(return_value=save_data)
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
self.cpu.op_restore()
# Memory should now match saved state
self.assertEqual(current_zmachine._mem[0x50], 0xAA)
self.assertEqual(current_zmachine._mem[0x150], 0xBB)
self.assertEqual(current_zmachine._mem[0x250], 0xCC)
def test_op_restore_v3_branches_false_on_malformed_data(self):
"""Test restore (V3) branches false when save data is malformed."""
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
self.cpu._zmachine = zmachine_mock
# Mock filesystem to return invalid data
self.ui.filesystem.restore_game = Mock(return_value=b"invalid data")
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_restore()
# Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_save_v3_branches_true_on_success(self):
"""Test save (V3) branches true when filesystem succeeds."""
# Create minimal zmachine mock
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3 # version 3
story[0x0E] = 0x04 # static memory starts at 0x0400
story[0x0F] = 0x00
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
# Attach zmachine to cpu
self.cpu._zmachine = zmachine_mock
# Mock filesystem to succeed
self.ui.filesystem.save_game = Mock(return_value=True)
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_save()
# Should have branched (test is true)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc + 8)
# Filesystem should have been called with bytes
self.assertTrue(self.ui.filesystem.save_game.called)
call_args = self.ui.filesystem.save_game.call_args[0]
self.assertIsInstance(call_args[0], bytes)
def test_op_save_v3_generates_valid_iff_data(self):
"""Test save generates valid IFF/FORM/IFZS container."""
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
# Set header values
story[0x02] = 0x12 # release high byte
story[0x03] = 0x34 # release low byte
for i, byte in enumerate(b"860509"):
story[0x12 + i] = byte
story[0x1C] = 0xAB # checksum high
story[0x1D] = 0xCD # checksum low
zmachine_mock = Mock()
zmachine_mock._mem = ZMemory(bytes(story))
zmachine_mock._pristine_mem = ZMemory(bytes(story))
zmachine_mock._cpu = self.cpu
zmachine_mock._stackmanager = self.stack
self.cpu._zmachine = zmachine_mock
# Mock filesystem to capture data
captured_data = []
def capture_save(data):
captured_data.append(data)
return True
self.ui.filesystem.save_game = Mock(side_effect=capture_save)
self.decoder.branch_condition = True
self.decoder.branch_offset = 10
self.cpu.op_save()
# Verify we got data
self.assertEqual(len(captured_data), 1)
data = captured_data[0]
# Verify IFF structure
self.assertEqual(data[0:4], b"FORM") # FORM header
# Bytes 4-7 are the size (big-endian 32-bit)
self.assertEqual(data[8:12], b"IFZS") # IFZS type
# Verify chunks are present
self.assertIn(b"IFhd", data)
self.assertIn(b"CMem", data)
self.assertIn(b"Stks", data)
self.assertIn(b"ANNO", data)
def test_op_save_v3_without_zmachine_branches_false(self):
"""Test save fails gracefully when zmachine is not set."""
# Don't set zmachine
self.cpu._zmachine = None
self.decoder.branch_condition = True
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_save()
# Should not have branched
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_input_stream_is_noop(self):
"""Test input_stream is a no-op stub."""
# Should not raise
@ -1079,204 +784,6 @@ class ZMachineComplexOpcodeTests(TestCase):
self.cpu.op_restart()
class QuetzalWriterTests(TestCase):
"""Test suite for QuetzalWriter save functionality."""
def test_generate_ifhd_chunk(self):
"""Test _generate_ifhd_chunk() produces correct 13-byte IFhd chunk."""
from mudlib.zmachine.quetzal import QuetzalWriter
# Create a mock zmachine with known header values
mock_zmachine = Mock()
mock_zmachine._mem = MockMemory()
# Set header values in memory:
# Bytes 2-3: Release number (0x1234)
mock_zmachine._mem.write_word(0x02, 0x1234)
# Bytes 0x12-0x17: Serial number (6 bytes: "860509")
serial = b"860509"
for i, byte in enumerate(serial):
mock_zmachine._mem[0x12 + i] = byte
# Bytes 0x1C-0x1D: Checksum (0xABCD)
mock_zmachine._mem.write_word(0x1C, 0xABCD)
# Set program counter
mock_cpu = Mock()
mock_cpu._program_counter = 0x123456 # 24-bit PC
mock_zmachine._cpu = mock_cpu
# Create writer and generate chunk
writer = QuetzalWriter(mock_zmachine)
chunk_data = writer._generate_ifhd_chunk()
# Verify chunk is exactly 13 bytes
self.assertEqual(len(chunk_data), 13)
# Verify release number (bytes 0-1)
self.assertEqual(chunk_data[0], 0x12)
self.assertEqual(chunk_data[1], 0x34)
# Verify serial number (bytes 2-7)
for i, expected in enumerate(serial):
self.assertEqual(chunk_data[2 + i], expected)
# Verify checksum (bytes 8-9)
self.assertEqual(chunk_data[8], 0xAB)
self.assertEqual(chunk_data[9], 0xCD)
# Verify program counter (bytes 10-12)
self.assertEqual(chunk_data[10], 0x12)
self.assertEqual(chunk_data[11], 0x34)
self.assertEqual(chunk_data[12], 0x56)
def test_cmem_all_unchanged(self):
"""Test CMem chunk with no changes (all zeros after XOR)."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
# Create a minimal z3 story file
story = bytearray(1024)
story[0] = 3 # version 3
story[0x0E] = 0x04 # static memory starts at 0x0400
story[0x0F] = 0x00
pristine = ZMemory(bytes(story))
current = ZMemory(bytes(story))
zmachine = Mock()
zmachine._pristine_mem = pristine
zmachine._mem = current
writer = QuetzalWriter(zmachine)
result = writer._generate_cmem_chunk()
# All identical means no output (trailing zeros omitted)
self.assertIsInstance(result, bytes)
self.assertEqual(len(result), 0)
def test_cmem_single_byte_change(self):
"""Test CMem chunk with one byte changed."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
pristine = ZMemory(bytes(story))
current = ZMemory(bytes(story))
current[0x0100] = 0x42
zmachine = Mock()
zmachine._pristine_mem = pristine
zmachine._mem = current
writer = QuetzalWriter(zmachine)
result = writer._generate_cmem_chunk()
self.assertIsInstance(result, bytes)
self.assertIn(0x42, result)
def test_cmem_multiple_scattered_changes(self):
"""Test CMem chunk with multiple changes across memory."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
pristine = ZMemory(bytes(story))
current = ZMemory(bytes(story))
current[0x0010] = 0xAA
current[0x0100] = 0xBB
current[0x0200] = 0xCC
zmachine = Mock()
zmachine._pristine_mem = pristine
zmachine._mem = current
writer = QuetzalWriter(zmachine)
result = writer._generate_cmem_chunk()
self.assertIsInstance(result, bytes)
self.assertIn(0xAA, result)
self.assertIn(0xBB, result)
self.assertIn(0xCC, result)
self.assertLess(len(result), 1024)
def test_cmem_roundtrip_with_parser(self):
"""Test that CMem output can be decoded by QuetzalParser._parse_cmem()."""
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
pristine = ZMemory(bytes(story))
current = ZMemory(bytes(story))
current[0x0050] = 0x12
current[0x0051] = 0x34
current[0x0150] = 0xAB
current[0x0300] = 0xFF
zmachine = Mock()
zmachine._pristine_mem = pristine
zmachine._mem = current
writer = QuetzalWriter(zmachine)
compressed_bytes = writer._generate_cmem_chunk()
# Create fresh memory for parsing into
restored = ZMemory(bytes(story))
restored_zmachine = Mock()
restored_zmachine._pristine_mem = pristine
restored_zmachine._mem = restored
parser = QuetzalParser(restored_zmachine)
parser._parse_cmem(compressed_bytes)
# Verify restored memory matches current memory
for addr in [0x0050, 0x0051, 0x0150, 0x0300]:
self.assertEqual(
restored[addr],
current[addr],
f"Mismatch at address 0x{addr:04X}",
)
def test_cmem_consecutive_zeros(self):
"""Test CMem encoding handles consecutive zero XOR results correctly."""
from mudlib.zmachine.quetzal import QuetzalWriter
from mudlib.zmachine.zmemory import ZMemory
story = bytearray(1024)
story[0] = 3
story[0x0E] = 0x04
story[0x0F] = 0x00
pristine = ZMemory(bytes(story))
current = ZMemory(bytes(story))
current[0x0040] = 0x11
current[0x0045] = 0x22
zmachine = Mock()
zmachine._pristine_mem = pristine
zmachine._mem = current
writer = QuetzalWriter(zmachine)
result = writer._generate_cmem_chunk()
self.assertIsInstance(result, bytes)
idx_11 = result.index(0x11)
self.assertEqual(result[idx_11 + 1], 0x00)
self.assertEqual(result[idx_11 + 2], 0x03)
self.assertEqual(result[idx_11 + 3], 0x22)
# Note: ZObjectParser methods are tested through integration tests
# with real story files, not unit tests with mock memory, as the
# interaction with ZStringFactory makes mocking complex.