Compare commits
No commits in common. "f4b7d0548bcfbfaba38d5a684e015a5403aa9465" and "8097bbcf55ae86c5c957b70ec5979b969e646ad1" have entirely different histories.
f4b7d0548b
...
8097bbcf55
18 changed files with 123 additions and 3032 deletions
|
|
@ -205,8 +205,6 @@ All V3 gaps have been resolved. sread tokenization works correctly. save/restore
|
|||
|
||||
Do they represent z-machine memory the same way? Both use bytearrays, but header parsing, object table offsets, string encoding — are these compatible enough that porting opcodes is translation, or is it a deeper rewrite?
|
||||
|
||||
UPDATE: Less urgent now that the hybrid interpreter works end-to-end for V3. The layout question mainly matters for V5 opcode porting (Lost Pig, Wizard Sniffer). The hybrid already handles all V3 memory operations correctly.
|
||||
|
||||
3. Async model
|
||||
~~~~~~~~~~~~~~
|
||||
|
||||
|
|
@ -236,16 +234,12 @@ How hard is it to add words to a z-machine dictionary at runtime? The dictionary
|
|||
7. Save/restore in the hybrid interpreter
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
RESOLVED: save/restore is now fully implemented and working. Key pieces:
|
||||
``op_save`` is a stub that always branches false (game prints "Failed."). The infrastructure is mostly there — ``TrivialFilesystem.save_game()`` prompts for a filename and writes to disk, ``QuetzalParser`` can read save files — but two pieces are missing:
|
||||
|
||||
- ``QuetzalWriter`` chunk generators implemented (``IFhd`` for header, ``CMem`` for XOR-compressed dynamic memory, ``Stks`` for stack frame serialization)
|
||||
- ``op_save`` and ``op_restore`` wired to filesystem layer via ``TrivialFilesystem``
|
||||
- round-trip tested: save game state, restore it, continue playing
|
||||
- fixed Quetzal ``Stks`` field mapping: ``return_pc`` belongs on the caller frame's ``program_counter``, not the current frame. ``varnum`` is the store variable on the current frame. round-trip tests masked this because writer and parser had the same bug symmetrically
|
||||
- fixed V3 save branch processing on restore: in-game saves store PC pointing at branch data after the save opcode (0xB5). ``_try_restore()`` detects this and calls ``_branch(True)`` to advance past it. without this, branch bytes were decoded as instructions
|
||||
- fixed restored local var padding: save files store only declared locals, runtime expects 15 slots. now zero-pads on restore
|
||||
- ``QuetzalWriter`` chunk generators (``ifhd``, ``cmem``, ``stks``) are all stubs returning ``"0"``
|
||||
- ``op_save`` doesn't collect game state or call the filesystem
|
||||
|
||||
Quetzal format is now fully supported for both reading and writing saves. Diagnostic tooling: ``scripts/zmachine_inspect.py`` for offline state inspection, instruction trace deque (last 20) auto-dumps on crash.
|
||||
To make save work: implement ``QuetzalWriter`` (XOR-compress dynamic memory against original story, serialize stack frames into Quetzal format), then wire ``op_save`` to generate the bytes and call ``self._ui.filesystem.save_game(data)``. Restore should be simpler since ``QuetzalParser`` already works — just need to wire ``op_restore`` to call ``filesystem.restore_game()`` and apply the parsed state.
|
||||
|
||||
what to do next
|
||||
---------------
|
||||
|
|
@ -260,9 +254,7 @@ Concrete next steps, roughly ordered. Update as items get done.
|
|||
|
||||
- [x] build level 1 prototype: regardless of interpreter choice, implement the terminal object, IF mode, and subprocess dfrotz path. this proves the MUD-side architecture (mode stack, spectators, save/restore) independently of the interpreter question. (done — see ``docs/how/if-terminal.txt``)
|
||||
|
||||
- [x] implement save/restore: finished ``QuetzalWriter`` chunk generators (IFhd, CMem, Stks) and wired ``op_save``/``op_restore`` to filesystem. quetzal round-trip now works — can save during gameplay, restore, and continue. also fixed parser off-by-one bug in return_pc.
|
||||
|
||||
- [x] wire embedded interpreter to MUD: connected the hybrid interpreter to the MUD's mode stack via ``EmbeddedIFSession``. .z3 files use the embedded interpreter; other formats fall back to dfrotz. save/restore works via QuetzalWriter/QuetzalParser. state inspection (room name, objects) enables level 2. found and fixed a quetzal parser bug (bit slice for local vars was 3 bits, needed 4). (done — see ``src/mudlib/embedded_if_session.py``, ``src/mudlib/zmachine/mud_ui.py``)
|
||||
- [ ] implement save/restore: finish ``QuetzalWriter`` chunk generators and wire ``op_save``/``op_restore`` to the filesystem layer. restore should be easier since ``QuetzalParser`` already works.
|
||||
|
||||
- [ ] study MojoZork's multiplayer model: read the MultiZork source for how it handles multiple players in one z-machine. document the pattern for our eventual level 4.
|
||||
|
||||
|
|
@ -281,8 +273,7 @@ What works:
|
|||
- instruction trace deque (last 20 instructions) for debugging state errors
|
||||
- smoke test: ``scripts/run_zork1.py`` runs the game headless, exercises core opcode paths
|
||||
- parser and lexer: all Zork 1 commands work (look, open mailbox, read leaflet, inventory, take, drop, navigation)
|
||||
- save/restore: full quetzal format support for persisting and restoring game state
|
||||
- the interpreter is fully playable for Zork 1
|
||||
- the interpreter is fully playable for Zork 1 (save/restore not yet wired — see open question 7)
|
||||
|
||||
What this enables:
|
||||
|
||||
|
|
@ -293,29 +284,6 @@ What this enables:
|
|||
|
||||
The step-based execution model means IF sessions can run in the async MUD game loop without blocking. Each player command advances their z-machine instance by N instructions (until output or a stopping condition). The trace deque captures the last 20 instructions for debugging unexpected state.
|
||||
|
||||
milestone — Level 2: embedded interpreter wired to MUD
|
||||
-------------------------------------------------------
|
||||
|
||||
The embedded z-machine interpreter is now connected to the MUD engine. Players can ``play zork1`` and the game runs inside the MUD process — no dfrotz subprocess needed for .z3 files.
|
||||
|
||||
What works:
|
||||
|
||||
- ``EmbeddedIFSession`` wraps the hybrid interpreter with the same interface as the dfrotz-based ``IFSession``
|
||||
- MUD ZUI components: ``MudScreen`` (buffered output), ``MudInputStream`` (thread-safe input with events), ``MudFilesystem`` (quetzal saves to disk), ``NullAudio``
|
||||
- interpreter runs in a daemon thread; ``MudInputStream`` uses ``threading.Event`` for async bridge — interpreter blocks on ``read_line()``, async side feeds input and waits for next prompt
|
||||
- save/restore via ``::save`` and ``::quit`` escape commands (QuetzalWriter), auto-restore on session start (QuetzalParser)
|
||||
- state inspection: ``get_location_name()`` reads global variable 0 (player location object), ``get_room_objects()`` walks the object tree
|
||||
- .z3 files use embedded interpreter, other formats fall back to dfrotz
|
||||
- fixed quetzal parser bug: ``_parse_stks`` bit slice was ``[0:3]`` (3 bits, max 7 locals), should be ``[0:4]`` (4 bits, max 15 locals) — Zork uses 15
|
||||
- 558 tests pass including unit tests for MUD UI components and integration tests with real zork1.z3
|
||||
|
||||
What this enables:
|
||||
|
||||
- spectators can see what room the IF player is in (``get_location_name()``)
|
||||
- MUD code can read the object tree, variables, and attributes
|
||||
- foundation for level 3 (moldable world — write z-machine state from MUD)
|
||||
- no external dependency on dfrotz for V3 games
|
||||
|
||||
related documents
|
||||
-----------------
|
||||
|
||||
|
|
|
|||
7
mud.tin
7
mud.tin
|
|
@ -12,10 +12,7 @@
|
|||
#alias {fse} {fly southeast}
|
||||
#alias {fsw} {fly southwest}
|
||||
|
||||
#NOP combat aliases (pr/pl/dr/dl/f/v are built into the MUD)
|
||||
#NOP these are extras for single-key convenience
|
||||
#alias {o} {sweep}
|
||||
#alias {r} {roundhouse}
|
||||
|
||||
#alias {reconnect} {
|
||||
#zap mud;
|
||||
#session mud localhost 6789;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,399 +0,0 @@
|
|||
#!/usr/bin/env -S uv run --script
|
||||
"""Z-Machine state inspection and debugging tool.
|
||||
|
||||
Loads a story file and optionally applies a Quetzal save, then displays
|
||||
machine state for debugging z-machine interpreter issues.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add src to path so we can import mudlib
|
||||
project_root = Path(__file__).parent.parent
|
||||
sys.path.insert(0, str(project_root / "src"))
|
||||
|
||||
from mudlib.zmachine.quetzal import ( # noqa: E402
|
||||
QuetzalError,
|
||||
QuetzalMismatchedFile,
|
||||
QuetzalParser,
|
||||
)
|
||||
from mudlib.zmachine.trivialzui import create_zui # noqa: E402
|
||||
from mudlib.zmachine.zmachine import ZMachine # noqa: E402
|
||||
|
||||
# Opcode name tables for disassembly
|
||||
OP2_NAMES = {
|
||||
1: "je",
|
||||
2: "jl",
|
||||
3: "jg",
|
||||
4: "dec_chk",
|
||||
5: "inc_chk",
|
||||
6: "jin",
|
||||
7: "test",
|
||||
8: "or",
|
||||
9: "and",
|
||||
10: "test_attr",
|
||||
11: "set_attr",
|
||||
12: "clear_attr",
|
||||
13: "store",
|
||||
14: "insert_obj",
|
||||
15: "loadw",
|
||||
16: "loadb",
|
||||
17: "get_prop",
|
||||
18: "get_prop_addr",
|
||||
19: "get_next_prop",
|
||||
20: "add",
|
||||
21: "sub",
|
||||
22: "mul",
|
||||
23: "div",
|
||||
24: "mod",
|
||||
}
|
||||
|
||||
OP1_NAMES = {
|
||||
0: "jz",
|
||||
1: "get_sibling",
|
||||
2: "get_child",
|
||||
3: "get_parent",
|
||||
4: "get_prop_len",
|
||||
5: "inc",
|
||||
6: "dec",
|
||||
7: "print_addr",
|
||||
8: "call_1s",
|
||||
9: "remove_obj",
|
||||
10: "print_obj",
|
||||
11: "ret",
|
||||
12: "jump",
|
||||
13: "print_paddr",
|
||||
14: "load",
|
||||
15: "not/call_1n",
|
||||
}
|
||||
|
||||
OP0_NAMES = {
|
||||
0: "rtrue",
|
||||
1: "rfalse",
|
||||
2: "print",
|
||||
3: "print_ret",
|
||||
4: "nop",
|
||||
5: "save",
|
||||
6: "restore",
|
||||
7: "restart",
|
||||
8: "ret_popped",
|
||||
9: "pop/catch",
|
||||
10: "quit",
|
||||
11: "new_line",
|
||||
12: "show_status",
|
||||
13: "verify",
|
||||
15: "piracy",
|
||||
}
|
||||
|
||||
VAR_NAMES = {
|
||||
0: "call",
|
||||
1: "storew",
|
||||
2: "storeb",
|
||||
3: "put_prop",
|
||||
4: "sread",
|
||||
5: "print_char",
|
||||
6: "print_num",
|
||||
7: "random",
|
||||
8: "push",
|
||||
9: "pull",
|
||||
10: "split_window",
|
||||
11: "set_window",
|
||||
19: "output_stream",
|
||||
20: "input_stream",
|
||||
21: "sound_effect",
|
||||
}
|
||||
|
||||
|
||||
def decode_opcode_class(opcode_byte):
|
||||
"""Determine opcode class from the opcode byte."""
|
||||
if opcode_byte < 0x80:
|
||||
return "2OP"
|
||||
elif opcode_byte < 0xC0:
|
||||
op_type = (opcode_byte >> 4) & 3
|
||||
if op_type == 3:
|
||||
return "0OP"
|
||||
else:
|
||||
return "1OP"
|
||||
elif opcode_byte < 0xE0:
|
||||
return "2OP"
|
||||
else:
|
||||
return "VAR"
|
||||
|
||||
|
||||
def get_opcode_name(opcode_class, opcode_num):
|
||||
"""Get the name of an opcode."""
|
||||
if opcode_class == "2OP":
|
||||
return OP2_NAMES.get(opcode_num, f"unknown_{opcode_num}")
|
||||
elif opcode_class == "1OP":
|
||||
return OP1_NAMES.get(opcode_num, f"unknown_{opcode_num}")
|
||||
elif opcode_class == "0OP":
|
||||
return OP0_NAMES.get(opcode_num, f"unknown_{opcode_num}")
|
||||
elif opcode_class == "VAR":
|
||||
return VAR_NAMES.get(opcode_num, f"unknown_{opcode_num}")
|
||||
else:
|
||||
return "unknown"
|
||||
|
||||
|
||||
def parse_operand_types(mem, pc, opcode_byte):
|
||||
"""Parse operand types without evaluating them. Returns (types, bytes_consumed)."""
|
||||
types = []
|
||||
pos = pc
|
||||
|
||||
if opcode_byte < 0x80:
|
||||
# Long form 2OP
|
||||
types.append("var" if (opcode_byte & 0x40) else "small")
|
||||
types.append("var" if (opcode_byte & 0x20) else "small")
|
||||
return types, pos - pc
|
||||
elif opcode_byte < 0xC0:
|
||||
# Short form
|
||||
op_type = (opcode_byte >> 4) & 3
|
||||
if op_type == 0:
|
||||
types.append("large")
|
||||
elif op_type == 1:
|
||||
types.append("small")
|
||||
elif op_type == 2:
|
||||
types.append("var")
|
||||
# op_type == 3 means 0OP, no operands
|
||||
return types, pos - pc
|
||||
else:
|
||||
# Variable form - read types byte
|
||||
types_byte = mem[pos]
|
||||
pos += 1
|
||||
for i in range(4):
|
||||
t = (types_byte >> (6 - i * 2)) & 3
|
||||
if t == 3:
|
||||
break
|
||||
elif t == 0:
|
||||
types.append("large")
|
||||
elif t == 1:
|
||||
types.append("small")
|
||||
elif t == 2:
|
||||
types.append("var")
|
||||
return types, pos - pc
|
||||
|
||||
|
||||
def disassemble_at(zmachine, addr, count):
|
||||
"""Disassemble count instructions starting at addr."""
|
||||
mem = zmachine._mem
|
||||
pc = addr
|
||||
|
||||
print(f"\n--- disassembly at 0x{addr:06x} ({count} instructions) ---")
|
||||
|
||||
for _ in range(count):
|
||||
if pc >= len(mem._memory):
|
||||
print(f" {pc:06x} (out of bounds)")
|
||||
break
|
||||
|
||||
start_pc = pc
|
||||
opcode_byte = mem[pc]
|
||||
pc += 1
|
||||
|
||||
# Decode opcode class and number
|
||||
if opcode_byte < 0x80:
|
||||
# Long form 2OP
|
||||
op_class = "2OP"
|
||||
op_num = opcode_byte & 0x1F
|
||||
elif opcode_byte < 0xC0:
|
||||
# Short form
|
||||
op_type = (opcode_byte >> 4) & 3
|
||||
if op_type == 3:
|
||||
op_class = "0OP"
|
||||
op_num = opcode_byte & 0x0F
|
||||
else:
|
||||
op_class = "1OP"
|
||||
op_num = opcode_byte & 0x0F
|
||||
elif opcode_byte < 0xE0:
|
||||
# Variable form 2OP
|
||||
op_class = "2OP"
|
||||
op_num = opcode_byte & 0x1F
|
||||
else:
|
||||
# Variable form VAR
|
||||
op_class = "VAR"
|
||||
op_num = opcode_byte & 0x1F
|
||||
|
||||
op_name = get_opcode_name(op_class, op_num)
|
||||
|
||||
# Parse operand types
|
||||
operand_types, type_bytes = parse_operand_types(mem, pc, opcode_byte)
|
||||
pc += type_bytes
|
||||
|
||||
# Skip past operand values (without reading them)
|
||||
operand_str_parts = []
|
||||
for ot in operand_types:
|
||||
if ot == "large":
|
||||
if pc + 1 < len(mem._memory):
|
||||
val = (mem[pc] << 8) | mem[pc + 1]
|
||||
operand_str_parts.append(f"#{val}")
|
||||
pc += 2
|
||||
elif ot == "small":
|
||||
if pc < len(mem._memory):
|
||||
operand_str_parts.append(f"#{mem[pc]}")
|
||||
pc += 1
|
||||
elif ot == "var" and pc < len(mem._memory):
|
||||
operand_str_parts.append(f"V{mem[pc]:02x}")
|
||||
pc += 1
|
||||
|
||||
operands_str = ", ".join(operand_str_parts)
|
||||
print(f" {start_pc:06x} {op_class}:{op_num:02d} {op_name}({operands_str})")
|
||||
|
||||
|
||||
def get_location_info(zmachine):
|
||||
"""Get current location object and its contents."""
|
||||
try:
|
||||
# Global variable 0 (opcode variable 0x10) is the player location
|
||||
location_obj = zmachine._mem.read_global(0x10)
|
||||
|
||||
if location_obj == 0:
|
||||
return None, []
|
||||
|
||||
obj_parser = zmachine._objectparser
|
||||
|
||||
# Try to get the location name
|
||||
try:
|
||||
location_name = obj_parser.get_shortname(location_obj)
|
||||
except Exception:
|
||||
# Invalid object number - return None
|
||||
return None, []
|
||||
|
||||
# Get objects in this location (children)
|
||||
objects = []
|
||||
child = obj_parser.get_child(location_obj)
|
||||
while child != 0:
|
||||
try:
|
||||
child_name = obj_parser.get_shortname(child)
|
||||
objects.append(child_name)
|
||||
except Exception:
|
||||
objects.append(f"object #{child}")
|
||||
child = obj_parser.get_sibling(child)
|
||||
|
||||
return (location_obj, location_name), objects
|
||||
|
||||
except Exception:
|
||||
return None, []
|
||||
|
||||
|
||||
def validate_save(zmachine, save_path):
|
||||
"""Validate a save file and print diagnostic info."""
|
||||
print("\n--- save file validation ---")
|
||||
|
||||
# Read the save file
|
||||
try:
|
||||
with open(save_path, "rb") as f:
|
||||
save_data = f.read()
|
||||
except OSError as e:
|
||||
print(f"ERROR: Cannot read save file: {e}")
|
||||
return False
|
||||
|
||||
# Parse it
|
||||
parser = QuetzalParser(zmachine)
|
||||
try:
|
||||
parser.load_from_bytes(save_data)
|
||||
except QuetzalMismatchedFile:
|
||||
print("ERROR: Save file does not match story file")
|
||||
print(" (release number, serial, or checksum mismatch)")
|
||||
return False
|
||||
except QuetzalError as e:
|
||||
print(f"ERROR: Invalid Quetzal file: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"ERROR: Failed to parse save file: {e}")
|
||||
return False
|
||||
|
||||
# Get metadata
|
||||
metadata = parser.get_last_loaded()
|
||||
|
||||
print("Save file loaded successfully")
|
||||
print(f" release: {metadata.get('release number', 'unknown')}")
|
||||
print(f" serial: {metadata.get('serial number', 'unknown')}")
|
||||
print(f" checksum: 0x{metadata.get('checksum', 0):04x}")
|
||||
print(f" PC: 0x{metadata.get('program counter', 0):06x}")
|
||||
|
||||
# Check PC is in bounds
|
||||
pc = metadata.get("program counter", 0)
|
||||
mem_size = len(zmachine._mem._memory)
|
||||
if pc >= mem_size:
|
||||
print(f" WARNING: PC 0x{pc:06x} is out of bounds (size: {mem_size})")
|
||||
return False
|
||||
|
||||
print(" PC is within story file bounds")
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Inspect z-machine state for debugging"
|
||||
)
|
||||
parser.add_argument("story", help="Path to story file (.z3/.z5/.z8)")
|
||||
parser.add_argument("--save", help="Path to Quetzal save file (.qzl)")
|
||||
parser.add_argument(
|
||||
"--disasm", type=lambda x: int(x, 0), help="Address to disassemble (hex)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--count", type=int, default=5, help="Number of instructions to disassemble"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--globals", action="store_true", help="Show first 16 global variables"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
story_path = Path(args.story)
|
||||
if not story_path.exists():
|
||||
print(f"ERROR: Story file not found: {story_path}")
|
||||
sys.exit(1)
|
||||
|
||||
# Load story
|
||||
with open(story_path, "rb") as f:
|
||||
story_bytes = f.read()
|
||||
|
||||
# Create ZMachine
|
||||
ui = create_zui()
|
||||
zmachine = ZMachine(story_bytes, ui)
|
||||
|
||||
# Load save if provided
|
||||
if args.save:
|
||||
save_path = Path(args.save)
|
||||
if not save_path.exists():
|
||||
print(f"ERROR: Save file not found: {save_path}")
|
||||
sys.exit(1)
|
||||
|
||||
if not validate_save(zmachine, save_path):
|
||||
sys.exit(1)
|
||||
|
||||
# Display state
|
||||
print("\nzmachine state")
|
||||
print("==============")
|
||||
print(f"version: {zmachine._mem.version}")
|
||||
print(f"story size: {len(story_bytes)} bytes")
|
||||
print(f"PC: 0x{zmachine._opdecoder.program_counter:06x}")
|
||||
print(f"stack: {len(zmachine._stackmanager._call_stack) - 1} frames")
|
||||
|
||||
# Location info
|
||||
location, objects = get_location_info(zmachine)
|
||||
if location:
|
||||
loc_obj, loc_name = location
|
||||
print(f"\nlocation: {loc_name} (#{loc_obj})")
|
||||
if objects:
|
||||
print("objects: " + ", ".join(objects))
|
||||
else:
|
||||
print("objects: (none)")
|
||||
else:
|
||||
print("\nlocation: (unknown)")
|
||||
|
||||
# Global variables
|
||||
if args.globals:
|
||||
print("\nglobals (first 16):")
|
||||
for i in range(16):
|
||||
global_val = zmachine._mem.read_global(0x10 + i)
|
||||
print(f" G{i:02d} (V{0x10 + i:02x}): 0x{global_val:04x} ({global_val})")
|
||||
|
||||
# Disassembly
|
||||
if args.disasm is not None:
|
||||
disassemble_at(zmachine, args.disasm, args.count)
|
||||
else:
|
||||
# Disassemble instructions at PC
|
||||
disassemble_at(zmachine, zmachine._opdecoder.program_counter, args.count)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -3,7 +3,6 @@
|
|||
import pathlib
|
||||
|
||||
from mudlib.commands import CommandDefinition, register
|
||||
from mudlib.embedded_if_session import EmbeddedIFSession
|
||||
from mudlib.if_session import IFSession, broadcast_to_spectators
|
||||
from mudlib.player import Player
|
||||
|
||||
|
|
@ -61,20 +60,8 @@ async def cmd_play(player: Player, args: str) -> None:
|
|||
await player.send(msg)
|
||||
return
|
||||
|
||||
# Ensure story_path is a Path object (for mocking compatibility)
|
||||
if not isinstance(story_path, pathlib.Path):
|
||||
story_path = pathlib.Path(story_path)
|
||||
|
||||
# Use embedded interpreter for z3 files, dfrotz for others
|
||||
if story_path.suffix == ".z3":
|
||||
try:
|
||||
session = EmbeddedIFSession(player, str(story_path), game_name)
|
||||
except (FileNotFoundError, OSError) as e:
|
||||
await player.send(f"error starting game: {e}\r\n")
|
||||
return
|
||||
else:
|
||||
session = IFSession(player, str(story_path), game_name)
|
||||
|
||||
# Create and start IF session
|
||||
session = IFSession(player, str(story_path), game_name)
|
||||
try:
|
||||
intro = await session.start()
|
||||
except FileNotFoundError:
|
||||
|
|
@ -91,9 +78,18 @@ async def cmd_play(player: Player, args: str) -> None:
|
|||
|
||||
await player.send("(type ::help for escape commands)\r\n")
|
||||
|
||||
if intro:
|
||||
# Check for saved game
|
||||
if session.save_path.exists():
|
||||
await player.send("restoring saved game...\r\n")
|
||||
restored_text = await session._do_restore()
|
||||
if restored_text:
|
||||
await player.send(restored_text + "\r\n")
|
||||
# Broadcast restored text to spectators
|
||||
spectator_msg = f"[{player.name}'s terminal]\r\n{restored_text}\r\n"
|
||||
await broadcast_to_spectators(player, spectator_msg)
|
||||
elif intro:
|
||||
await player.send(intro + "\r\n")
|
||||
# Broadcast to spectators
|
||||
# Broadcast intro to spectators
|
||||
spectator_msg = f"[{player.name}'s terminal]\r\n{intro}\r\n"
|
||||
await broadcast_to_spectators(player, spectator_msg)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,174 +0,0 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
import threading
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from mudlib.if_session import IFResponse
|
||||
from mudlib.zmachine.mud_ui import create_mud_ui
|
||||
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
|
||||
from mudlib.zmachine.zcpu import ZCpuQuit, ZCpuRestart
|
||||
from mudlib.zmachine.zmachine import ZMachine
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from mudlib.player import Player
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EmbeddedIFSession:
|
||||
"""Wraps z-machine interpreter for MUD integration."""
|
||||
|
||||
def __init__(self, player: "Player", story_path: str, game_name: str = ""):
|
||||
self.player = player
|
||||
self.story_path = story_path
|
||||
self.game_name = game_name or Path(story_path).stem
|
||||
self._data_dir = Path(__file__).resolve().parents[2] / "data"
|
||||
self._thread: threading.Thread | None = None
|
||||
self._done = False
|
||||
self._error: str | None = None
|
||||
|
||||
story_bytes = Path(story_path).read_bytes()
|
||||
save_path = self.save_path
|
||||
self._ui, self._screen, self._keyboard = create_mud_ui(save_path)
|
||||
self._zmachine = ZMachine(story_bytes, self._ui)
|
||||
self._filesystem = self._ui.filesystem
|
||||
|
||||
@property
|
||||
def save_path(self) -> Path:
|
||||
safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", self.player.name)
|
||||
return self._data_dir / "if_saves" / safe_name / f"{self.game_name}.qzl"
|
||||
|
||||
def _try_restore(self) -> bool:
|
||||
"""Try to restore from save file before interpreter starts.
|
||||
|
||||
Must be called before the interpreter thread is launched.
|
||||
Returns True if state was restored successfully.
|
||||
"""
|
||||
if not self.save_path.exists():
|
||||
return False
|
||||
try:
|
||||
save_data = self.save_path.read_bytes()
|
||||
parser = QuetzalParser(self._zmachine)
|
||||
parser.load_from_bytes(save_data)
|
||||
# In V1-3, the saved PC points to branch data after the save
|
||||
# instruction. Process the branch as "save succeeded" so the
|
||||
# PC advances past it. Detect by checking for save opcode (0xB5)
|
||||
# immediately before the restored PC.
|
||||
pc = self._zmachine._opdecoder.program_counter
|
||||
if (
|
||||
self._zmachine._mem.version <= 3
|
||||
and pc > 0
|
||||
and self._zmachine._mem[pc - 1] == 0xB5
|
||||
):
|
||||
self._zmachine._cpu._branch(True)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.debug(f"Restore failed: {e}")
|
||||
return False
|
||||
|
||||
async def start(self) -> str:
|
||||
"""Start the z-machine interpreter, restoring from save if available."""
|
||||
restored = self._try_restore()
|
||||
|
||||
self._thread = threading.Thread(target=self._run_interpreter, daemon=True)
|
||||
self._thread.start()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
await loop.run_in_executor(None, self._keyboard._waiting.wait)
|
||||
|
||||
output = self._screen.flush()
|
||||
if restored:
|
||||
prefix = "restoring saved game...\r\nrestored."
|
||||
return f"{prefix}\r\n\r\n{output}" if output else prefix
|
||||
return output
|
||||
|
||||
async def handle_input(self, text: str) -> IFResponse:
|
||||
if text.lower() == "::quit":
|
||||
await self._do_save()
|
||||
return IFResponse(output="game saved.", done=True)
|
||||
|
||||
if text.lower() == "::help":
|
||||
help_text = """escape commands:
|
||||
::quit - exit the game
|
||||
::save - save game progress
|
||||
::help - show this help"""
|
||||
return IFResponse(output=help_text, done=False)
|
||||
|
||||
if text.lower() == "::save":
|
||||
confirmation = await self._do_save()
|
||||
return IFResponse(output=confirmation, done=False)
|
||||
|
||||
self._keyboard._waiting.clear()
|
||||
self._keyboard.feed(text)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
def wait_for_next_input():
|
||||
while not self._done and not self._keyboard._waiting.is_set():
|
||||
self._keyboard._waiting.wait(timeout=0.1)
|
||||
|
||||
await loop.run_in_executor(None, wait_for_next_input)
|
||||
|
||||
output = self._screen.flush()
|
||||
if self._done and self._error:
|
||||
output = f"{output}\r\n{self._error}" if output else self._error
|
||||
return IFResponse(output=output, done=self._done)
|
||||
|
||||
async def stop(self):
|
||||
self._done = True
|
||||
if self._keyboard._waiting.is_set():
|
||||
self._keyboard.feed("")
|
||||
|
||||
def _run_interpreter(self):
|
||||
try:
|
||||
self._zmachine.run()
|
||||
except ZCpuQuit:
|
||||
logger.debug("Interpreter quit normally")
|
||||
except ZCpuRestart:
|
||||
logger.debug("Interpreter restart requested")
|
||||
except Exception as e:
|
||||
tb = traceback.format_exc()
|
||||
logger.error(f"Interpreter crashed:\n{tb}")
|
||||
self._error = f"interpreter error: {e}"
|
||||
finally:
|
||||
self._done = True
|
||||
self._keyboard._waiting.set()
|
||||
|
||||
async def _do_save(self) -> str:
|
||||
try:
|
||||
writer = QuetzalWriter(self._zmachine)
|
||||
save_data = writer.generate_save_data()
|
||||
success = self._filesystem.save_game(save_data)
|
||||
if success:
|
||||
return "saved."
|
||||
return "error: save failed"
|
||||
except Exception as e:
|
||||
return f"error: save failed ({e})"
|
||||
|
||||
def get_location_name(self) -> str | None:
|
||||
try:
|
||||
location_obj = self._zmachine._mem.read_global(0)
|
||||
if location_obj == 0:
|
||||
return None
|
||||
return self._zmachine._objectparser.get_shortname(location_obj)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def get_room_objects(self) -> list[str]:
|
||||
try:
|
||||
location_obj = self._zmachine._mem.read_global(0)
|
||||
if location_obj == 0:
|
||||
return []
|
||||
|
||||
objects = []
|
||||
child = self._zmachine._objectparser.get_child(location_obj)
|
||||
while child != 0:
|
||||
name = self._zmachine._objectparser.get_shortname(child)
|
||||
objects.append(name)
|
||||
child = self._zmachine._objectparser.get_sibling(child)
|
||||
return objects
|
||||
except Exception:
|
||||
return []
|
||||
|
|
@ -10,7 +10,6 @@ from mudlib.entity import Entity
|
|||
|
||||
if TYPE_CHECKING:
|
||||
from mudlib.editor import Editor
|
||||
from mudlib.embedded_if_session import EmbeddedIFSession
|
||||
from mudlib.if_session import IFSession
|
||||
|
||||
|
||||
|
|
@ -24,7 +23,7 @@ class Player(Entity):
|
|||
mode_stack: list[str] = field(default_factory=lambda: ["normal"])
|
||||
caps: ClientCaps = field(default_factory=lambda: ClientCaps(ansi=True))
|
||||
editor: Editor | None = None
|
||||
if_session: IFSession | EmbeddedIFSession | None = None
|
||||
if_session: IFSession | None = None
|
||||
|
||||
@property
|
||||
def mode(self) -> str:
|
||||
|
|
|
|||
|
|
@ -1,150 +0,0 @@
|
|||
import logging
|
||||
import queue
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
from . import zaudio, zfilesystem, zscreen, zstream, zui
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NullAudio(zaudio.ZAudio):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.features = {"has_more_than_a_bleep": False}
|
||||
|
||||
def play_bleep(self, bleep_type):
|
||||
pass
|
||||
|
||||
|
||||
class MudScreen(zscreen.ZScreen):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._buffer = []
|
||||
self._columns = 80
|
||||
self._rows = zscreen.INFINITE_ROWS
|
||||
self.features = {
|
||||
"has_status_line": False,
|
||||
"has_upper_window": False,
|
||||
"has_graphics_font": False,
|
||||
"has_text_colors": False,
|
||||
}
|
||||
|
||||
def write(self, string):
|
||||
self._buffer.append(string)
|
||||
|
||||
def flush(self) -> str:
|
||||
result = "".join(self._buffer)
|
||||
self._buffer.clear()
|
||||
return result
|
||||
|
||||
def split_window(self, height):
|
||||
logger.debug(f"split_window({height}) - no-op")
|
||||
|
||||
def select_window(self, window):
|
||||
logger.debug(f"select_window({window}) - no-op")
|
||||
|
||||
def set_cursor_position(self, x, y):
|
||||
logger.debug(f"set_cursor_position({x}, {y}) - no-op")
|
||||
|
||||
def erase_window(self, window=zscreen.WINDOW_LOWER, color=zscreen.COLOR_CURRENT):
|
||||
logger.debug(f"erase_window({window}, {color}) - no-op")
|
||||
|
||||
def erase_line(self):
|
||||
logger.debug("erase_line() - no-op")
|
||||
|
||||
def print_status_score_turns(self, text, score, turns):
|
||||
pass
|
||||
|
||||
def print_status_time(self, hours, minutes):
|
||||
pass
|
||||
|
||||
def set_font(self, font_number):
|
||||
if font_number == zscreen.FONT_NORMAL:
|
||||
return font_number
|
||||
return None
|
||||
|
||||
def set_text_style(self, style):
|
||||
pass
|
||||
|
||||
def set_text_color(self, foreground_color, background_color):
|
||||
pass
|
||||
|
||||
|
||||
class MudInputStream(zstream.ZInputStream):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._input_queue = queue.Queue()
|
||||
self._waiting = threading.Event()
|
||||
self._ready = threading.Event()
|
||||
self._done = False
|
||||
self.features = {"has_timed_input": False}
|
||||
|
||||
def read_line(
|
||||
self,
|
||||
original_text=None,
|
||||
max_length=0,
|
||||
terminating_characters=None,
|
||||
timed_input_routine=None,
|
||||
timed_input_interval=0,
|
||||
):
|
||||
self._waiting.set()
|
||||
self._ready.wait()
|
||||
self._ready.clear()
|
||||
self._waiting.clear()
|
||||
text = self._input_queue.get()
|
||||
if max_length > 0:
|
||||
text = text[:max_length]
|
||||
return text
|
||||
|
||||
def read_char(self, timed_input_routine=None, timed_input_interval=0):
|
||||
self._waiting.set()
|
||||
self._ready.wait()
|
||||
self._ready.clear()
|
||||
self._waiting.clear()
|
||||
text = self._input_queue.get()
|
||||
if text:
|
||||
return ord(text[0])
|
||||
return 0
|
||||
|
||||
def feed(self, text: str):
|
||||
self._input_queue.put(text)
|
||||
self._ready.set()
|
||||
|
||||
|
||||
class MudFilesystem(zfilesystem.ZFilesystem):
|
||||
def __init__(self, save_path: Path):
|
||||
self.save_path = save_path
|
||||
|
||||
def save_game(self, data, suggested_filename=None):
|
||||
try:
|
||||
self.save_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self.save_path.write_bytes(data)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save game: {e}")
|
||||
return False
|
||||
|
||||
def restore_game(self):
|
||||
if self.save_path.exists():
|
||||
try:
|
||||
return self.save_path.read_bytes()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to restore game: {e}")
|
||||
return None
|
||||
return None
|
||||
|
||||
def open_transcript_file_for_writing(self):
|
||||
return None
|
||||
|
||||
def open_transcript_file_for_reading(self):
|
||||
return None
|
||||
|
||||
|
||||
def create_mud_ui(save_path: Path) -> tuple[zui.ZUI, MudScreen, MudInputStream]:
|
||||
audio = NullAudio()
|
||||
screen = MudScreen()
|
||||
keyboard = MudInputStream()
|
||||
filesystem = MudFilesystem(save_path)
|
||||
ui = zui.ZUI(audio, screen, keyboard, filesystem)
|
||||
return ui, screen, keyboard
|
||||
|
|
@ -203,31 +203,24 @@ class QuetzalParser:
|
|||
# Read successive stack frames:
|
||||
while ptr < total_len:
|
||||
log(" Parsing stack frame...")
|
||||
return_pc = (bytes[ptr] << 16) + (bytes[ptr + 1] << 8) + bytes[ptr + 2]
|
||||
return_pc = (bytes[ptr] << 16) + (bytes[ptr + 1] << 8) + bytes[ptr + 3]
|
||||
ptr += 3
|
||||
flags_bitfield = bitfield.BitField(bytes[ptr])
|
||||
ptr += 1
|
||||
varnum = bytes[ptr]
|
||||
_varnum = bytes[ptr] ### TODO: tells us which variable gets the result
|
||||
ptr += 1
|
||||
_argflag = bytes[ptr]
|
||||
ptr += 1
|
||||
evalstack_size = (bytes[ptr] << 8) + bytes[ptr + 1]
|
||||
ptr += 2
|
||||
|
||||
# Quetzal flags bit 4: if set, routine discards its return value
|
||||
discard_result = flags_bitfield[4]
|
||||
store_var = None if discard_result else varnum
|
||||
|
||||
# read anywhere from 0 to 15 local vars, pad to 15
|
||||
num_locals = flags_bitfield[0:4]
|
||||
# read anywhere from 0 to 15 local vars
|
||||
local_vars = []
|
||||
for _i in range(num_locals):
|
||||
for _i in range(flags_bitfield[0:3]):
|
||||
var = (bytes[ptr] << 8) + bytes[ptr + 1]
|
||||
ptr += 2
|
||||
local_vars.append(var)
|
||||
# runtime expects 15 slots, save only stores declared count
|
||||
local_vars.extend([0] * (15 - num_locals))
|
||||
log(f" Found {num_locals} local vars (padded to 15)")
|
||||
log(f" Found {len(local_vars)} local vars")
|
||||
|
||||
# least recent to most recent stack values:
|
||||
stack_values = []
|
||||
|
|
@ -237,16 +230,16 @@ class QuetzalParser:
|
|||
stack_values.append(val)
|
||||
log(f" Found {len(stack_values)} local stack values")
|
||||
|
||||
# return_pc belongs on the CALLER frame (previous in the stack).
|
||||
# When this routine finishes, finish_routine() returns
|
||||
# caller.program_counter as the resume address.
|
||||
prev_frame = stackmanager._call_stack[-1]
|
||||
prev_frame.program_counter = return_pc
|
||||
### Interesting... the reconstructed stack frames have no 'start
|
||||
### address'. I guess it doesn't matter, since we only need to
|
||||
### pop back to particular return addresses to resume each
|
||||
### routine.
|
||||
|
||||
### TODO: I can exactly which of the 7 args is "supplied", but I
|
||||
### don't understand where the args *are*??
|
||||
|
||||
# store_var (varnum) is the variable that receives the return
|
||||
# value when this routine finishes — NOT the return PC.
|
||||
routine = zstackmanager.ZRoutine(
|
||||
0, store_var, self._zmachine._mem, [], local_vars, stack_values
|
||||
0, return_pc, self._zmachine._mem, [], local_vars, stack_values
|
||||
)
|
||||
stackmanager.push_routine(routine)
|
||||
log(" Added new frame to stack.")
|
||||
|
|
@ -255,12 +248,6 @@ class QuetzalParser:
|
|||
raise QuetzalStackFrameOverflow
|
||||
|
||||
self._zmachine._stackmanager = stackmanager
|
||||
# Update cached references in subsystems that store the stack manager
|
||||
# (they cache the stack at init time and won't see the replacement)
|
||||
if hasattr(self._zmachine, "_cpu"):
|
||||
self._zmachine._cpu._stackmanager = stackmanager
|
||||
if hasattr(self._zmachine, "_opdecoder"):
|
||||
self._zmachine._opdecoder._stack = stackmanager
|
||||
log(" Successfully installed all stack frames.")
|
||||
|
||||
def _parse_intd(self, data):
|
||||
|
|
@ -307,71 +294,6 @@ class QuetzalParser:
|
|||
debugging and test verification."""
|
||||
return self._last_loaded_metadata
|
||||
|
||||
def load_from_bytes(self, data):
|
||||
"""Parse Quetzal data from raw bytes (instead of a file).
|
||||
|
||||
Used by op_restore when filesystem.restore_game() returns raw bytes.
|
||||
"""
|
||||
import io
|
||||
|
||||
self._last_loaded_metadata = {}
|
||||
|
||||
if len(data) < 12:
|
||||
raise QuetzalUnrecognizedFileFormat
|
||||
|
||||
# Validate FORM header
|
||||
if data[0:4] != b"FORM":
|
||||
raise QuetzalUnrecognizedFileFormat
|
||||
|
||||
# Read total length
|
||||
self._len = (data[4] << 24) + (data[5] << 16) + (data[6] << 8) + data[7]
|
||||
log(f"Total length of FORM data is {self._len}")
|
||||
self._last_loaded_metadata["total length"] = self._len
|
||||
|
||||
# Validate IFZS type
|
||||
if data[8:12] != b"IFZS":
|
||||
raise QuetzalUnrecognizedFileFormat
|
||||
|
||||
# Create a BytesIO object to use with chunk module
|
||||
self._file = io.BytesIO(data)
|
||||
self._file.seek(12) # Skip FORM header, length, and IFZS type
|
||||
|
||||
log("Parsing chunks from byte data")
|
||||
try:
|
||||
while 1:
|
||||
c = chunk.Chunk(self._file)
|
||||
chunkname = c.getname()
|
||||
chunksize = c.getsize()
|
||||
chunk_data = c.read(chunksize)
|
||||
log(f"** Found chunk ID {chunkname}: length {chunksize}")
|
||||
self._last_loaded_metadata[chunkname] = chunksize
|
||||
|
||||
if chunkname == b"IFhd":
|
||||
self._parse_ifhd(chunk_data)
|
||||
elif chunkname == b"CMem":
|
||||
self._parse_cmem(chunk_data)
|
||||
elif chunkname == b"UMem":
|
||||
self._parse_umem(chunk_data)
|
||||
elif chunkname == b"Stks":
|
||||
self._parse_stks(chunk_data)
|
||||
elif chunkname == b"IntD":
|
||||
self._parse_intd(chunk_data)
|
||||
elif chunkname == b"AUTH":
|
||||
self._parse_auth(chunk_data)
|
||||
elif chunkname == b"(c) ":
|
||||
self._parse_copyright(chunk_data)
|
||||
elif chunkname == b"ANNO":
|
||||
self._parse_anno(chunk_data)
|
||||
else:
|
||||
# spec says to ignore and skip past unrecognized chunks
|
||||
pass
|
||||
|
||||
except EOFError:
|
||||
pass
|
||||
|
||||
self._file.close()
|
||||
log("Finished parsing Quetzal data.")
|
||||
|
||||
def load(self, savefile_path):
|
||||
"""Parse each chunk of the Quetzal file at SAVEFILE_PATH,
|
||||
initializing associated zmachine subsystems as needed."""
|
||||
|
|
@ -455,211 +377,86 @@ class QuetzalWriter:
|
|||
"""Return a chunk of type IFhd, containing metadata about the
|
||||
zmachine and story being played."""
|
||||
|
||||
mem = self._zmachine._mem
|
||||
### TODO: write this. payload must be *exactly* 13 bytes, even if
|
||||
### it means padding the program counter.
|
||||
|
||||
# Release number (2 bytes, big-endian) from header bytes 2-3
|
||||
release = mem.read_word(2)
|
||||
### Some old infocom games don't have checksums stored in header.
|
||||
### If not, generate it from the *original* story file memory
|
||||
### image and put it into this chunk. See ZMemory.generate_checksum().
|
||||
pass
|
||||
|
||||
# Serial number (6 bytes) from header bytes 0x12-0x17
|
||||
serial = bytes(mem[0x12:0x18])
|
||||
|
||||
# Checksum (2 bytes, big-endian) from header bytes 0x1C-0x1D
|
||||
checksum = mem.read_word(0x1C)
|
||||
|
||||
# Program counter (3 bytes, big-endian) - current PC
|
||||
pc = self._zmachine._cpu._program_counter
|
||||
|
||||
# Build the 13-byte chunk
|
||||
chunk_data = bytearray(13)
|
||||
|
||||
# Bytes 0-1: Release number
|
||||
chunk_data[0] = (release >> 8) & 0xFF
|
||||
chunk_data[1] = release & 0xFF
|
||||
|
||||
# Bytes 2-7: Serial number
|
||||
chunk_data[2:8] = serial
|
||||
|
||||
# Bytes 8-9: Checksum
|
||||
chunk_data[8] = (checksum >> 8) & 0xFF
|
||||
chunk_data[9] = checksum & 0xFF
|
||||
|
||||
# Bytes 10-12: Program counter (24-bit)
|
||||
chunk_data[10] = (pc >> 16) & 0xFF
|
||||
chunk_data[11] = (pc >> 8) & 0xFF
|
||||
chunk_data[12] = pc & 0xFF
|
||||
|
||||
return bytes(chunk_data)
|
||||
return "0"
|
||||
|
||||
def _generate_cmem_chunk(self):
|
||||
"""Return a compressed chunk of data representing the compressed
|
||||
image of the zmachine's main memory."""
|
||||
|
||||
pmem = self._zmachine._pristine_mem
|
||||
cmem = self._zmachine._mem
|
||||
### TODO: debug this when ready
|
||||
return "0"
|
||||
|
||||
# XOR current dynamic memory with pristine dynamic memory
|
||||
dynamic_start = pmem._dynamic_start
|
||||
dynamic_end = pmem._dynamic_end
|
||||
diffarray = []
|
||||
# XOR the original game image with the current one
|
||||
diffarray = list(self._zmachine._pristine_mem)
|
||||
for index in range(len(self._zmachine._pristine_mem._total_size)):
|
||||
diffarray[index] = (
|
||||
self._zmachine._pristine_mem[index] ^ self._zmachine._mem[index]
|
||||
)
|
||||
log(f"XOR array is {diffarray}")
|
||||
|
||||
for index in range(dynamic_start, dynamic_end + 1):
|
||||
xor_value = pmem[index] ^ cmem[index]
|
||||
diffarray.append(xor_value)
|
||||
|
||||
log(f"Generated XOR array of {len(diffarray)} bytes")
|
||||
|
||||
# Run-length encode the XOR result
|
||||
# Run-length encode the resulting list of 0's and 1's.
|
||||
result = []
|
||||
zerocounter = 0
|
||||
|
||||
for byte in diffarray:
|
||||
if byte == 0:
|
||||
for index in range(len(diffarray)):
|
||||
if diffarray[index] == 0:
|
||||
zerocounter += 1
|
||||
continue
|
||||
else:
|
||||
# Flush any pending zeros
|
||||
while zerocounter > 0:
|
||||
# Encode: 0x00 followed by count of ADDITIONAL zeros
|
||||
# Maximum count in one byte is 255, meaning 256 zeros total (1+255)
|
||||
if zerocounter > 256:
|
||||
result.append(0x00)
|
||||
result.append(0xFF) # 1 + 255 = 256 zeros
|
||||
zerocounter -= 256
|
||||
else:
|
||||
result.append(0x00)
|
||||
result.append(zerocounter - 1) # count of additional zeros
|
||||
zerocounter = 0
|
||||
|
||||
# Output non-zero byte
|
||||
result.append(byte)
|
||||
|
||||
# Don't encode trailing zeros - parser can leave them as-is
|
||||
# (per spec: "If memcounter finishes less than memlen, that's totally fine")
|
||||
|
||||
log(f"Compressed to {len(result)} bytes")
|
||||
return bytes(result)
|
||||
if zerocounter > 0:
|
||||
result.append(0)
|
||||
result.append(zerocounter)
|
||||
zerocounter = 0
|
||||
result.append(diffarray[index])
|
||||
return result
|
||||
|
||||
def _generate_stks_chunk(self):
|
||||
"""Return a stacks chunk, describing the stack state of the
|
||||
zmachine at this moment."""
|
||||
|
||||
result = bytearray()
|
||||
stackmanager = self._zmachine._stackmanager
|
||||
call_stack = stackmanager._call_stack
|
||||
|
||||
# Skip the ZStackBottom sentinel (first element)
|
||||
for i, frame in enumerate(call_stack[1:], start=1):
|
||||
num_local_vars = len(frame.local_vars)
|
||||
|
||||
# Quetzal return_pc = caller's saved program counter.
|
||||
# The previous frame (caller) stores the resume PC that
|
||||
# finish_routine() returns when this frame exits.
|
||||
prev_frame = call_stack[i - 1]
|
||||
return_pc = prev_frame.program_counter or 0
|
||||
result.append((return_pc >> 16) & 0xFF)
|
||||
result.append((return_pc >> 8) & 0xFF)
|
||||
result.append(return_pc & 0xFF)
|
||||
|
||||
# Write flags byte (bits 0-3 = num local vars,
|
||||
# bit 4 = discard return value)
|
||||
flags = num_local_vars & 0x0F
|
||||
if frame.return_addr is None:
|
||||
flags |= 0x10
|
||||
result.append(flags)
|
||||
|
||||
# Write varnum (which variable gets the return value)
|
||||
varnum = frame.return_addr if frame.return_addr is not None else 0
|
||||
result.append(varnum & 0xFF)
|
||||
|
||||
# Write argflag (bitmask of supplied arguments)
|
||||
# TODO: track this properly, for now use 0
|
||||
result.append(0)
|
||||
|
||||
# Write eval_stack_size as 16-bit big-endian
|
||||
eval_stack_size = len(frame.stack)
|
||||
result.append((eval_stack_size >> 8) & 0xFF)
|
||||
result.append(eval_stack_size & 0xFF)
|
||||
|
||||
# Write local variables (16-bit big-endian each)
|
||||
for local_var in frame.local_vars:
|
||||
result.append((local_var >> 8) & 0xFF)
|
||||
result.append(local_var & 0xFF)
|
||||
|
||||
# Write evaluation stack values (16-bit big-endian each)
|
||||
for stack_val in frame.stack:
|
||||
result.append((stack_val >> 8) & 0xFF)
|
||||
result.append(stack_val & 0xFF)
|
||||
|
||||
return bytes(result)
|
||||
### TODO: write this
|
||||
return "0"
|
||||
|
||||
def _generate_anno_chunk(self):
|
||||
"""Return an annotation chunk, containing metadata about the ZVM
|
||||
interpreter which created the savefile."""
|
||||
|
||||
### TODO: write this
|
||||
return b"0"
|
||||
return "0"
|
||||
|
||||
# --------- Public APIs -----------
|
||||
|
||||
def generate_save_data(self):
|
||||
"""Generate complete Quetzal save data as bytes (IFF/FORM/IFZS container).
|
||||
|
||||
Returns bytes representing the complete save file in IFF format.
|
||||
"""
|
||||
log("Generating Quetzal save data")
|
||||
|
||||
# Generate all chunks
|
||||
ifhd_chunk = self._generate_ifhd_chunk()
|
||||
cmem_chunk = self._generate_cmem_chunk()
|
||||
stks_chunk = self._generate_stks_chunk()
|
||||
anno_chunk = self._generate_anno_chunk()
|
||||
|
||||
# Build IFF container with proper chunk headers
|
||||
result = bytearray()
|
||||
|
||||
# Helper to write a chunk with its header
|
||||
def write_chunk(chunk_id, chunk_data):
|
||||
result.extend(chunk_id.encode("ascii"))
|
||||
size = len(chunk_data)
|
||||
result.append((size >> 24) & 0xFF)
|
||||
result.append((size >> 16) & 0xFF)
|
||||
result.append((size >> 8) & 0xFF)
|
||||
result.append(size & 0xFF)
|
||||
result.extend(chunk_data)
|
||||
# IFF chunks must be padded to even byte boundaries
|
||||
if size % 2 == 1:
|
||||
result.append(0) # padding byte
|
||||
log(f" Added {chunk_id} chunk ({size} bytes)")
|
||||
|
||||
# Write nested chunks
|
||||
write_chunk("IFhd", ifhd_chunk)
|
||||
write_chunk("CMem", cmem_chunk)
|
||||
write_chunk("Stks", stks_chunk)
|
||||
write_chunk("ANNO", anno_chunk)
|
||||
|
||||
# Calculate total size (everything after FORM header + size field)
|
||||
total_size = len(result) + 4 # +4 for "IFZS"
|
||||
|
||||
# Build final FORM container
|
||||
container = bytearray()
|
||||
container.extend(b"FORM")
|
||||
container.append((total_size >> 24) & 0xFF)
|
||||
container.append((total_size >> 16) & 0xFF)
|
||||
container.append((total_size >> 8) & 0xFF)
|
||||
container.append(total_size & 0xFF)
|
||||
container.extend(b"IFZS")
|
||||
container.extend(result)
|
||||
|
||||
log(f"Generated {len(container)} bytes of save data")
|
||||
return bytes(container)
|
||||
|
||||
def write(self, savefile_path):
|
||||
"""Write the current zmachine state to a new Quetzal-file at
|
||||
SAVEFILE_PATH."""
|
||||
|
||||
log(f"Attempting to write game-state to '{savefile_path}'")
|
||||
data = self.generate_save_data()
|
||||
self._file = open(savefile_path, "w") # noqa: SIM115
|
||||
|
||||
with open(savefile_path, "wb") as f:
|
||||
f.write(data)
|
||||
ifhd_chunk = self._generate_ifhd_chunk()
|
||||
cmem_chunk = self._generate_cmem_chunk()
|
||||
stks_chunk = self._generate_stks_chunk()
|
||||
anno_chunk = self._generate_anno_chunk()
|
||||
|
||||
_total_chunk_size = (
|
||||
len(ifhd_chunk) + len(cmem_chunk) + len(stks_chunk) + len(anno_chunk)
|
||||
)
|
||||
|
||||
# Write main FORM chunk to hold other chunks
|
||||
self._file.write("FORM")
|
||||
### TODO: self._file_write(total_chunk_size) -- spread it over 4 bytes
|
||||
self._file.write("IFZS")
|
||||
|
||||
# Write nested chunks.
|
||||
for chunk_data in (ifhd_chunk, cmem_chunk, stks_chunk, anno_chunk):
|
||||
self._file.write(chunk_data)
|
||||
log("Wrote a chunk.")
|
||||
self._file.close()
|
||||
log("Done writing game-state to savefile.")
|
||||
|
|
|
|||
|
|
@ -44,16 +44,7 @@ class ZCpuRestart(ZCpuError):
|
|||
|
||||
class ZCpu:
|
||||
def __init__(
|
||||
self,
|
||||
zmem,
|
||||
zopdecoder,
|
||||
zstack,
|
||||
zobjects,
|
||||
zstring,
|
||||
zstreammanager,
|
||||
zui,
|
||||
zlexer,
|
||||
zmachine=None,
|
||||
self, zmem, zopdecoder, zstack, zobjects, zstring, zstreammanager, zui, zlexer
|
||||
):
|
||||
self._memory = zmem
|
||||
self._opdecoder = zopdecoder
|
||||
|
|
@ -63,14 +54,8 @@ class ZCpu:
|
|||
self._streammanager = zstreammanager
|
||||
self._ui = zui
|
||||
self._lexer = zlexer
|
||||
self._zmachine = zmachine
|
||||
self._trace = deque(maxlen=20)
|
||||
|
||||
@property
|
||||
def _program_counter(self):
|
||||
"""Return the current program counter value."""
|
||||
return self._opdecoder.program_counter
|
||||
|
||||
def _get_handler(self, opcode_class, opcode_number):
|
||||
try:
|
||||
opcode_decl = self.opcodes[opcode_class][opcode_number]
|
||||
|
|
@ -236,7 +221,8 @@ class ZCpu:
|
|||
except (ZCpuQuit, ZCpuRestart):
|
||||
# Normal control flow - don't dump trace
|
||||
raise
|
||||
except Exception:
|
||||
except ZCpuError:
|
||||
# All other ZCpu errors - dump trace for debugging
|
||||
self._dump_trace()
|
||||
raise
|
||||
return True
|
||||
|
|
@ -557,26 +543,9 @@ class ZCpu:
|
|||
def op_save(self, *args):
|
||||
"""Save game state to file (V3 - branch on success).
|
||||
|
||||
Uses QuetzalWriter to generate save data in IFF/FORM/IFZS format,
|
||||
then calls the filesystem to write it. Branches true on success,
|
||||
false on failure.
|
||||
Currently always fails because QuetzalWriter is not yet functional.
|
||||
"""
|
||||
if self._zmachine is None:
|
||||
# Can't save without zmachine reference
|
||||
self._branch(False)
|
||||
return
|
||||
|
||||
from .quetzal import QuetzalWriter
|
||||
|
||||
try:
|
||||
writer = QuetzalWriter(self._zmachine)
|
||||
save_data = writer.generate_save_data()
|
||||
success = self._ui.filesystem.save_game(save_data)
|
||||
self._branch(success)
|
||||
except Exception as e:
|
||||
# Any error during save process = failure
|
||||
log(f"Save failed with exception: {e}")
|
||||
self._branch(False)
|
||||
self._branch(False)
|
||||
|
||||
def op_save_v4(self, *args):
|
||||
"""TODO: Write docstring here."""
|
||||
|
|
@ -585,41 +554,10 @@ class ZCpu:
|
|||
def op_restore(self, *args):
|
||||
"""Restore game state from file (V3 - branch on success).
|
||||
|
||||
Uses QuetzalParser to load save data from filesystem,
|
||||
validates it matches current story, and restores memory/stack/PC.
|
||||
Branches true on success, false on failure.
|
||||
Currently always fails because QuetzalWriter is not yet functional,
|
||||
so there are no valid save files to restore.
|
||||
"""
|
||||
if self._zmachine is None:
|
||||
# Can't restore without zmachine reference
|
||||
self._branch(False)
|
||||
return
|
||||
|
||||
from .quetzal import QuetzalParser
|
||||
|
||||
try:
|
||||
# Get save data from filesystem
|
||||
save_data = self._ui.filesystem.restore_game()
|
||||
if save_data is None:
|
||||
# User cancelled or no save file available
|
||||
self._branch(False)
|
||||
return
|
||||
|
||||
# Parse the save data
|
||||
parser = QuetzalParser(self._zmachine)
|
||||
parser.load_from_bytes(save_data)
|
||||
|
||||
# QuetzalParser already:
|
||||
# - Validated IFhd matches current story (release/serial/checksum)
|
||||
# - Replaced dynamic memory via _parse_cmem or _parse_umem
|
||||
# - Replaced stack manager via _parse_stks
|
||||
# - Set program counter via _parse_ifhd
|
||||
|
||||
# Success!
|
||||
self._branch(True)
|
||||
except Exception as e:
|
||||
# Any error during restore process = failure
|
||||
log(f"Restore failed with exception: {e}")
|
||||
self._branch(False)
|
||||
self._branch(False)
|
||||
|
||||
def op_restore_v4(self, *args):
|
||||
"""TODO: Write docstring here."""
|
||||
|
|
|
|||
|
|
@ -44,7 +44,6 @@ class ZMachine:
|
|||
self._stream_manager,
|
||||
self._ui,
|
||||
self._lexer,
|
||||
zmachine=self,
|
||||
)
|
||||
|
||||
# --------- Public APIs -----------
|
||||
|
|
|
|||
|
|
@ -184,15 +184,10 @@ class ZMemory:
|
|||
def _check_bounds(self, index):
|
||||
if isinstance(index, slice):
|
||||
start, stop = index.start, index.stop
|
||||
# For slices, stop can be _total_size since slicing is exclusive
|
||||
if not (
|
||||
(0 <= start < self._total_size) and (0 <= stop <= self._total_size)
|
||||
):
|
||||
raise ZMemoryOutOfBounds
|
||||
else:
|
||||
start, stop = index, index
|
||||
if not ((0 <= start < self._total_size) and (0 <= stop < self._total_size)):
|
||||
raise ZMemoryOutOfBounds
|
||||
if not ((0 <= start < self._total_size) and (0 <= stop < self._total_size)):
|
||||
raise ZMemoryOutOfBounds
|
||||
|
||||
def _check_static(self, index):
|
||||
"""Throw error if INDEX is within the static-memory area."""
|
||||
|
|
|
|||
|
|
@ -153,11 +153,6 @@ class ZStackManager:
|
|||
"Remove and return value from the top of the data stack."
|
||||
|
||||
current_routine = self._call_stack[-1]
|
||||
if not current_routine.stack:
|
||||
frame_idx = len(self._call_stack) - 1
|
||||
ra = getattr(current_routine, "return_addr", "N/A")
|
||||
pc = getattr(current_routine, "program_counter", 0)
|
||||
raise ZStackPopError(f"frame {frame_idx}, return_addr={ra}, pc=0x{pc:06x}")
|
||||
return current_routine.stack.pop()
|
||||
|
||||
def get_stack_frame_index(self):
|
||||
|
|
|
|||
|
|
@ -1,329 +0,0 @@
|
|||
"""Tests for embedded z-machine MUD integration.
|
||||
|
||||
Tests the MUD UI components and EmbeddedIFSession integration with real zork1.z3.
|
||||
"""
|
||||
|
||||
import threading
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from mudlib.zmachine.mud_ui import MudFilesystem, MudInputStream, MudScreen
|
||||
|
||||
ZORK_PATH = Path(__file__).parent.parent / "content" / "stories" / "zork1.z3"
|
||||
requires_zork = pytest.mark.skipif(not ZORK_PATH.exists(), reason="zork1.z3 not found")
|
||||
|
||||
|
||||
@dataclass
|
||||
class MockWriter:
|
||||
def write(self, data):
|
||||
pass
|
||||
|
||||
async def drain(self):
|
||||
pass
|
||||
|
||||
|
||||
# Unit tests for MUD UI components
|
||||
|
||||
|
||||
def test_mud_screen_captures_output():
|
||||
"""MudScreen captures written text and flush returns it."""
|
||||
screen = MudScreen()
|
||||
screen.write("Hello ")
|
||||
screen.write("world!")
|
||||
output = screen.flush()
|
||||
assert output == "Hello world!"
|
||||
|
||||
|
||||
def test_mud_screen_flush_clears_buffer():
|
||||
"""MudScreen flush clears buffer, second flush returns empty."""
|
||||
screen = MudScreen()
|
||||
screen.write("test")
|
||||
first = screen.flush()
|
||||
assert first == "test"
|
||||
|
||||
second = screen.flush()
|
||||
assert second == ""
|
||||
|
||||
|
||||
def test_mud_input_stream_feed_and_read():
|
||||
"""MudInputStream feed and read_line work with threading."""
|
||||
stream = MudInputStream()
|
||||
result = []
|
||||
|
||||
def reader():
|
||||
result.append(stream.read_line())
|
||||
|
||||
t = threading.Thread(target=reader)
|
||||
t.start()
|
||||
# Wait for stream to signal it's waiting
|
||||
stream._waiting.wait(timeout=2)
|
||||
stream.feed("hello")
|
||||
t.join(timeout=2)
|
||||
assert result == ["hello"]
|
||||
|
||||
|
||||
def test_mud_filesystem_save_restore(tmp_path):
|
||||
"""MudFilesystem save and restore bytes correctly."""
|
||||
save_path = tmp_path / "test.qzl"
|
||||
filesystem = MudFilesystem(save_path)
|
||||
|
||||
test_data = b"\x01\x02\x03\x04\x05"
|
||||
success = filesystem.save_game(test_data)
|
||||
assert success
|
||||
assert save_path.exists()
|
||||
|
||||
restored = filesystem.restore_game()
|
||||
assert restored == test_data
|
||||
|
||||
|
||||
# Integration tests with real zork1.z3
|
||||
|
||||
|
||||
@requires_zork
|
||||
@pytest.mark.asyncio
|
||||
async def test_embedded_session_start():
|
||||
"""EmbeddedIFSession starts and returns intro containing game info."""
|
||||
from mudlib.embedded_if_session import EmbeddedIFSession
|
||||
from mudlib.player import Player
|
||||
|
||||
mock_writer = MockWriter()
|
||||
player = Player(name="tester", x=5, y=5, writer=mock_writer)
|
||||
|
||||
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
|
||||
# Clean up any existing save to get a fresh start
|
||||
if session.save_path.exists():
|
||||
session.save_path.unlink()
|
||||
|
||||
intro = await session.start()
|
||||
|
||||
assert intro is not None
|
||||
assert len(intro) > 0
|
||||
# Intro should contain game title or location
|
||||
assert "ZORK" in intro or "West of House" in intro
|
||||
|
||||
|
||||
@requires_zork
|
||||
@pytest.mark.asyncio
|
||||
async def test_embedded_session_handle_input():
|
||||
"""EmbeddedIFSession handles input and returns response."""
|
||||
from mudlib.embedded_if_session import EmbeddedIFSession
|
||||
from mudlib.player import Player
|
||||
|
||||
mock_writer = MockWriter()
|
||||
player = Player(name="tester", x=5, y=5, writer=mock_writer)
|
||||
|
||||
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
|
||||
# Clean up any existing save to get a fresh start
|
||||
if session.save_path.exists():
|
||||
session.save_path.unlink()
|
||||
|
||||
await session.start()
|
||||
|
||||
response = await session.handle_input("look")
|
||||
|
||||
assert response is not None
|
||||
assert response.done is False
|
||||
assert len(response.output) > 0
|
||||
# Looking should describe the starting location
|
||||
assert "West of House" in response.output or "house" in response.output
|
||||
|
||||
|
||||
@requires_zork
|
||||
@pytest.mark.asyncio
|
||||
async def test_embedded_session_escape_help():
|
||||
"""EmbeddedIFSession ::help returns help text."""
|
||||
from mudlib.embedded_if_session import EmbeddedIFSession
|
||||
from mudlib.player import Player
|
||||
|
||||
mock_writer = MockWriter()
|
||||
player = Player(name="tester", x=5, y=5, writer=mock_writer)
|
||||
|
||||
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
|
||||
await session.start()
|
||||
|
||||
response = await session.handle_input("::help")
|
||||
|
||||
assert response.done is False
|
||||
assert "::quit" in response.output
|
||||
assert "::save" in response.output
|
||||
assert "::help" in response.output
|
||||
|
||||
|
||||
@requires_zork
|
||||
@pytest.mark.asyncio
|
||||
async def test_embedded_session_escape_quit():
|
||||
"""EmbeddedIFSession ::quit returns done=True."""
|
||||
from mudlib.embedded_if_session import EmbeddedIFSession
|
||||
from mudlib.player import Player
|
||||
|
||||
mock_writer = MockWriter()
|
||||
player = Player(name="tester", x=5, y=5, writer=mock_writer)
|
||||
|
||||
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
|
||||
await session.start()
|
||||
|
||||
response = await session.handle_input("::quit")
|
||||
|
||||
assert response.done is True
|
||||
assert "saved" in response.output.lower()
|
||||
|
||||
|
||||
@requires_zork
|
||||
@pytest.mark.asyncio
|
||||
async def test_embedded_session_location_name():
|
||||
"""EmbeddedIFSession get_location_name returns location after input."""
|
||||
from mudlib.embedded_if_session import EmbeddedIFSession
|
||||
from mudlib.player import Player
|
||||
|
||||
mock_writer = MockWriter()
|
||||
player = Player(name="tester", x=5, y=5, writer=mock_writer)
|
||||
|
||||
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
|
||||
await session.start()
|
||||
|
||||
# Send a command to advance game state
|
||||
await session.handle_input("look")
|
||||
|
||||
location = session.get_location_name()
|
||||
|
||||
# Location may be None or a string depending on game state
|
||||
assert location is None or isinstance(location, str)
|
||||
|
||||
|
||||
@requires_zork
|
||||
@pytest.mark.asyncio
|
||||
async def test_embedded_session_room_objects():
|
||||
"""EmbeddedIFSession get_room_objects returns a list after start."""
|
||||
from mudlib.embedded_if_session import EmbeddedIFSession
|
||||
from mudlib.player import Player
|
||||
|
||||
mock_writer = MockWriter()
|
||||
player = Player(name="tester", x=5, y=5, writer=mock_writer)
|
||||
|
||||
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
|
||||
await session.start()
|
||||
|
||||
objects = session.get_room_objects()
|
||||
|
||||
assert isinstance(objects, list)
|
||||
# Zork1 starting location usually has some objects
|
||||
assert len(objects) >= 0 # May or may not have visible objects initially
|
||||
|
||||
|
||||
@requires_zork
|
||||
@pytest.mark.asyncio
|
||||
async def test_embedded_session_try_restore_before_thread():
|
||||
"""_try_restore() is called synchronously before interpreter thread starts."""
|
||||
from unittest.mock import patch
|
||||
|
||||
from mudlib.embedded_if_session import EmbeddedIFSession
|
||||
from mudlib.player import Player
|
||||
|
||||
mock_writer = MockWriter()
|
||||
player = Player(name="tester", x=5, y=5, writer=mock_writer)
|
||||
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
|
||||
|
||||
# Create a save file
|
||||
if session.save_path.exists():
|
||||
session.save_path.unlink()
|
||||
session.save_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
# Write a minimal valid save (header only, won't actually restore correctly)
|
||||
session.save_path.write_bytes(b"FORM\x00\x00\x00\x08IFZSQUTZ\x00\x00\x00\x00")
|
||||
|
||||
call_order = []
|
||||
|
||||
original_try_restore = session._try_restore
|
||||
original_run_interpreter = session._run_interpreter
|
||||
|
||||
def track_try_restore():
|
||||
call_order.append("try_restore")
|
||||
return original_try_restore()
|
||||
|
||||
def track_run_interpreter():
|
||||
call_order.append("run_interpreter")
|
||||
original_run_interpreter()
|
||||
|
||||
with (
|
||||
patch.object(session, "_try_restore", side_effect=track_try_restore),
|
||||
patch.object(session, "_run_interpreter", side_effect=track_run_interpreter),
|
||||
):
|
||||
await session.start()
|
||||
|
||||
# Verify _try_restore was called before _run_interpreter
|
||||
assert call_order[0] == "try_restore"
|
||||
assert call_order[1] == "run_interpreter"
|
||||
|
||||
await session.stop()
|
||||
if session.save_path.exists():
|
||||
session.save_path.unlink()
|
||||
|
||||
|
||||
@requires_zork
|
||||
@pytest.mark.asyncio
|
||||
async def test_embedded_session_no_restore_without_save():
|
||||
"""start() does not restore when no save file exists."""
|
||||
from mudlib.embedded_if_session import EmbeddedIFSession
|
||||
from mudlib.player import Player
|
||||
|
||||
mock_writer = MockWriter()
|
||||
player = Player(name="nosaveplayer", writer=mock_writer, x=0, y=0)
|
||||
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
|
||||
|
||||
# Ensure no save file exists
|
||||
if session.save_path.exists():
|
||||
session.save_path.unlink()
|
||||
|
||||
intro = await session.start()
|
||||
|
||||
# Should NOT contain restore message
|
||||
assert "restoring" not in intro.lower()
|
||||
# Should contain normal game intro
|
||||
assert "ZORK" in intro or "West of House" in intro
|
||||
|
||||
await session.stop()
|
||||
|
||||
|
||||
@requires_zork
|
||||
@pytest.mark.asyncio
|
||||
async def test_embedded_session_save_and_restore():
|
||||
"""Save a game, create new session, restore it via start()."""
|
||||
from mudlib.embedded_if_session import EmbeddedIFSession
|
||||
from mudlib.player import Player
|
||||
|
||||
mock_writer = MockWriter()
|
||||
# Start first session
|
||||
player = Player(name="testplayer", writer=mock_writer, x=0, y=0)
|
||||
session = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
|
||||
# Clean up any existing save to get a fresh start
|
||||
if session.save_path.exists():
|
||||
session.save_path.unlink()
|
||||
|
||||
await session.start()
|
||||
|
||||
# Do something to change state
|
||||
await session.handle_input("open mailbox")
|
||||
|
||||
# Save
|
||||
save_result = await session.handle_input("::save")
|
||||
assert "saved" in save_result.output.lower()
|
||||
await session.stop()
|
||||
|
||||
# Start new session - should auto-restore via start()
|
||||
# start() calls _try_restore() BEFORE launching the interpreter thread
|
||||
session2 = EmbeddedIFSession(player, str(ZORK_PATH), "zork1")
|
||||
intro = await session2.start()
|
||||
# Should contain restore message prefixed to output
|
||||
assert "restoring saved game" in intro.lower()
|
||||
assert "restored" in intro.lower()
|
||||
|
||||
# The game state should reflect the restored state
|
||||
# (location may differ after restore, just verify it works)
|
||||
response = await session2.handle_input("look")
|
||||
assert response.output # Should get some output
|
||||
await session2.stop()
|
||||
|
||||
# Clean up save file
|
||||
if session2.save_path.exists():
|
||||
session2.save_path.unlink()
|
||||
|
|
@ -70,9 +70,9 @@ async def test_play_enters_if_mode(player):
|
|||
with patch("mudlib.commands.play.IFSession") as MockIFSession:
|
||||
MockIFSession.return_value = mock_session
|
||||
|
||||
# Use .z5 to test dfrotz path
|
||||
# Ensure story file exists check passes
|
||||
with patch("mudlib.commands.play._find_story") as mock_find:
|
||||
mock_find.return_value = "/fake/path/zork1.z5"
|
||||
mock_find.return_value = "/fake/path/zork1.z3"
|
||||
|
||||
await cmd_play(player, "zork1")
|
||||
|
||||
|
|
@ -108,9 +108,8 @@ async def test_play_handles_dfrotz_missing(player):
|
|||
with patch("mudlib.commands.play.IFSession") as MockIFSession:
|
||||
MockIFSession.return_value = mock_session
|
||||
|
||||
# Use .z5 to test dfrotz path
|
||||
with patch("mudlib.commands.play._find_story") as mock_find:
|
||||
mock_find.return_value = "/fake/path/zork1.z5"
|
||||
mock_find.return_value = "/fake/path/zork1.z3"
|
||||
|
||||
await cmd_play(player, "zork1")
|
||||
|
||||
|
|
@ -131,35 +130,41 @@ async def test_play_handles_dfrotz_missing(player):
|
|||
|
||||
@pytest.mark.asyncio
|
||||
async def test_play_restores_save_if_exists(player):
|
||||
"""Playing restores saved game if save file exists (via start())."""
|
||||
"""Playing restores saved game if save file exists."""
|
||||
from pathlib import Path
|
||||
|
||||
from mudlib.commands.play import cmd_play
|
||||
|
||||
# Mock IFSession - restore now happens in start() before thread launches
|
||||
# Mock IFSession
|
||||
mock_session = Mock()
|
||||
restored_output = (
|
||||
"restoring saved game...\r\nrestored.\r\n\r\n"
|
||||
"West of House\nYou are standing in an open field."
|
||||
mock_session.start = AsyncMock(return_value="Welcome to Zork!")
|
||||
mock_session._do_restore = AsyncMock(
|
||||
return_value="West of House\nYou are standing in an open field."
|
||||
)
|
||||
mock_session.start = AsyncMock(return_value=restored_output)
|
||||
mock_session.save_path = Mock(spec=Path)
|
||||
mock_session.save_path.exists = Mock(return_value=True)
|
||||
|
||||
with patch("mudlib.commands.play.IFSession") as MockIFSession:
|
||||
MockIFSession.return_value = mock_session
|
||||
|
||||
# Use .z5 to test dfrotz path
|
||||
with patch("mudlib.commands.play._find_story") as mock_find:
|
||||
mock_find.return_value = "/fake/path/zork1.z5"
|
||||
mock_find.return_value = "/fake/path/zork1.z3"
|
||||
|
||||
await cmd_play(player, "zork1")
|
||||
|
||||
# Verify restore was called
|
||||
mock_session._do_restore.assert_called_once()
|
||||
|
||||
# Verify session was created and started
|
||||
mock_session.start.assert_called_once()
|
||||
|
||||
# Verify mode was pushed
|
||||
assert "if" in player.mode_stack
|
||||
|
||||
# Verify restored text was sent (start() returns full output with restore)
|
||||
# Verify restored text was sent
|
||||
calls = [call[0][0] for call in player.writer.write.call_args_list]
|
||||
full_output = "".join(calls)
|
||||
assert "restoring" in full_output.lower()
|
||||
assert "West of House" in full_output
|
||||
assert "open field" in full_output
|
||||
|
||||
|
|
@ -167,22 +172,28 @@ async def test_play_restores_save_if_exists(player):
|
|||
@pytest.mark.asyncio
|
||||
async def test_play_no_restore_if_no_save(player):
|
||||
"""Playing does not restore if no save file exists."""
|
||||
from pathlib import Path
|
||||
|
||||
from mudlib.commands.play import cmd_play
|
||||
|
||||
# Mock IFSession
|
||||
mock_session = Mock()
|
||||
mock_session.start = AsyncMock(return_value="Welcome to Zork!")
|
||||
mock_session._do_restore = AsyncMock(return_value="")
|
||||
mock_session.save_path = Mock(spec=Path)
|
||||
mock_session.save_path.exists = Mock(return_value=False)
|
||||
|
||||
with patch("mudlib.commands.play.IFSession") as MockIFSession:
|
||||
MockIFSession.return_value = mock_session
|
||||
|
||||
# Use .z5 to test dfrotz path
|
||||
with patch("mudlib.commands.play._find_story") as mock_find:
|
||||
mock_find.return_value = "/fake/path/zork1.z5"
|
||||
mock_find.return_value = "/fake/path/zork1.z3"
|
||||
|
||||
await cmd_play(player, "zork1")
|
||||
|
||||
# Verify restore was NOT called
|
||||
mock_session._do_restore.assert_not_called()
|
||||
|
||||
# Verify session was created and started
|
||||
mock_session.start.assert_called_once()
|
||||
|
||||
|
|
|
|||
|
|
@ -1,297 +0,0 @@
|
|||
"""Integration tests for Quetzal save/restore round-trip.
|
||||
|
||||
Tests that verify the complete save/restore pipeline works end-to-end by
|
||||
generating save data with QuetzalWriter and restoring it with QuetzalParser.
|
||||
|
||||
Field mapping reminder:
|
||||
- Quetzal return_pc for frame N → caller (frame N-1) program_counter
|
||||
- Quetzal varnum → frame.return_addr (store variable for return value)
|
||||
"""
|
||||
|
||||
|
||||
class TestQuetzalRoundTrip:
|
||||
"""Test complete save/restore cycle with real zmachine state."""
|
||||
|
||||
def test_basic_round_trip(self):
|
||||
"""Test saving and restoring basic zmachine state."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
|
||||
|
||||
# Create a minimal z-machine story file (V3)
|
||||
story_data = bytearray(8192)
|
||||
story_data[0] = 3 # Version 3
|
||||
story_data[0x02:0x04] = [0x12, 0x34] # Release number
|
||||
story_data[0x04:0x06] = [0x10, 0x00] # High memory start
|
||||
story_data[0x06:0x08] = [0x08, 0x00] # Initial PC
|
||||
story_data[0x0E:0x10] = [0x04, 0x00] # Static memory start
|
||||
story_data[0x0C:0x0E] = [0x02, 0x00] # Global variables start
|
||||
story_data[0x12:0x18] = b"860101" # Serial number
|
||||
story_data[0x1C:0x1E] = [0xAB, 0xCD] # Checksum
|
||||
|
||||
pristine_mem = ZMemory(bytes(story_data))
|
||||
current_mem = ZMemory(bytes(story_data))
|
||||
|
||||
# Modify some bytes in dynamic memory (before static start at 0x400)
|
||||
current_mem[0x100] = 0x42
|
||||
current_mem[0x101] = 0x43
|
||||
current_mem[0x200] = 0xFF
|
||||
|
||||
# Create a stack with one frame
|
||||
# return_addr=5 means "store return value in local var 4"
|
||||
stack_manager = ZStackManager(current_mem)
|
||||
routine = ZRoutine(
|
||||
start_addr=0x5000,
|
||||
return_addr=5,
|
||||
zmem=current_mem,
|
||||
args=[],
|
||||
local_vars=[0x0001, 0x0002, 0x0003],
|
||||
stack=[0x1111, 0x2222],
|
||||
)
|
||||
stack_manager.push_routine(routine)
|
||||
|
||||
# Set up mock zmachine
|
||||
zmachine = Mock()
|
||||
zmachine._pristine_mem = pristine_mem
|
||||
zmachine._mem = current_mem
|
||||
zmachine._stackmanager = stack_manager
|
||||
zmachine._cpu = Mock()
|
||||
zmachine._cpu._program_counter = 0x0850
|
||||
zmachine._opdecoder = Mock()
|
||||
|
||||
# SAVE
|
||||
writer = QuetzalWriter(zmachine)
|
||||
save_data = writer.generate_save_data()
|
||||
|
||||
assert save_data[:4] == b"FORM"
|
||||
assert save_data[8:12] == b"IFZS"
|
||||
|
||||
# CORRUPT: Change the zmachine state
|
||||
current_mem[0x100] = 0x99
|
||||
current_mem[0x101] = 0x99
|
||||
current_mem[0x200] = 0x00
|
||||
stack_manager._call_stack.clear()
|
||||
from mudlib.zmachine.zstackmanager import ZStackBottom
|
||||
|
||||
stack_manager._call_stack.append(ZStackBottom())
|
||||
zmachine._cpu._program_counter = 0x9999
|
||||
|
||||
assert current_mem[0x100] == 0x99
|
||||
assert len(stack_manager._call_stack) == 1
|
||||
assert zmachine._cpu._program_counter == 0x9999
|
||||
|
||||
# RESTORE
|
||||
parser = QuetzalParser(zmachine)
|
||||
parser.load_from_bytes(save_data)
|
||||
|
||||
# VERIFY: Memory was restored
|
||||
assert current_mem[0x100] == 0x42
|
||||
assert current_mem[0x101] == 0x43
|
||||
assert current_mem[0x200] == 0xFF
|
||||
|
||||
# VERIFY: Stack was restored
|
||||
restored_stack = zmachine._stackmanager
|
||||
assert len(restored_stack._call_stack) == 2 # Bottom + one frame
|
||||
restored_frame = restored_stack._call_stack[1]
|
||||
# return_addr is the store variable (varnum), not a PC
|
||||
assert restored_frame.return_addr == 5
|
||||
assert restored_frame.local_vars[:3] == [0x0001, 0x0002, 0x0003]
|
||||
assert restored_frame.stack == [0x1111, 0x2222]
|
||||
|
||||
# VERIFY: Program counter was restored
|
||||
assert zmachine._opdecoder.program_counter == 0x0850
|
||||
|
||||
def test_round_trip_with_multiple_frames(self):
|
||||
"""Test save/restore with nested call frames."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
|
||||
|
||||
story_data = bytearray(8192)
|
||||
story_data[0] = 3
|
||||
story_data[0x02:0x04] = [0x10, 0x00]
|
||||
story_data[0x04:0x06] = [0x10, 0x00]
|
||||
story_data[0x06:0x08] = [0x08, 0x00]
|
||||
story_data[0x0E:0x10] = [0x04, 0x00]
|
||||
story_data[0x0C:0x0E] = [0x02, 0x00]
|
||||
story_data[0x12:0x18] = b"860101"
|
||||
story_data[0x1C:0x1E] = [0x00, 0x00]
|
||||
|
||||
pristine_mem = ZMemory(bytes(story_data))
|
||||
current_mem = ZMemory(bytes(story_data))
|
||||
|
||||
# Create nested call frames with proper semantics:
|
||||
# return_addr = store variable (varnum)
|
||||
# program_counter = resume PC for when the next frame returns
|
||||
stack_manager = ZStackManager(current_mem)
|
||||
|
||||
routine1 = ZRoutine(
|
||||
start_addr=0x5000,
|
||||
return_addr=0, # varnum 0 = push to stack
|
||||
zmem=current_mem,
|
||||
args=[],
|
||||
local_vars=[0xAAAA, 0xBBBB],
|
||||
stack=[0x1111],
|
||||
)
|
||||
stack_manager.push_routine(routine1)
|
||||
# resume PC in routine1 after routine2 returns
|
||||
routine1.program_counter = 0x5123
|
||||
|
||||
routine2 = ZRoutine(
|
||||
start_addr=0x6000,
|
||||
return_addr=3, # varnum 3 = local var 2
|
||||
zmem=current_mem,
|
||||
args=[],
|
||||
local_vars=[0xCCCC],
|
||||
stack=[0x2222, 0x3333],
|
||||
)
|
||||
stack_manager.push_routine(routine2)
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._pristine_mem = pristine_mem
|
||||
zmachine._mem = current_mem
|
||||
zmachine._stackmanager = stack_manager
|
||||
zmachine._cpu = Mock()
|
||||
zmachine._cpu._program_counter = 0x0900
|
||||
zmachine._opdecoder = Mock()
|
||||
|
||||
# Save
|
||||
writer = QuetzalWriter(zmachine)
|
||||
save_data = writer.generate_save_data()
|
||||
|
||||
# Clear stack
|
||||
stack_manager._call_stack.clear()
|
||||
from mudlib.zmachine.zstackmanager import ZStackBottom
|
||||
|
||||
stack_manager._call_stack.append(ZStackBottom())
|
||||
|
||||
# Restore
|
||||
parser = QuetzalParser(zmachine)
|
||||
parser.load_from_bytes(save_data)
|
||||
|
||||
# Verify both frames restored
|
||||
restored_stack = zmachine._stackmanager
|
||||
assert len(restored_stack._call_stack) == 3 # Bottom + two frames
|
||||
|
||||
frame1 = restored_stack._call_stack[1]
|
||||
assert frame1.return_addr == 0 # varnum
|
||||
assert frame1.local_vars[:2] == [0xAAAA, 0xBBBB]
|
||||
assert frame1.stack == [0x1111]
|
||||
# frame1 should have the resume PC for after frame2 returns
|
||||
assert frame1.program_counter == 0x5123
|
||||
|
||||
frame2 = restored_stack._call_stack[2]
|
||||
assert frame2.return_addr == 3 # varnum
|
||||
assert frame2.local_vars[:1] == [0xCCCC]
|
||||
assert frame2.stack == [0x2222, 0x3333]
|
||||
|
||||
def test_round_trip_preserves_unchanged_memory(self):
|
||||
"""Test that unchanged memory bytes are preserved correctly."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
from mudlib.zmachine.zstackmanager import ZStackManager
|
||||
|
||||
story_data = bytearray(8192)
|
||||
story_data[0] = 3
|
||||
story_data[0x02:0x04] = [0x10, 0x00]
|
||||
story_data[0x04:0x06] = [0x10, 0x00]
|
||||
story_data[0x06:0x08] = [0x08, 0x00]
|
||||
story_data[0x0E:0x10] = [0x04, 0x00]
|
||||
story_data[0x0C:0x0E] = [0x02, 0x00]
|
||||
story_data[0x12:0x18] = b"860101"
|
||||
story_data[0x1C:0x1E] = [0x00, 0x00]
|
||||
|
||||
for i in range(0x100, 0x200):
|
||||
story_data[i] = i & 0xFF
|
||||
|
||||
pristine_mem = ZMemory(bytes(story_data))
|
||||
current_mem = ZMemory(bytes(story_data))
|
||||
|
||||
current_mem[0x150] = 0xFF
|
||||
current_mem[0x180] = 0xAA
|
||||
|
||||
stack_manager = ZStackManager(current_mem)
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._pristine_mem = pristine_mem
|
||||
zmachine._mem = current_mem
|
||||
zmachine._stackmanager = stack_manager
|
||||
zmachine._cpu = Mock()
|
||||
zmachine._cpu._program_counter = 0x0800
|
||||
zmachine._opdecoder = Mock()
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
save_data = writer.generate_save_data()
|
||||
|
||||
for i in range(0x100, 0x200):
|
||||
current_mem[i] = 0x00
|
||||
|
||||
parser = QuetzalParser(zmachine)
|
||||
parser.load_from_bytes(save_data)
|
||||
|
||||
for i in range(0x100, 0x200):
|
||||
if i == 0x150:
|
||||
assert current_mem[i] == 0xFF
|
||||
elif i == 0x180:
|
||||
assert current_mem[i] == 0xAA
|
||||
else:
|
||||
assert current_mem[i] == (i & 0xFF)
|
||||
|
||||
def test_round_trip_empty_stack(self):
|
||||
"""Test save/restore with no routine frames (just bottom sentinel)."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
|
||||
|
||||
story_data = bytearray(8192)
|
||||
story_data[0] = 3
|
||||
story_data[0x02:0x04] = [0x10, 0x00]
|
||||
story_data[0x04:0x06] = [0x10, 0x00]
|
||||
story_data[0x06:0x08] = [0x08, 0x00]
|
||||
story_data[0x0E:0x10] = [0x04, 0x00]
|
||||
story_data[0x0C:0x0E] = [0x02, 0x00]
|
||||
story_data[0x12:0x18] = b"860101"
|
||||
story_data[0x1C:0x1E] = [0x00, 0x00]
|
||||
|
||||
pristine_mem = ZMemory(bytes(story_data))
|
||||
current_mem = ZMemory(bytes(story_data))
|
||||
current_mem[0x100] = 0x42
|
||||
|
||||
stack_manager = ZStackManager(current_mem)
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._pristine_mem = pristine_mem
|
||||
zmachine._mem = current_mem
|
||||
zmachine._stackmanager = stack_manager
|
||||
zmachine._cpu = Mock()
|
||||
zmachine._cpu._program_counter = 0x0800
|
||||
zmachine._opdecoder = Mock()
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
save_data = writer.generate_save_data()
|
||||
|
||||
# Add a dummy frame to verify it gets cleared
|
||||
dummy = ZRoutine(
|
||||
start_addr=0x5000,
|
||||
return_addr=1,
|
||||
zmem=current_mem,
|
||||
args=[],
|
||||
local_vars=[0x9999],
|
||||
stack=[],
|
||||
)
|
||||
stack_manager.push_routine(dummy)
|
||||
|
||||
parser = QuetzalParser(zmachine)
|
||||
parser.load_from_bytes(save_data)
|
||||
|
||||
restored_stack = zmachine._stackmanager
|
||||
assert len(restored_stack._call_stack) == 1
|
||||
assert current_mem[0x100] == 0x42
|
||||
|
|
@ -1,356 +0,0 @@
|
|||
"""
|
||||
Tests for QuetzalWriter._generate_stks_chunk() serialization.
|
||||
|
||||
The Stks chunk serializes the Z-machine call stack. Each frame has:
|
||||
- Bytes 0-2: return_pc (24-bit big-endian) — caller's resume PC
|
||||
- Byte 3: flags (bits 0-3 = num local vars, bit 4 = discard result)
|
||||
- Byte 4: varnum (which variable gets return value)
|
||||
- Byte 5: argflag (bitmask of supplied arguments)
|
||||
- Bytes 6-7: eval_stack_size (16-bit big-endian)
|
||||
- Next (num_local_vars * 2) bytes: local variables
|
||||
- Next (eval_stack_size * 2) bytes: evaluation stack values
|
||||
|
||||
All multi-byte values are big-endian. Bottom frame has return_pc=0.
|
||||
|
||||
Field mapping to runtime:
|
||||
- return_pc for frame N → stored as program_counter on frame N-1 (the caller)
|
||||
- varnum → stored as return_addr on the frame (the store variable)
|
||||
"""
|
||||
|
||||
from unittest import TestCase
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
|
||||
|
||||
|
||||
class MockMemory:
|
||||
"""Mock memory for testing."""
|
||||
|
||||
def __init__(self):
|
||||
self.version = 5
|
||||
|
||||
|
||||
class MockZMachine:
|
||||
"""Mock z-machine with stack manager."""
|
||||
|
||||
def __init__(self):
|
||||
self._mem = MockMemory()
|
||||
self._stackmanager = ZStackManager(self._mem)
|
||||
|
||||
|
||||
class QuetzalStksTests(TestCase):
|
||||
"""Test suite for Stks chunk generation."""
|
||||
|
||||
def setUp(self):
|
||||
self.zmachine = MockZMachine()
|
||||
self.writer = QuetzalWriter(self.zmachine)
|
||||
|
||||
def test_empty_call_stack_generates_empty_chunk(self):
|
||||
"""With only the sentinel bottom, should generate empty bytes."""
|
||||
# Call stack has only ZStackBottom sentinel
|
||||
chunk = self.writer._generate_stks_chunk()
|
||||
self.assertEqual(chunk, b"")
|
||||
|
||||
def test_single_frame_serialization(self):
|
||||
"""Single routine frame should serialize correctly."""
|
||||
# Set up caller resume PC on ZStackBottom
|
||||
sentinel = self.zmachine._stackmanager._call_stack[0]
|
||||
sentinel.program_counter = 0x4200
|
||||
|
||||
# Create a routine with return_addr = store variable (varnum 5)
|
||||
routine = ZRoutine(
|
||||
start_addr=0x5000,
|
||||
return_addr=5,
|
||||
zmem=self._mem,
|
||||
args=[],
|
||||
local_vars=[0x1234, 0x5678, 0xABCD],
|
||||
stack=[0x1111, 0x2222],
|
||||
)
|
||||
self.zmachine._stackmanager.push_routine(routine)
|
||||
|
||||
chunk = self.writer._generate_stks_chunk()
|
||||
|
||||
# return_pc comes from ZStackBottom.program_counter (0x4200)
|
||||
# varnum is frame.return_addr (5)
|
||||
expected = bytes(
|
||||
[
|
||||
0x00,
|
||||
0x42,
|
||||
0x00, # return_pc (from caller's program_counter)
|
||||
0x03, # flags (3 local vars)
|
||||
0x05, # varnum (store variable)
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x02, # eval_stack_size = 2
|
||||
0x12,
|
||||
0x34, # local_vars[0]
|
||||
0x56,
|
||||
0x78, # local_vars[1]
|
||||
0xAB,
|
||||
0xCD, # local_vars[2]
|
||||
0x11,
|
||||
0x11, # stack[0]
|
||||
0x22,
|
||||
0x22, # stack[1]
|
||||
]
|
||||
)
|
||||
self.assertEqual(chunk, expected)
|
||||
|
||||
def test_multiple_frames_serialization(self):
|
||||
"""Multiple nested frames should serialize in order."""
|
||||
sentinel = self.zmachine._stackmanager._call_stack[0]
|
||||
sentinel.program_counter = 0 # main routine has no caller
|
||||
|
||||
# Frame 1: outer routine (varnum=0 means push result to stack)
|
||||
routine1 = ZRoutine(
|
||||
start_addr=0x1000,
|
||||
return_addr=0,
|
||||
zmem=self._mem,
|
||||
args=[],
|
||||
local_vars=[0x0001],
|
||||
stack=[],
|
||||
)
|
||||
self.zmachine._stackmanager.push_routine(routine1)
|
||||
# Set routine1's resume PC (where to go after frame2 returns)
|
||||
routine1.program_counter = 0x1050
|
||||
|
||||
# Frame 2: inner routine (varnum=3)
|
||||
routine2 = ZRoutine(
|
||||
start_addr=0x2000,
|
||||
return_addr=3,
|
||||
zmem=self._mem,
|
||||
args=[],
|
||||
local_vars=[0x0002, 0x0003],
|
||||
stack=[0xAAAA],
|
||||
)
|
||||
self.zmachine._stackmanager.push_routine(routine2)
|
||||
|
||||
chunk = self.writer._generate_stks_chunk()
|
||||
|
||||
# Frame 1: return_pc from sentinel.pc (0), varnum=0
|
||||
frame1 = bytes(
|
||||
[
|
||||
0x00,
|
||||
0x00,
|
||||
0x00, # return_pc = 0 (from sentinel)
|
||||
0x01, # flags (1 local var)
|
||||
0x00, # varnum = 0 (push to stack)
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x00, # eval_stack_size = 0
|
||||
0x00,
|
||||
0x01, # local_vars[0]
|
||||
]
|
||||
)
|
||||
# Frame 2: return_pc from routine1.pc (0x1050), varnum=3
|
||||
frame2 = bytes(
|
||||
[
|
||||
0x00,
|
||||
0x10,
|
||||
0x50, # return_pc (from routine1.program_counter)
|
||||
0x02, # flags (2 local vars)
|
||||
0x03, # varnum = 3
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x01, # eval_stack_size = 1
|
||||
0x00,
|
||||
0x02, # local_vars[0]
|
||||
0x00,
|
||||
0x03, # local_vars[1]
|
||||
0xAA,
|
||||
0xAA, # stack[0]
|
||||
]
|
||||
)
|
||||
expected = frame1 + frame2
|
||||
self.assertEqual(chunk, expected)
|
||||
|
||||
def test_frame_with_no_locals_or_stack(self):
|
||||
"""Frame with no local vars or stack values should serialize correctly."""
|
||||
sentinel = self.zmachine._stackmanager._call_stack[0]
|
||||
sentinel.program_counter = 0x2500
|
||||
|
||||
routine = ZRoutine(
|
||||
start_addr=0x3000,
|
||||
return_addr=1,
|
||||
zmem=self._mem,
|
||||
args=[],
|
||||
local_vars=[],
|
||||
stack=[],
|
||||
)
|
||||
self.zmachine._stackmanager.push_routine(routine)
|
||||
|
||||
chunk = self.writer._generate_stks_chunk()
|
||||
|
||||
expected = bytes(
|
||||
[
|
||||
0x00,
|
||||
0x25,
|
||||
0x00, # return_pc (from sentinel.program_counter)
|
||||
0x00, # flags (0 local vars)
|
||||
0x01, # varnum = 1
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x00, # eval_stack_size = 0
|
||||
]
|
||||
)
|
||||
self.assertEqual(chunk, expected)
|
||||
|
||||
def test_discard_result_sets_flags_bit4(self):
|
||||
"""Frame with return_addr=None should set bit 4 in flags byte."""
|
||||
sentinel = self.zmachine._stackmanager._call_stack[0]
|
||||
sentinel.program_counter = 0
|
||||
|
||||
routine = ZRoutine(
|
||||
start_addr=0x1000,
|
||||
return_addr=None,
|
||||
zmem=self._mem,
|
||||
args=[],
|
||||
local_vars=[0x0001, 0x0002],
|
||||
stack=[],
|
||||
)
|
||||
self.zmachine._stackmanager.push_routine(routine)
|
||||
|
||||
chunk = self.writer._generate_stks_chunk()
|
||||
|
||||
# flags = 0x02 (2 locals) | 0x10 (discard) = 0x12
|
||||
expected = bytes(
|
||||
[
|
||||
0x00,
|
||||
0x00,
|
||||
0x00, # return_pc
|
||||
0x12, # flags (2 locals + discard bit)
|
||||
0x00, # varnum (0 when discarding)
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x00, # eval_stack_size = 0
|
||||
0x00,
|
||||
0x01, # local_vars[0]
|
||||
0x00,
|
||||
0x02, # local_vars[1]
|
||||
]
|
||||
)
|
||||
self.assertEqual(chunk, expected)
|
||||
|
||||
def test_round_trip_with_parser(self):
|
||||
"""Generated stks bytes should parse back identically."""
|
||||
from mudlib.zmachine.quetzal import QuetzalParser
|
||||
|
||||
sentinel = self.zmachine._stackmanager._call_stack[0]
|
||||
sentinel.program_counter = 0 # main routine caller
|
||||
|
||||
# Create a complex stack state
|
||||
routine1 = ZRoutine(
|
||||
start_addr=0x1000,
|
||||
return_addr=0,
|
||||
zmem=self._mem,
|
||||
args=[],
|
||||
local_vars=[0x1111, 0x2222, 0x3333],
|
||||
stack=[0xAAAA, 0xBBBB],
|
||||
)
|
||||
self.zmachine._stackmanager.push_routine(routine1)
|
||||
routine1.program_counter = 0x1234 # resume PC for after routine2
|
||||
|
||||
routine2 = ZRoutine(
|
||||
start_addr=0x2000,
|
||||
return_addr=5,
|
||||
zmem=self._mem,
|
||||
args=[],
|
||||
local_vars=[0x4444],
|
||||
stack=[0xCCCC, 0xDDDD, 0xEEEE],
|
||||
)
|
||||
self.zmachine._stackmanager.push_routine(routine2)
|
||||
|
||||
# Generate the stks chunk
|
||||
stks_bytes = self.writer._generate_stks_chunk()
|
||||
|
||||
# Parse it back
|
||||
parser = QuetzalParser(self.zmachine)
|
||||
parser._parse_stks(stks_bytes)
|
||||
|
||||
# Verify the stack was reconstructed correctly
|
||||
# Parser creates a new stack manager, skip bottom sentinel
|
||||
call_stack = self.zmachine._stackmanager._call_stack
|
||||
frames = call_stack[1:]
|
||||
self.assertEqual(len(frames), 2)
|
||||
|
||||
# Check frame 1: return_addr = varnum, caller PC on sentinel
|
||||
assert isinstance(frames[0], ZRoutine)
|
||||
self.assertEqual(frames[0].return_addr, 0)
|
||||
self.assertEqual(frames[0].local_vars[:3], [0x1111, 0x2222, 0x3333])
|
||||
self.assertEqual(frames[0].stack, [0xAAAA, 0xBBBB])
|
||||
# Sentinel should have frame1's return_pc (0)
|
||||
self.assertEqual(call_stack[0].program_counter, 0)
|
||||
|
||||
# Check frame 2: return_addr = varnum, caller PC on frame1
|
||||
assert isinstance(frames[1], ZRoutine)
|
||||
self.assertEqual(frames[1].return_addr, 5)
|
||||
self.assertEqual(frames[1].local_vars[:1], [0x4444])
|
||||
self.assertEqual(frames[1].stack, [0xCCCC, 0xDDDD, 0xEEEE])
|
||||
# Frame1 should have frame2's return_pc (0x1234)
|
||||
self.assertEqual(frames[0].program_counter, 0x1234)
|
||||
|
||||
def test_parse_return_pc_goes_to_caller(self):
|
||||
"""Parser should put return_pc on the caller frame's program_counter."""
|
||||
from mudlib.zmachine.quetzal import QuetzalParser
|
||||
|
||||
# Construct a minimal stack frame with return_pc=0x123456
|
||||
stks_bytes = bytes(
|
||||
[
|
||||
0x12,
|
||||
0x34,
|
||||
0x56, # return_pc = 0x123456
|
||||
0x00, # flags (0 local vars)
|
||||
0x07, # varnum = 7
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x00, # eval_stack_size = 0
|
||||
]
|
||||
)
|
||||
|
||||
parser = QuetzalParser(self.zmachine)
|
||||
parser._parse_stks(stks_bytes)
|
||||
|
||||
call_stack = self.zmachine._stackmanager._call_stack
|
||||
frames = call_stack[1:] # skip sentinel
|
||||
self.assertEqual(len(frames), 1)
|
||||
|
||||
# return_pc goes to caller (sentinel) program_counter
|
||||
self.assertEqual(call_stack[0].program_counter, 0x123456)
|
||||
# varnum goes to frame's return_addr
|
||||
assert isinstance(frames[0], ZRoutine)
|
||||
self.assertEqual(frames[0].return_addr, 7)
|
||||
|
||||
def test_parse_discard_bit_restores_none(self):
|
||||
"""Parser should set return_addr=None when flags bit 4 is set."""
|
||||
from mudlib.zmachine.quetzal import QuetzalParser
|
||||
|
||||
# flags = 0x12 = 2 locals + discard bit
|
||||
stks_bytes = bytes(
|
||||
[
|
||||
0x00,
|
||||
0x00,
|
||||
0x00, # return_pc = 0
|
||||
0x12, # flags (2 locals + discard)
|
||||
0x00, # varnum (ignored when discarding)
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x00, # eval_stack_size = 0
|
||||
0x00,
|
||||
0x01, # local_vars[0]
|
||||
0x00,
|
||||
0x02, # local_vars[1]
|
||||
]
|
||||
)
|
||||
|
||||
parser = QuetzalParser(self.zmachine)
|
||||
parser._parse_stks(stks_bytes)
|
||||
|
||||
frames = self.zmachine._stackmanager._call_stack[1:]
|
||||
self.assertEqual(len(frames), 1)
|
||||
self.assertIsNone(frames[0].return_addr)
|
||||
self.assertEqual(frames[0].local_vars[:2], [1, 2])
|
||||
|
||||
@property
|
||||
def _mem(self):
|
||||
"""Helper to get mock memory."""
|
||||
return self.zmachine._mem
|
||||
|
|
@ -1,406 +0,0 @@
|
|||
"""Tests for QuetzalWriter Stks chunk generation.
|
||||
|
||||
Field mapping:
|
||||
- Quetzal return_pc → previous frame's program_counter (caller resume PC)
|
||||
- Quetzal varnum → frame.return_addr (store variable for return value)
|
||||
"""
|
||||
|
||||
|
||||
class TestStksChunkGeneration:
|
||||
"""Test Stks chunk generation and serialization."""
|
||||
|
||||
def test_empty_stack_serialization(self):
|
||||
"""Test serializing an empty stack (just the bottom sentinel)."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zstackmanager import ZStackManager
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._mem = Mock()
|
||||
zmachine._mem.version = 5
|
||||
stack_manager = ZStackManager(zmachine._mem)
|
||||
zmachine._stackmanager = stack_manager
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
result = writer._generate_stks_chunk()
|
||||
|
||||
assert result == b""
|
||||
|
||||
def test_single_frame_no_locals_no_stack(self):
|
||||
"""Test serializing a single routine frame with no locals or stack values."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._mem = Mock()
|
||||
zmachine._mem.version = 5
|
||||
stack_manager = ZStackManager(zmachine._mem)
|
||||
|
||||
# Set caller resume PC on sentinel
|
||||
stack_manager._call_stack[0].program_counter = 0x1234
|
||||
|
||||
routine = ZRoutine(
|
||||
start_addr=0x5000,
|
||||
return_addr=7, # varnum: store to local var 6
|
||||
zmem=zmachine._mem,
|
||||
args=[],
|
||||
local_vars=[],
|
||||
stack=[],
|
||||
)
|
||||
stack_manager.push_routine(routine)
|
||||
zmachine._stackmanager = stack_manager
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
result = writer._generate_stks_chunk()
|
||||
|
||||
expected = bytes(
|
||||
[
|
||||
0x00,
|
||||
0x12,
|
||||
0x34, # return_pc (from sentinel.program_counter)
|
||||
0x00, # flags (0 locals)
|
||||
0x07, # varnum (frame.return_addr)
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x00, # eval_stack_size = 0
|
||||
]
|
||||
)
|
||||
|
||||
assert result == expected
|
||||
|
||||
def test_single_frame_with_locals(self):
|
||||
"""Test serializing a frame with local variables."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._mem = Mock()
|
||||
zmachine._mem.version = 5
|
||||
stack_manager = ZStackManager(zmachine._mem)
|
||||
stack_manager._call_stack[0].program_counter = 0x2000
|
||||
|
||||
routine = ZRoutine(
|
||||
start_addr=0x5000,
|
||||
return_addr=0x10, # varnum: store to global var 0x10
|
||||
zmem=zmachine._mem,
|
||||
args=[],
|
||||
local_vars=[0x1111, 0x2222, 0x3333],
|
||||
stack=[],
|
||||
)
|
||||
stack_manager.push_routine(routine)
|
||||
zmachine._stackmanager = stack_manager
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
result = writer._generate_stks_chunk()
|
||||
|
||||
expected = bytes(
|
||||
[
|
||||
0x00,
|
||||
0x20,
|
||||
0x00, # return_pc (from sentinel.program_counter)
|
||||
0x03, # flags (3 locals)
|
||||
0x10, # varnum (frame.return_addr)
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x00, # eval_stack_size = 0
|
||||
0x11,
|
||||
0x11, # local_var[0]
|
||||
0x22,
|
||||
0x22, # local_var[1]
|
||||
0x33,
|
||||
0x33, # local_var[2]
|
||||
]
|
||||
)
|
||||
|
||||
assert result == expected
|
||||
|
||||
def test_single_frame_with_stack_values(self):
|
||||
"""Test serializing a frame with evaluation stack values."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._mem = Mock()
|
||||
zmachine._mem.version = 5
|
||||
stack_manager = ZStackManager(zmachine._mem)
|
||||
stack_manager._call_stack[0].program_counter = 0x3000
|
||||
|
||||
routine = ZRoutine(
|
||||
start_addr=0x5000,
|
||||
return_addr=0, # varnum 0: push result to eval stack
|
||||
zmem=zmachine._mem,
|
||||
args=[],
|
||||
local_vars=[],
|
||||
stack=[0xABCD, 0xEF01],
|
||||
)
|
||||
stack_manager.push_routine(routine)
|
||||
zmachine._stackmanager = stack_manager
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
result = writer._generate_stks_chunk()
|
||||
|
||||
expected = bytes(
|
||||
[
|
||||
0x00,
|
||||
0x30,
|
||||
0x00, # return_pc
|
||||
0x00, # flags (0 locals)
|
||||
0x00, # varnum (push to stack)
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x02, # eval_stack_size = 2
|
||||
0xAB,
|
||||
0xCD, # stack[0]
|
||||
0xEF,
|
||||
0x01, # stack[1]
|
||||
]
|
||||
)
|
||||
|
||||
assert result == expected
|
||||
|
||||
def test_single_frame_full(self):
|
||||
"""Test serializing a frame with both locals and stack values."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._mem = Mock()
|
||||
zmachine._mem.version = 5
|
||||
stack_manager = ZStackManager(zmachine._mem)
|
||||
stack_manager._call_stack[0].program_counter = 0x4567
|
||||
|
||||
routine = ZRoutine(
|
||||
start_addr=0x5000,
|
||||
return_addr=2, # varnum: store to local var 1
|
||||
zmem=zmachine._mem,
|
||||
args=[],
|
||||
local_vars=[0x0001, 0x0002],
|
||||
stack=[0x1000, 0x2000, 0x3000],
|
||||
)
|
||||
stack_manager.push_routine(routine)
|
||||
zmachine._stackmanager = stack_manager
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
result = writer._generate_stks_chunk()
|
||||
|
||||
expected = bytes(
|
||||
[
|
||||
0x00,
|
||||
0x45,
|
||||
0x67, # return_pc (from sentinel.program_counter)
|
||||
0x02, # flags (2 locals)
|
||||
0x02, # varnum
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x03, # eval_stack_size = 3
|
||||
0x00,
|
||||
0x01, # local_var[0]
|
||||
0x00,
|
||||
0x02, # local_var[1]
|
||||
0x10,
|
||||
0x00, # stack[0]
|
||||
0x20,
|
||||
0x00, # stack[1]
|
||||
0x30,
|
||||
0x00, # stack[2]
|
||||
]
|
||||
)
|
||||
|
||||
assert result == expected
|
||||
|
||||
def test_multiple_nested_frames(self):
|
||||
"""Test serializing multiple nested routine frames."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._mem = Mock()
|
||||
zmachine._mem.version = 5
|
||||
stack_manager = ZStackManager(zmachine._mem)
|
||||
|
||||
# Sentinel has return PC for frame 1
|
||||
stack_manager._call_stack[0].program_counter = 0x1000
|
||||
|
||||
routine1 = ZRoutine(
|
||||
start_addr=0x5000,
|
||||
return_addr=0, # varnum: push to stack
|
||||
zmem=zmachine._mem,
|
||||
args=[],
|
||||
local_vars=[0xAAAA],
|
||||
stack=[0xBBBB],
|
||||
)
|
||||
stack_manager.push_routine(routine1)
|
||||
# Frame1 has return PC for frame 2
|
||||
routine1.program_counter = 0x2000
|
||||
|
||||
routine2 = ZRoutine(
|
||||
start_addr=0x6000,
|
||||
return_addr=5, # varnum: store to local var 4
|
||||
zmem=zmachine._mem,
|
||||
args=[],
|
||||
local_vars=[0xCCCC],
|
||||
stack=[0xDDDD],
|
||||
)
|
||||
stack_manager.push_routine(routine2)
|
||||
|
||||
zmachine._stackmanager = stack_manager
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
result = writer._generate_stks_chunk()
|
||||
|
||||
expected = bytes(
|
||||
[
|
||||
# Frame 1: return_pc from sentinel (0x1000), varnum=0
|
||||
0x00,
|
||||
0x10,
|
||||
0x00, # return_pc
|
||||
0x01, # flags (1 local)
|
||||
0x00, # varnum
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x01, # eval_stack_size = 1
|
||||
0xAA,
|
||||
0xAA, # local_var[0]
|
||||
0xBB,
|
||||
0xBB, # stack[0]
|
||||
# Frame 2: return_pc from routine1 (0x2000), varnum=5
|
||||
0x00,
|
||||
0x20,
|
||||
0x00, # return_pc
|
||||
0x01, # flags (1 local)
|
||||
0x05, # varnum
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x01, # eval_stack_size = 1
|
||||
0xCC,
|
||||
0xCC, # local_var[0]
|
||||
0xDD,
|
||||
0xDD, # stack[0]
|
||||
]
|
||||
)
|
||||
|
||||
assert result == expected
|
||||
|
||||
def test_bottom_frame_zero_return_pc(self):
|
||||
"""Test that a bottom/dummy frame with return_pc=0 is handled correctly."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._mem = Mock()
|
||||
zmachine._mem.version = 5
|
||||
stack_manager = ZStackManager(zmachine._mem)
|
||||
# Sentinel PC = 0 (main routine has no caller)
|
||||
stack_manager._call_stack[0].program_counter = 0
|
||||
|
||||
routine = ZRoutine(
|
||||
start_addr=0x5000,
|
||||
return_addr=0, # varnum: push to stack
|
||||
zmem=zmachine._mem,
|
||||
args=[],
|
||||
local_vars=[0x1234],
|
||||
stack=[],
|
||||
)
|
||||
stack_manager.push_routine(routine)
|
||||
zmachine._stackmanager = stack_manager
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
result = writer._generate_stks_chunk()
|
||||
|
||||
expected = bytes(
|
||||
[
|
||||
0x00,
|
||||
0x00,
|
||||
0x00, # return_pc = 0
|
||||
0x01, # flags (1 local)
|
||||
0x00, # varnum
|
||||
0x00, # argflag
|
||||
0x00,
|
||||
0x00, # eval_stack_size = 0
|
||||
0x12,
|
||||
0x34, # local_var[0]
|
||||
]
|
||||
)
|
||||
|
||||
assert result == expected
|
||||
|
||||
|
||||
class TestStksRoundTrip:
|
||||
"""Test that Stks serialization/deserialization is symmetrical."""
|
||||
|
||||
def test_round_trip_serialization(self):
|
||||
"""Test that we can serialize and deserialize frames correctly."""
|
||||
from unittest.mock import Mock
|
||||
|
||||
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
|
||||
from mudlib.zmachine.zstackmanager import ZRoutine, ZStackManager
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._mem = Mock()
|
||||
zmachine._mem.version = 5
|
||||
|
||||
original_stack = ZStackManager(zmachine._mem)
|
||||
# Sentinel has return PC for frame 1
|
||||
original_stack._call_stack[0].program_counter = 0
|
||||
|
||||
routine1 = ZRoutine(
|
||||
start_addr=0x5000,
|
||||
return_addr=5, # varnum: store to local var 4
|
||||
zmem=zmachine._mem,
|
||||
args=[],
|
||||
local_vars=[0x0001, 0x0002, 0x0003],
|
||||
stack=[0x1111, 0x2222],
|
||||
)
|
||||
original_stack.push_routine(routine1)
|
||||
routine1.program_counter = 0x5678 # resume PC for after routine2
|
||||
|
||||
routine2 = ZRoutine(
|
||||
start_addr=0x6000,
|
||||
return_addr=3, # varnum: store to local var 2
|
||||
zmem=zmachine._mem,
|
||||
args=[],
|
||||
local_vars=[0xAAAA],
|
||||
stack=[0xBBBB, 0xCCCC, 0xDDDD],
|
||||
)
|
||||
original_stack.push_routine(routine2)
|
||||
|
||||
zmachine._stackmanager = original_stack
|
||||
|
||||
# Serialize
|
||||
writer = QuetzalWriter(zmachine)
|
||||
stks_data = writer._generate_stks_chunk()
|
||||
|
||||
# Deserialize
|
||||
parser = QuetzalParser(zmachine)
|
||||
parser._parse_stks(stks_data)
|
||||
|
||||
restored_stack = zmachine._stackmanager
|
||||
|
||||
assert len(restored_stack._call_stack) == 3
|
||||
|
||||
# Check frame 1: return_addr is varnum
|
||||
frame1 = restored_stack._call_stack[1]
|
||||
assert frame1.return_addr == 5
|
||||
assert frame1.local_vars[:3] == [0x0001, 0x0002, 0x0003]
|
||||
assert frame1.stack == [0x1111, 0x2222]
|
||||
# Frame1's program_counter was set from frame2's return_pc
|
||||
assert frame1.program_counter == 0x5678
|
||||
|
||||
# Check frame 2
|
||||
frame2 = restored_stack._call_stack[2]
|
||||
assert frame2.return_addr == 3
|
||||
assert frame2.local_vars[:1] == [0xAAAA]
|
||||
assert frame2.stack == [0xBBBB, 0xCCCC, 0xDDDD]
|
||||
|
|
@ -50,10 +50,6 @@ class MockStackManager:
|
|||
def __init__(self):
|
||||
self.stack = []
|
||||
self.locals = [0] * 15
|
||||
# For QuetzalWriter support - empty call stack
|
||||
from mudlib.zmachine.zstackmanager import ZStackBottom
|
||||
|
||||
self._call_stack = [ZStackBottom()]
|
||||
|
||||
def push_stack(self, value):
|
||||
self.stack.append(value)
|
||||
|
|
@ -96,7 +92,6 @@ class MockUI:
|
|||
self.screen.write = Mock()
|
||||
self.keyboard_input = Mock()
|
||||
self.keyboard_input.read_line = Mock()
|
||||
self.filesystem = Mock()
|
||||
|
||||
|
||||
class ZMachineOpcodeTests(TestCase):
|
||||
|
|
@ -119,7 +114,6 @@ class ZMachineOpcodeTests(TestCase):
|
|||
Mock(), # stream manager
|
||||
self.ui,
|
||||
Mock(), # lexer
|
||||
zmachine=None,
|
||||
)
|
||||
|
||||
def test_op_nop(self):
|
||||
|
|
@ -457,7 +451,6 @@ class ZMachineObjectOpcodeTests(TestCase):
|
|||
Mock(), # stream manager
|
||||
self.ui,
|
||||
Mock(), # lexer
|
||||
zmachine=None,
|
||||
)
|
||||
|
||||
def test_op_get_sibling_with_sibling(self):
|
||||
|
|
@ -668,7 +661,6 @@ class ZMachineComplexOpcodeTests(TestCase):
|
|||
Mock(), # stream manager
|
||||
self.ui,
|
||||
Mock(), # lexer
|
||||
zmachine=None,
|
||||
)
|
||||
|
||||
def test_op_sread_v3_basic_input(self):
|
||||
|
|
@ -754,27 +746,8 @@ class ZMachineComplexOpcodeTests(TestCase):
|
|||
# Should have called show_status once
|
||||
self.assertEqual(call_count[0], 1)
|
||||
|
||||
def test_op_save_v3_branches_false_when_filesystem_fails(self):
|
||||
"""Test save (V3) branches false when filesystem returns False."""
|
||||
# Need a valid zmachine for the test to proceed
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
story = bytearray(1024)
|
||||
story[0] = 3
|
||||
story[0x0E] = 0x04
|
||||
story[0x0F] = 0x00
|
||||
|
||||
zmachine_mock = Mock()
|
||||
zmachine_mock._mem = ZMemory(bytes(story))
|
||||
zmachine_mock._pristine_mem = ZMemory(bytes(story))
|
||||
zmachine_mock._cpu = self.cpu
|
||||
zmachine_mock._stackmanager = self.stack
|
||||
|
||||
self.cpu._zmachine = zmachine_mock
|
||||
|
||||
# Mock filesystem to fail
|
||||
self.ui.filesystem.save_game = Mock(return_value=False)
|
||||
|
||||
def test_op_save_v3_branches_false(self):
|
||||
"""Test save (V3) branches false (QuetzalWriter not functional)."""
|
||||
self.decoder.branch_condition = True
|
||||
self.decoder.branch_offset = 100
|
||||
old_pc = self.cpu._opdecoder.program_counter
|
||||
|
|
@ -784,26 +757,8 @@ class ZMachineComplexOpcodeTests(TestCase):
|
|||
# Should not have branched (test is false)
|
||||
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
|
||||
|
||||
def test_op_restore_v3_branches_false_when_filesystem_returns_none(self):
|
||||
"""Test restore (V3) branches false when filesystem returns None."""
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
story = bytearray(1024)
|
||||
story[0] = 3
|
||||
story[0x0E] = 0x04
|
||||
story[0x0F] = 0x00
|
||||
|
||||
zmachine_mock = Mock()
|
||||
zmachine_mock._mem = ZMemory(bytes(story))
|
||||
zmachine_mock._pristine_mem = ZMemory(bytes(story))
|
||||
zmachine_mock._cpu = self.cpu
|
||||
zmachine_mock._stackmanager = self.stack
|
||||
|
||||
self.cpu._zmachine = zmachine_mock
|
||||
|
||||
# Mock filesystem to return None (user cancelled or no file)
|
||||
self.ui.filesystem.restore_game = Mock(return_value=None)
|
||||
|
||||
def test_op_restore_v3_branches_false(self):
|
||||
"""Test restore (V3) branches false (no valid save files)."""
|
||||
self.decoder.branch_condition = True
|
||||
self.decoder.branch_offset = 100
|
||||
old_pc = self.cpu._opdecoder.program_counter
|
||||
|
|
@ -813,256 +768,6 @@ class ZMachineComplexOpcodeTests(TestCase):
|
|||
# Should not have branched (test is false)
|
||||
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
|
||||
|
||||
def test_op_restore_v3_branches_false_when_no_zmachine(self):
|
||||
"""Test restore (V3) branches false when zmachine is not set."""
|
||||
self.cpu._zmachine = None
|
||||
|
||||
self.decoder.branch_condition = True
|
||||
self.decoder.branch_offset = 100
|
||||
old_pc = self.cpu._opdecoder.program_counter
|
||||
|
||||
self.cpu.op_restore()
|
||||
|
||||
# Should not have branched (test is false)
|
||||
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
|
||||
|
||||
def test_op_restore_v3_branches_true_on_success(self):
|
||||
"""Test restore (V3) branches true when restore succeeds."""
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
# Create a story with some dynamic memory
|
||||
story = bytearray(1024)
|
||||
story[0] = 3 # version 3
|
||||
story[0x0E] = 0x04 # static memory starts at 0x0400
|
||||
story[0x0F] = 0x00
|
||||
# Set header values
|
||||
story[0x02] = 0x12 # release high byte
|
||||
story[0x03] = 0x34 # release low byte
|
||||
for i, byte in enumerate(b"860509"):
|
||||
story[0x12 + i] = byte
|
||||
story[0x1C] = 0xAB # checksum high
|
||||
story[0x1D] = 0xCD # checksum low
|
||||
|
||||
# Create zmachine with modified memory state
|
||||
zmachine_mock = Mock()
|
||||
zmachine_mock._mem = ZMemory(bytes(story))
|
||||
zmachine_mock._pristine_mem = ZMemory(bytes(story))
|
||||
zmachine_mock._cpu = self.cpu
|
||||
zmachine_mock._stackmanager = self.stack
|
||||
|
||||
# Modify some dynamic memory to create a save state
|
||||
zmachine_mock._mem[0x100] = 0x42
|
||||
zmachine_mock._mem[0x200] = 0x99
|
||||
|
||||
self.cpu._zmachine = zmachine_mock
|
||||
|
||||
# Generate save data using QuetzalWriter
|
||||
writer = QuetzalWriter(zmachine_mock)
|
||||
save_data = writer.generate_save_data()
|
||||
|
||||
# Mock filesystem to return the save data
|
||||
self.ui.filesystem.restore_game = Mock(return_value=save_data)
|
||||
|
||||
self.decoder.branch_condition = True
|
||||
self.decoder.branch_offset = 10
|
||||
old_pc = self.cpu._opdecoder.program_counter
|
||||
|
||||
self.cpu.op_restore()
|
||||
|
||||
# Should have branched (test is true)
|
||||
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc + 8)
|
||||
|
||||
def test_op_restore_v3_restores_memory_state(self):
|
||||
"""Test restore (V3) correctly restores dynamic memory."""
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
# Create a story
|
||||
story = bytearray(1024)
|
||||
story[0] = 3
|
||||
story[0x0E] = 0x04
|
||||
story[0x0F] = 0x00
|
||||
story[0x02] = 0x12
|
||||
story[0x03] = 0x34
|
||||
for i, byte in enumerate(b"860509"):
|
||||
story[0x12 + i] = byte
|
||||
story[0x1C] = 0xAB
|
||||
story[0x1D] = 0xCD
|
||||
|
||||
# Create zmachine with saved state
|
||||
saved_zmachine = Mock()
|
||||
saved_zmachine._mem = ZMemory(bytes(story))
|
||||
saved_zmachine._pristine_mem = ZMemory(bytes(story))
|
||||
saved_zmachine._cpu = self.cpu
|
||||
saved_zmachine._stackmanager = self.stack
|
||||
|
||||
# Modify memory to create unique save state
|
||||
saved_zmachine._mem[0x50] = 0xAA
|
||||
saved_zmachine._mem[0x150] = 0xBB
|
||||
saved_zmachine._mem[0x250] = 0xCC
|
||||
|
||||
# Generate save data
|
||||
writer = QuetzalWriter(saved_zmachine)
|
||||
save_data = writer.generate_save_data()
|
||||
|
||||
# Create fresh zmachine with different state
|
||||
current_zmachine = Mock()
|
||||
current_zmachine._mem = ZMemory(bytes(story))
|
||||
current_zmachine._pristine_mem = ZMemory(bytes(story))
|
||||
current_zmachine._cpu = self.cpu
|
||||
current_zmachine._stackmanager = self.stack
|
||||
|
||||
# Different values in memory
|
||||
current_zmachine._mem[0x50] = 0x11
|
||||
current_zmachine._mem[0x150] = 0x22
|
||||
current_zmachine._mem[0x250] = 0x33
|
||||
|
||||
self.cpu._zmachine = current_zmachine
|
||||
|
||||
# Mock filesystem to return save data
|
||||
self.ui.filesystem.restore_game = Mock(return_value=save_data)
|
||||
|
||||
self.decoder.branch_condition = True
|
||||
self.decoder.branch_offset = 10
|
||||
|
||||
self.cpu.op_restore()
|
||||
|
||||
# Memory should now match saved state
|
||||
self.assertEqual(current_zmachine._mem[0x50], 0xAA)
|
||||
self.assertEqual(current_zmachine._mem[0x150], 0xBB)
|
||||
self.assertEqual(current_zmachine._mem[0x250], 0xCC)
|
||||
|
||||
def test_op_restore_v3_branches_false_on_malformed_data(self):
|
||||
"""Test restore (V3) branches false when save data is malformed."""
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
story = bytearray(1024)
|
||||
story[0] = 3
|
||||
story[0x0E] = 0x04
|
||||
story[0x0F] = 0x00
|
||||
|
||||
zmachine_mock = Mock()
|
||||
zmachine_mock._mem = ZMemory(bytes(story))
|
||||
zmachine_mock._pristine_mem = ZMemory(bytes(story))
|
||||
zmachine_mock._cpu = self.cpu
|
||||
zmachine_mock._stackmanager = self.stack
|
||||
|
||||
self.cpu._zmachine = zmachine_mock
|
||||
|
||||
# Mock filesystem to return invalid data
|
||||
self.ui.filesystem.restore_game = Mock(return_value=b"invalid data")
|
||||
|
||||
self.decoder.branch_condition = True
|
||||
self.decoder.branch_offset = 10
|
||||
old_pc = self.cpu._opdecoder.program_counter
|
||||
|
||||
self.cpu.op_restore()
|
||||
|
||||
# Should not have branched (test is false)
|
||||
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
|
||||
|
||||
def test_op_save_v3_branches_true_on_success(self):
|
||||
"""Test save (V3) branches true when filesystem succeeds."""
|
||||
# Create minimal zmachine mock
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
story = bytearray(1024)
|
||||
story[0] = 3 # version 3
|
||||
story[0x0E] = 0x04 # static memory starts at 0x0400
|
||||
story[0x0F] = 0x00
|
||||
|
||||
zmachine_mock = Mock()
|
||||
zmachine_mock._mem = ZMemory(bytes(story))
|
||||
zmachine_mock._pristine_mem = ZMemory(bytes(story))
|
||||
zmachine_mock._cpu = self.cpu
|
||||
zmachine_mock._stackmanager = self.stack
|
||||
|
||||
# Attach zmachine to cpu
|
||||
self.cpu._zmachine = zmachine_mock
|
||||
|
||||
# Mock filesystem to succeed
|
||||
self.ui.filesystem.save_game = Mock(return_value=True)
|
||||
|
||||
self.decoder.branch_condition = True
|
||||
self.decoder.branch_offset = 10
|
||||
old_pc = self.cpu._opdecoder.program_counter
|
||||
|
||||
self.cpu.op_save()
|
||||
|
||||
# Should have branched (test is true)
|
||||
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc + 8)
|
||||
# Filesystem should have been called with bytes
|
||||
self.assertTrue(self.ui.filesystem.save_game.called)
|
||||
call_args = self.ui.filesystem.save_game.call_args[0]
|
||||
self.assertIsInstance(call_args[0], bytes)
|
||||
|
||||
def test_op_save_v3_generates_valid_iff_data(self):
|
||||
"""Test save generates valid IFF/FORM/IFZS container."""
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
story = bytearray(1024)
|
||||
story[0] = 3
|
||||
story[0x0E] = 0x04
|
||||
story[0x0F] = 0x00
|
||||
# Set header values
|
||||
story[0x02] = 0x12 # release high byte
|
||||
story[0x03] = 0x34 # release low byte
|
||||
for i, byte in enumerate(b"860509"):
|
||||
story[0x12 + i] = byte
|
||||
story[0x1C] = 0xAB # checksum high
|
||||
story[0x1D] = 0xCD # checksum low
|
||||
|
||||
zmachine_mock = Mock()
|
||||
zmachine_mock._mem = ZMemory(bytes(story))
|
||||
zmachine_mock._pristine_mem = ZMemory(bytes(story))
|
||||
zmachine_mock._cpu = self.cpu
|
||||
zmachine_mock._stackmanager = self.stack
|
||||
|
||||
self.cpu._zmachine = zmachine_mock
|
||||
|
||||
# Mock filesystem to capture data
|
||||
captured_data = []
|
||||
|
||||
def capture_save(data):
|
||||
captured_data.append(data)
|
||||
return True
|
||||
|
||||
self.ui.filesystem.save_game = Mock(side_effect=capture_save)
|
||||
|
||||
self.decoder.branch_condition = True
|
||||
self.decoder.branch_offset = 10
|
||||
|
||||
self.cpu.op_save()
|
||||
|
||||
# Verify we got data
|
||||
self.assertEqual(len(captured_data), 1)
|
||||
data = captured_data[0]
|
||||
|
||||
# Verify IFF structure
|
||||
self.assertEqual(data[0:4], b"FORM") # FORM header
|
||||
# Bytes 4-7 are the size (big-endian 32-bit)
|
||||
self.assertEqual(data[8:12], b"IFZS") # IFZS type
|
||||
|
||||
# Verify chunks are present
|
||||
self.assertIn(b"IFhd", data)
|
||||
self.assertIn(b"CMem", data)
|
||||
self.assertIn(b"Stks", data)
|
||||
self.assertIn(b"ANNO", data)
|
||||
|
||||
def test_op_save_v3_without_zmachine_branches_false(self):
|
||||
"""Test save fails gracefully when zmachine is not set."""
|
||||
# Don't set zmachine
|
||||
self.cpu._zmachine = None
|
||||
|
||||
self.decoder.branch_condition = True
|
||||
old_pc = self.cpu._opdecoder.program_counter
|
||||
|
||||
self.cpu.op_save()
|
||||
|
||||
# Should not have branched
|
||||
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
|
||||
|
||||
def test_op_input_stream_is_noop(self):
|
||||
"""Test input_stream is a no-op stub."""
|
||||
# Should not raise
|
||||
|
|
@ -1079,204 +784,6 @@ class ZMachineComplexOpcodeTests(TestCase):
|
|||
self.cpu.op_restart()
|
||||
|
||||
|
||||
class QuetzalWriterTests(TestCase):
|
||||
"""Test suite for QuetzalWriter save functionality."""
|
||||
|
||||
def test_generate_ifhd_chunk(self):
|
||||
"""Test _generate_ifhd_chunk() produces correct 13-byte IFhd chunk."""
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
|
||||
# Create a mock zmachine with known header values
|
||||
mock_zmachine = Mock()
|
||||
mock_zmachine._mem = MockMemory()
|
||||
|
||||
# Set header values in memory:
|
||||
# Bytes 2-3: Release number (0x1234)
|
||||
mock_zmachine._mem.write_word(0x02, 0x1234)
|
||||
# Bytes 0x12-0x17: Serial number (6 bytes: "860509")
|
||||
serial = b"860509"
|
||||
for i, byte in enumerate(serial):
|
||||
mock_zmachine._mem[0x12 + i] = byte
|
||||
# Bytes 0x1C-0x1D: Checksum (0xABCD)
|
||||
mock_zmachine._mem.write_word(0x1C, 0xABCD)
|
||||
|
||||
# Set program counter
|
||||
mock_cpu = Mock()
|
||||
mock_cpu._program_counter = 0x123456 # 24-bit PC
|
||||
mock_zmachine._cpu = mock_cpu
|
||||
|
||||
# Create writer and generate chunk
|
||||
writer = QuetzalWriter(mock_zmachine)
|
||||
chunk_data = writer._generate_ifhd_chunk()
|
||||
|
||||
# Verify chunk is exactly 13 bytes
|
||||
self.assertEqual(len(chunk_data), 13)
|
||||
|
||||
# Verify release number (bytes 0-1)
|
||||
self.assertEqual(chunk_data[0], 0x12)
|
||||
self.assertEqual(chunk_data[1], 0x34)
|
||||
|
||||
# Verify serial number (bytes 2-7)
|
||||
for i, expected in enumerate(serial):
|
||||
self.assertEqual(chunk_data[2 + i], expected)
|
||||
|
||||
# Verify checksum (bytes 8-9)
|
||||
self.assertEqual(chunk_data[8], 0xAB)
|
||||
self.assertEqual(chunk_data[9], 0xCD)
|
||||
|
||||
# Verify program counter (bytes 10-12)
|
||||
self.assertEqual(chunk_data[10], 0x12)
|
||||
self.assertEqual(chunk_data[11], 0x34)
|
||||
self.assertEqual(chunk_data[12], 0x56)
|
||||
|
||||
def test_cmem_all_unchanged(self):
|
||||
"""Test CMem chunk with no changes (all zeros after XOR)."""
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
# Create a minimal z3 story file
|
||||
story = bytearray(1024)
|
||||
story[0] = 3 # version 3
|
||||
story[0x0E] = 0x04 # static memory starts at 0x0400
|
||||
story[0x0F] = 0x00
|
||||
|
||||
pristine = ZMemory(bytes(story))
|
||||
current = ZMemory(bytes(story))
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._pristine_mem = pristine
|
||||
zmachine._mem = current
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
result = writer._generate_cmem_chunk()
|
||||
|
||||
# All identical means no output (trailing zeros omitted)
|
||||
self.assertIsInstance(result, bytes)
|
||||
self.assertEqual(len(result), 0)
|
||||
|
||||
def test_cmem_single_byte_change(self):
|
||||
"""Test CMem chunk with one byte changed."""
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
story = bytearray(1024)
|
||||
story[0] = 3
|
||||
story[0x0E] = 0x04
|
||||
story[0x0F] = 0x00
|
||||
|
||||
pristine = ZMemory(bytes(story))
|
||||
current = ZMemory(bytes(story))
|
||||
current[0x0100] = 0x42
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._pristine_mem = pristine
|
||||
zmachine._mem = current
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
result = writer._generate_cmem_chunk()
|
||||
|
||||
self.assertIsInstance(result, bytes)
|
||||
self.assertIn(0x42, result)
|
||||
|
||||
def test_cmem_multiple_scattered_changes(self):
|
||||
"""Test CMem chunk with multiple changes across memory."""
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
story = bytearray(1024)
|
||||
story[0] = 3
|
||||
story[0x0E] = 0x04
|
||||
story[0x0F] = 0x00
|
||||
|
||||
pristine = ZMemory(bytes(story))
|
||||
current = ZMemory(bytes(story))
|
||||
current[0x0010] = 0xAA
|
||||
current[0x0100] = 0xBB
|
||||
current[0x0200] = 0xCC
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._pristine_mem = pristine
|
||||
zmachine._mem = current
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
result = writer._generate_cmem_chunk()
|
||||
|
||||
self.assertIsInstance(result, bytes)
|
||||
self.assertIn(0xAA, result)
|
||||
self.assertIn(0xBB, result)
|
||||
self.assertIn(0xCC, result)
|
||||
self.assertLess(len(result), 1024)
|
||||
|
||||
def test_cmem_roundtrip_with_parser(self):
|
||||
"""Test that CMem output can be decoded by QuetzalParser._parse_cmem()."""
|
||||
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
story = bytearray(1024)
|
||||
story[0] = 3
|
||||
story[0x0E] = 0x04
|
||||
story[0x0F] = 0x00
|
||||
|
||||
pristine = ZMemory(bytes(story))
|
||||
current = ZMemory(bytes(story))
|
||||
current[0x0050] = 0x12
|
||||
current[0x0051] = 0x34
|
||||
current[0x0150] = 0xAB
|
||||
current[0x0300] = 0xFF
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._pristine_mem = pristine
|
||||
zmachine._mem = current
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
compressed_bytes = writer._generate_cmem_chunk()
|
||||
|
||||
# Create fresh memory for parsing into
|
||||
restored = ZMemory(bytes(story))
|
||||
restored_zmachine = Mock()
|
||||
restored_zmachine._pristine_mem = pristine
|
||||
restored_zmachine._mem = restored
|
||||
|
||||
parser = QuetzalParser(restored_zmachine)
|
||||
parser._parse_cmem(compressed_bytes)
|
||||
|
||||
# Verify restored memory matches current memory
|
||||
for addr in [0x0050, 0x0051, 0x0150, 0x0300]:
|
||||
self.assertEqual(
|
||||
restored[addr],
|
||||
current[addr],
|
||||
f"Mismatch at address 0x{addr:04X}",
|
||||
)
|
||||
|
||||
def test_cmem_consecutive_zeros(self):
|
||||
"""Test CMem encoding handles consecutive zero XOR results correctly."""
|
||||
from mudlib.zmachine.quetzal import QuetzalWriter
|
||||
from mudlib.zmachine.zmemory import ZMemory
|
||||
|
||||
story = bytearray(1024)
|
||||
story[0] = 3
|
||||
story[0x0E] = 0x04
|
||||
story[0x0F] = 0x00
|
||||
|
||||
pristine = ZMemory(bytes(story))
|
||||
current = ZMemory(bytes(story))
|
||||
current[0x0040] = 0x11
|
||||
current[0x0045] = 0x22
|
||||
|
||||
zmachine = Mock()
|
||||
zmachine._pristine_mem = pristine
|
||||
zmachine._mem = current
|
||||
|
||||
writer = QuetzalWriter(zmachine)
|
||||
result = writer._generate_cmem_chunk()
|
||||
|
||||
self.assertIsInstance(result, bytes)
|
||||
idx_11 = result.index(0x11)
|
||||
self.assertEqual(result[idx_11 + 1], 0x00)
|
||||
self.assertEqual(result[idx_11 + 2], 0x03)
|
||||
self.assertEqual(result[idx_11 + 3], 0x22)
|
||||
|
||||
|
||||
# Note: ZObjectParser methods are tested through integration tests
|
||||
# with real story files, not unit tests with mock memory, as the
|
||||
# interaction with ZStringFactory makes mocking complex.
|
||||
|
|
|
|||
Loading…
Reference in a new issue