Compare commits

..

9 commits

Author SHA1 Message Date
1b3a3646d6
Pre-consume store byte in op_aread and op_read_char before blocking reads
Without this, MUD-level saves during read_line/read_char capture PC pointing
at the store byte, which gets misinterpreted as an opcode on restore.
2026-02-10 15:45:52 -05:00
ad47ee05bd
Document z-machine performance analysis and optimization roadmap 2026-02-10 15:45:52 -05:00
4f570ae1af
Add profiling and timing scripts for z-machine performance analysis 2026-02-10 15:45:52 -05:00
bb2f1989cb
Optimize z-machine hot loop: fast step, dispatch table, inline bit ops
Add step_fast() that skips trace/logging overhead (saves ~22% at 1M+
avoided log calls). Pre-resolve opcode dispatch table at init to
eliminate per-instruction version checks and isinstance calls. Replace
BitField allocations with direct bit masks in opcode decoder.

Cold start: 4720ms -> 786ms. Steady state: ~500ms -> ~460ms.
2026-02-10 15:05:34 -05:00
802c72819c
Fix op_read_char to accept optional timing arguments
Lost Pig calls read_char with only the required first operand.
The Z-machine spec says time and input_routine are optional.
2026-02-10 14:48:57 -05:00
4313941334
Update if-journey: mark V8 MUD wiring as done, note upper window fix 2026-02-10 14:36:57 -05:00
5f12a4f841
Suppress upper window writes in MudScreen to fix Lost Pig output
V5+ games write room names to the upper window (status line) via
select_window(1). Since select_window was a no-op, status line text
leaked into the main output buffer, causing ">Outside" on prompt lines.

Track the active window and only buffer writes to window 0 (lower).
2026-02-10 14:36:42 -05:00
e55294af78
Implement V5+ save/restore opcodes and handle in-game saves on restore
- op_save_v5: generates Quetzal save, stores 1 on success / 0 on failure
- op_restore_v5: loads Quetzal save, stores 2 ("restored") via store byte
- _try_restore: detect V5+ in-game saves (0xBE 0x00 before PC) and process
  the store byte with result 2, matching the V3 branch-on-restore pattern
2026-02-10 14:18:42 -05:00
602da45ac2
Fix IF bugs: case-insensitive story lookup, double prompt, phantom restore command
- _find_story() now compares path.stem.lower() so "lostpig" matches "LostPig.z8"
- Server no longer writes its own prompt in IF mode (game handles prompting)
- Suppress phantom game output on restore (saved PC past sread causes garbage)
- Route .z5/.z8 files to EmbeddedIFSession now that V5+ is supported
2026-02-10 14:16:19 -05:00
12 changed files with 654 additions and 74 deletions

View file

@ -270,7 +270,7 @@ Concrete next steps, roughly ordered. Update as items get done.
- [x] V8 support and Lost Pig: relaxed version gates (V8 = V5 with ×8 packed addresses), implemented extended opcode decoder, ported V5+ opcodes (aread, call_vn/vn2/vs2, save_undo stub, log/art shift, scan_table, tokenize, copy_table, print_unicode). found and fixed: insert_object wrong removal order, double-byte operand decoder reading type bytes interleaved with operands instead of all types first, opcode detection using 7-bit mask instead of 5-bit. Lost Pig now runs to completion (101K steps, 61 opcodes). (done — LostPig.z8 bundled in content/stories/, see ``scripts/trace_lostpig.py``) - [x] V8 support and Lost Pig: relaxed version gates (V8 = V5 with ×8 packed addresses), implemented extended opcode decoder, ported V5+ opcodes (aread, call_vn/vn2/vs2, save_undo stub, log/art shift, scan_table, tokenize, copy_table, print_unicode). found and fixed: insert_object wrong removal order, double-byte operand decoder reading type bytes interleaved with operands instead of all types first, opcode detection using 7-bit mask instead of 5-bit. Lost Pig now runs to completion (101K steps, 61 opcodes). (done — LostPig.z8 bundled in content/stories/, see ``scripts/trace_lostpig.py``)
- [ ] wire V8 games to MUD: ``EmbeddedIFSession`` currently only handles .z3 files. extend to .z5/.z8 so Lost Pig is playable via ``play lostpig`` in the MUD. - [x] wire V8 games to MUD: ``play.py`` routes .z3/.z5/.z8 to ``EmbeddedIFSession``. Lost Pig playable via ``play lostpig``. fixed upper window leak: V5+ games write room names to window 1 (status line) via ``select_window``. ``MudScreen`` now tracks the active window and suppresses writes to the upper window, preventing status line text from appearing in game output.
- [ ] implement real save_undo: currently stubs returning -1 ("not available"). a proper implementation needs in-memory state snapshots (dynamic memory + call stack). Lost Pig works without undo but players expect it. - [ ] implement real save_undo: currently stubs returning -1 ("not available"). a proper implementation needs in-memory state snapshots (dynamic memory + call stack). Lost Pig works without undo but players expect it.
@ -338,6 +338,8 @@ What was needed:
Lost Pig trace: 101K instructions, 61 unique opcodes, full gameplay loop working (room descriptions, parser, object manipulation). ``scripts/trace_lostpig.py`` for tracing. Lost Pig trace: 101K instructions, 61 unique opcodes, full gameplay loop working (room descriptions, parser, object manipulation). ``scripts/trace_lostpig.py`` for tracing.
Performance profiling and optimization work is documented in ``docs/how/zmachine-performance.rst``. Baseline: ~30K opcodes/command at 60 ops/ms (500ms steady-state). Optimization brought cold-start from 4720ms to 786ms (6x improvement).
What this enables: What this enables:
- modern IF games compiled with Inform 6/7 to Z-machine V5/V8 format are now playable - modern IF games compiled with Inform 6/7 to Z-machine V5/V8 format are now playable

View file

@ -0,0 +1,120 @@
Z-Machine Interpreter Performance
==================================
The Problem
-----------
Lost Pig (V8, ~30K opcodes per command) was taking ~500ms steady-state and
~4700ms cold-start. Zork (V3) felt fast because it uses dfrotz (C subprocess),
but Lost Pig uses the embedded Python interpreter. Pure Python was interpreting
~30K opcodes at 60 ops/ms.
Profiling Results (Baseline)
-----------------------------
103K steps for 3 commands, 9.6M function calls total.
Top time consumers from cProfile:
- Logging/tracing: ~1.14s (22%)
log(), debug(), isEnabledFor(), log_disasm(), str.join for trace strings.
Even with logging disabled, 1M+ calls to log() that check isEnabledFor()
and return.
- BitField: ~0.67s (13%)
878K __getitem__ calls, 304K __init__ allocations, 1.8M isinstance() calls.
- Memory bounds checking: ~0.55s (11%)
512K _check_bounds() calls on every memory read.
Total overhead: ~2.36s out of 5.1s interpreter time (46%).
What We Optimized
-----------------
- step_fast(): hot loop without trace/log string building. Main run() uses
this by default.
- Pre-resolved dispatch table: opcode handlers resolved once at init, no
per-instruction isinstance/version checks.
- Inline bit operations: replaced BitField object allocation + slice ops with
direct & and >> in opcode decoder, branch offset, _make_signed, type byte
parsing.
Results: cold start 4720ms -> 786ms (6x), steady state ~500ms -> ~460ms (~8%).
Future Work (Prioritized)
--------------------------
1. Strip log() calls from opcode decoder internals (zopdecoder.py)
974K calls still executing in fast path. These are inside _parse_operand,
get_branch_offset, _parse_opcode_*, _get_pc, _write_result, etc. Even with
logging disabled, the function call + isEnabledFor check costs ~0.5s.
2. Remove memory bounds checking on internal access
_check_bounds in zmemory.__getitem__ — 512K calls, ~0.28s. Trust story
file for internal decode path, keep bounds checks only at API boundary.
3. Local variable caching in hot methods
Cache self._memory, self._stack as locals in tight loops to avoid repeated
attribute lookups.
4. BitField still used in zobjectparser.py
get_all_properties, get_prop_addr_len, get_attribute — 200K+ calls remain.
5. Operand parsing local bindings
Cache pop_stack, get_local_variable, read_global as locals.
6. Consider bytearray/memoryview for memory access
Instead of custom __getitem__.
Measurement Tools
-----------------
- scripts/time_lostpig.py
Per-command wall clock + opcode count, shows ops/ms rate.
- scripts/profile_lostpig.py
cProfile wrapper, shows top functions by cumtime and tottime.
- scripts/trace_lostpig.py
Step-by-step opcode tracer with frequency counter.
Architecture Notes
------------------
- step() (debug) vs step_fast() (production)
step() still exists with full trace/logging for debugging.
- _build_dispatch_table() runs at init
Pre-resolves version-specific handlers.
- Dispatch table structure
Dict of lists: dispatch[opcode_class][opcode_number] = (implemented, func)
or None.
- Threading overhead
The embedded interpreter runs in a thread; async plumbing (Event.wait) adds
negligible overhead — the bottleneck is pure z-machine execution time.

97
scripts/profile_lostpig.py Executable file
View file

@ -0,0 +1,97 @@
#!/usr/bin/env -S uv run --script
"""Profile Lost Pig V8 execution — find performance bottlenecks.
Runs the interpreter under cProfile for a few commands, then reports
the top functions by cumulative time and total time, plus callers of
expensive functions.
"""
# ruff: noqa: E402
import contextlib
import cProfile
import pstats
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from mudlib.zmachine import ZMachine, zstream, zui
from mudlib.zmachine.trivialzui import TrivialAudio, TrivialFilesystem, TrivialScreen
from mudlib.zmachine.zcpu import ZCpuQuit, ZCpuRestart
story_path = project_root / "content" / "stories" / "LostPig.z8"
if not story_path.exists():
print(f"ERROR: {story_path} not found")
sys.exit(1)
story_bytes = story_path.read_bytes()
print(f"Loaded LostPig.z8: {len(story_bytes)} bytes, version {story_bytes[0]}")
class AutoInputStream(zstream.ZInputStream):
"""Input stream that auto-feeds commands then quits."""
def __init__(self):
super().__init__()
self._commands = ["look", "look", "inventory"]
self._command_index = 0
def read_line(self, *args, **kwargs):
if self._command_index >= len(self._commands):
raise ZCpuQuit
cmd = self._commands[self._command_index]
self._command_index += 1
print(f"> {cmd}")
return cmd
def read_char(self, timed_input_routine=None, timed_input_interval=0):
# Return enter key (13) when read_char is called
return 13
audio = TrivialAudio()
screen = TrivialScreen()
keyboard = AutoInputStream()
filesystem = TrivialFilesystem()
ui = zui.ZUI(audio, screen, keyboard, filesystem)
zm = ZMachine(story_bytes, ui)
print("Running under cProfile...\n")
profiler = cProfile.Profile()
profiler.enable()
with contextlib.suppress(ZCpuQuit, ZCpuRestart):
zm.run()
profiler.disable()
print("\n" + "=" * 80)
print("PROFILING RESULTS")
print("=" * 80 + "\n")
# Create stats object
stats = pstats.Stats(profiler)
stats.strip_dirs()
stats.sort_stats("cumulative")
print("TOP 40 FUNCTIONS BY CUMULATIVE TIME:")
print("-" * 80)
stats.print_stats(40)
print("\n" + "=" * 80)
print("TOP 40 FUNCTIONS BY TOTAL TIME:")
print("-" * 80)
stats.sort_stats("time")
stats.print_stats(40)
print("\n" + "=" * 80)
print("TOP 40 CALLERS OF MOST EXPENSIVE FUNCTIONS:")
print("-" * 80)
stats.sort_stats("cumulative")
stats.print_callers(40)
print("\n" + "=" * 80)
print("PROFILING COMPLETE")
print("=" * 80)

217
scripts/time_lostpig.py Executable file
View file

@ -0,0 +1,217 @@
#!/usr/bin/env -S uv run --script
"""Time Lost Pig V8 per-command execution.
Measures wall-clock time and opcode count for each command to diagnose
whether slowness is in z-machine processing or MUD async plumbing.
Usage:
time_lostpig.py # default commands
time_lostpig.py look look inventory # custom commands
"""
# ruff: noqa: E402
import sys
import time
from pathlib import Path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from mudlib.zmachine import ZMachine, zstream, zui
from mudlib.zmachine.quetzal import QuetzalParser
from mudlib.zmachine.trivialzui import (
TrivialAudio,
TrivialFilesystem,
TrivialScreen,
)
from mudlib.zmachine.zcpu import (
ZCpuNotImplemented,
ZCpuQuit,
ZCpuRestart,
)
story_path = project_root / "content" / "stories" / "LostPig.z8"
if not story_path.exists():
print(f"ERROR: {story_path} not found")
sys.exit(1)
story_bytes = story_path.read_bytes()
print(f"Loaded LostPig.z8: {len(story_bytes)} bytes, version {story_bytes[0]}")
# Parse command line arguments for custom commands
default_commands = ["look", "look", "inventory", "x me", "n"]
test_commands = sys.argv[1:] if len(sys.argv) > 1 else default_commands
class TimedInputStream(zstream.ZInputStream):
"""Input stream that tracks timing and feeds commands."""
def __init__(self, commands):
super().__init__()
self._commands = commands
self._command_index = 0
self._command_start_time: float | None = None
self._command_start_opcodes: int | None = None
self._timings: list[tuple[int, int, int]] = []
self.total_opcodes: int = 0
def read_line(self, *args, **kwargs):
# Mark end of previous command processing
if (
self._command_start_time is not None
and self._command_start_opcodes is not None
):
elapsed_ms = int((time.perf_counter() - self._command_start_time) * 1000)
opcodes = self.total_opcodes - self._command_start_opcodes
self._timings.append((self._command_index - 1, elapsed_ms, opcodes))
# Return next command or quit
if self._command_index >= len(self._commands):
raise ZCpuQuit
command = self._commands[self._command_index]
self._command_index += 1
# Mark start of new command processing
self._command_start_time = time.perf_counter()
self._command_start_opcodes = self.total_opcodes
return command
def read_char(self, *args, **kwargs):
# For simple timing, just return enter key
# Mark timing same as read_line
if (
self._command_start_time is not None
and self._command_start_opcodes is not None
):
elapsed_ms = int((time.perf_counter() - self._command_start_time) * 1000)
opcodes = self.total_opcodes - self._command_start_opcodes
self._timings.append((self._command_index - 1, elapsed_ms, opcodes))
if self._command_index >= len(self._commands):
raise ZCpuQuit
# Just return enter
self._command_start_time = time.perf_counter()
self._command_start_opcodes = self.total_opcodes
return ord("\r")
def get_timings(self):
return self._timings
# Set up UI
audio = TrivialAudio()
screen = TrivialScreen()
keyboard = TimedInputStream(test_commands)
filesystem = TrivialFilesystem()
ui = zui.ZUI(audio, screen, keyboard, filesystem)
zm = ZMachine(story_bytes, ui)
print(f"Testing {len(test_commands)} commands: {test_commands}")
print()
# Try to restore from save file
data_dir = project_root / "data"
save_dir = data_dir / "if_saves"
restored = False
if save_dir.exists():
# Look for any save file matching LostPig
for save_path in save_dir.glob("*/LostPig.qzl"):
print(f"Found save file: {save_path}")
try:
save_data = save_path.read_bytes()
parser = QuetzalParser(zm)
parser.load_from_bytes(save_data)
pc = zm._opdecoder.program_counter
mem = zm._mem
# Handle V5+ in-game save restore
if (
mem.version >= 5
and pc >= 3
and mem[pc - 3] == 0xBE
and mem[pc - 2] == 0x00
):
zm._cpu._write_result(2)
restored = True
print(f"Restored from {save_path}")
break
except Exception as e:
print(f"Could not restore from {save_path}: {e}")
if not restored:
print("No save file found, starting from beginning")
print()
print("Running timed commands...")
print()
# Run interpreter with stepping to count opcodes
step_count = 0
max_steps = 2_000_000
try:
while step_count < max_steps:
try:
zm._cpu.step()
step_count += 1
keyboard.total_opcodes = step_count
except ZCpuQuit:
print(f"Game quit after {step_count:,} steps")
break
except ZCpuRestart:
print(f"Game restart after {step_count:,} steps")
break
except ZCpuNotImplemented as e:
print(f"NOT IMPLEMENTED at step {step_count:,}: {e}")
break
except Exception as e:
print(f"ERROR at step {step_count:,}: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
zm._cpu._dump_trace()
break
except KeyboardInterrupt:
print(f"\nInterrupted at step {step_count:,}")
print()
print("=" * 60)
print("TIMING RESULTS")
print("=" * 60)
print()
timings = keyboard.get_timings()
if timings:
total_ms = 0
total_cmd_opcodes = 0
for idx, elapsed_ms, opcodes in timings:
if idx < len(keyboard._commands):
cmd = keyboard._commands[idx]
rate = opcodes / elapsed_ms if elapsed_ms > 0 else 0
print(
f'command {idx + 1} "{cmd}": '
f"{opcodes:,} opcodes in {elapsed_ms}ms ({rate:,.0f} ops/ms)"
)
total_ms += elapsed_ms
total_cmd_opcodes += opcodes
if total_ms > 0:
avg_rate = total_cmd_opcodes / total_ms
print()
print(
f"Average rate: {avg_rate:,.0f} opcodes/ms "
f"({avg_rate * 1000:,.0f} opcodes/sec)"
)
else:
print("No timing data collected")
print()
print(f"Total opcodes executed: {step_count:,}")

View file

@ -24,7 +24,7 @@ def _find_story(name: str) -> pathlib.Path | None:
# prefix match (e.g. "zork" matches "zork1.z3") # prefix match (e.g. "zork" matches "zork1.z3")
if _stories_dir.exists(): if _stories_dir.exists():
for path in sorted(_stories_dir.iterdir()): for path in sorted(_stories_dir.iterdir()):
if path.stem.startswith(name) and path.suffix in _STORY_EXTENSIONS: if path.stem.lower().startswith(name) and path.suffix in _STORY_EXTENSIONS:
return path return path
return None return None
@ -65,8 +65,8 @@ async def cmd_play(player: Player, args: str) -> None:
if not isinstance(story_path, pathlib.Path): if not isinstance(story_path, pathlib.Path):
story_path = pathlib.Path(story_path) story_path = pathlib.Path(story_path)
# Use embedded interpreter for z3 files, dfrotz for others # Use embedded interpreter for z-machine files, dfrotz for others
if story_path.suffix == ".z3": if story_path.suffix in (".z3", ".z5", ".z8"):
try: try:
session = EmbeddedIFSession(player, str(story_path), game_name) session = EmbeddedIFSession(player, str(story_path), game_name)
except (FileNotFoundError, OSError) as e: except (FileNotFoundError, OSError) as e:

View file

@ -46,6 +46,14 @@ class EmbeddedIFSession:
Must be called before the interpreter thread is launched. Must be called before the interpreter thread is launched.
Returns True if state was restored successfully. Returns True if state was restored successfully.
Handles two save origins:
- In-game save (V3: opcode 0xB5, V5+: EXT 0xBE/0x00): PC points at
branch data (V3) or store byte (V5+). Process them so execution
resumes at the next instruction.
- MUD-level _do_save during sread/aread: PC points past the read
instruction (store byte pre-consumed in op_aread/op_read_char).
No post-processing needed (phantom output suppressed in start()).
""" """
if not self.save_path.exists(): if not self.save_path.exists():
return False return False
@ -53,17 +61,24 @@ class EmbeddedIFSession:
save_data = self.save_path.read_bytes() save_data = self.save_path.read_bytes()
parser = QuetzalParser(self._zmachine) parser = QuetzalParser(self._zmachine)
parser.load_from_bytes(save_data) parser.load_from_bytes(save_data)
# In V1-3, the saved PC points to branch data after the save
# instruction. Process the branch as "save succeeded" so the
# PC advances past it. Detect by checking for save opcode (0xB5)
# immediately before the restored PC.
pc = self._zmachine._opdecoder.program_counter pc = self._zmachine._opdecoder.program_counter
if ( mem = self._zmachine._mem
self._zmachine._mem.version <= 3
and pc > 0 if mem.version <= 3 and pc > 0 and mem[pc - 1] == 0xB5:
and self._zmachine._mem[pc - 1] == 0xB5 # V3 in-game save: PC at branch data after save opcode (0xB5).
): # Process the branch as "save succeeded".
self._zmachine._cpu._branch(True) self._zmachine._cpu._branch(True)
elif (
mem.version >= 5
and pc >= 3
and mem[pc - 3] == 0xBE
and mem[pc - 2] == 0x00
):
# V5+ in-game save: PC at store byte after EXT save opcode.
# Read store byte and write 2 ("restored") to that variable.
self._zmachine._cpu._write_result(2)
return True return True
except Exception as e: except Exception as e:
logger.debug(f"Restore failed: {e}") logger.debug(f"Restore failed: {e}")
@ -81,8 +96,10 @@ class EmbeddedIFSession:
output = self._screen.flush() output = self._screen.flush()
if restored: if restored:
prefix = "restoring saved game...\r\nrestored." # After restore, the game processes phantom input (garbage in text
return f"{prefix}\r\n\r\n{output}" if output else prefix # buffer), producing unwanted output. Suppress it and only show the
# restore confirmation.
return "restoring saved game...\r\nrestored."
return output return output
async def handle_input(self, text: str) -> IFResponse: async def handle_input(self, text: str) -> IFResponse:

View file

@ -315,7 +315,8 @@ async def shell(
if player.mode == "editor" and player.editor: if player.mode == "editor" and player.editor:
_writer.write(f" {player.editor.cursor + 1}> ") _writer.write(f" {player.editor.cursor + 1}> ")
elif player.mode == "if" and player.if_session: elif player.mode == "if" and player.if_session:
_writer.write("\r\n> ") # IF mode: game writes its own prompt, don't add another
pass
else: else:
_writer.write("mud> ") _writer.write("mud> ")
await _writer.drain() await _writer.drain()

View file

@ -21,6 +21,7 @@ class MudScreen(zscreen.ZScreen):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self._buffer = [] self._buffer = []
self._active_window = 0 # 0=lower (main text), 1=upper (status line)
self._columns = 80 self._columns = 80
self._rows = zscreen.INFINITE_ROWS self._rows = zscreen.INFINITE_ROWS
self.features = { self.features = {
@ -31,6 +32,7 @@ class MudScreen(zscreen.ZScreen):
} }
def write(self, string): def write(self, string):
if self._active_window == 0:
self._buffer.append(string) self._buffer.append(string)
def flush(self) -> str: def flush(self) -> str:
@ -39,10 +41,10 @@ class MudScreen(zscreen.ZScreen):
return result return result
def split_window(self, height): def split_window(self, height):
logger.debug(f"split_window({height}) - no-op") logger.debug(f"split_window({height})")
def select_window(self, window): def select_window(self, window):
logger.debug(f"select_window({window}) - no-op") self._active_window = window
def set_cursor_position(self, x, y): def set_cursor_position(self, x, y):
logger.debug(f"set_cursor_position({x}, {y}) - no-op") logger.debug(f"set_cursor_position({x}, {y}) - no-op")

View file

@ -10,7 +10,7 @@ import random
import time import time
from collections import deque from collections import deque
from . import bitfield, zopdecoder, zscreen from . import zopdecoder, zscreen
from .zlogging import log, log_disasm from .zlogging import log, log_disasm
@ -65,6 +65,7 @@ class ZCpu:
self._lexer = zlexer self._lexer = zlexer
self._zmachine = zmachine self._zmachine = zmachine
self._trace = deque(maxlen=20) self._trace = deque(maxlen=20)
self._dispatch = self._build_dispatch_table()
@property @property
def _program_counter(self): def _program_counter(self):
@ -111,9 +112,7 @@ class ZCpu:
def _make_signed(self, a): def _make_signed(self, a):
"""Turn the given 16-bit value into a signed integer.""" """Turn the given 16-bit value into a signed integer."""
assert a < (1 << 16) assert a < (1 << 16)
# This is a little ugly. if (a >> 15) & 1:
bf = bitfield.BitField(a)
if bf[15]:
a = a - (1 << 16) a = a - (1 << 16)
return a return a
@ -197,6 +196,55 @@ class ZCpu:
print(entry) print(entry)
print("===================================\n") print("===================================\n")
def _build_dispatch_table(self):
"""Pre-resolve all opcode handlers for current version."""
table = {}
for opcode_class, opcode_class_list in self.opcodes.items():
class_table = []
for opcode_decl in opcode_class_list:
if not opcode_decl:
class_table.append(None)
continue
if not isinstance(opcode_decl, (list, tuple)):
func = opcode_decl
else:
func = None
if isinstance(opcode_decl[0], (list, tuple)):
for f, version in opcode_decl: # type: ignore
if version <= self._memory.version:
func = f
break
elif opcode_decl[1] <= self._memory.version:
func = opcode_decl[0]
if func is None:
class_table.append(None)
continue
implemented = bool(func.__doc__)
class_table.append((implemented, func))
table[opcode_class] = class_table
return table
def step_fast(self):
"""Execute a single instruction without tracing.
Returns True if execution should continue.
"""
(opcode_class, opcode_number, operands) = self._opdecoder.get_next_instruction()
entry = self._dispatch[opcode_class][opcode_number]
if entry is None:
raise ZCpuIllegalInstruction
implemented, func = entry
if not implemented:
return False
try:
func(self, *operands)
except (ZCpuQuit, ZCpuRestart):
raise
except Exception:
self._dump_trace()
raise
return True
def step(self): def step(self):
"""Execute a single instruction. Returns True if execution should continue.""" """Execute a single instruction. Returns True if execution should continue."""
current_pc = self._opdecoder.program_counter current_pc = self._opdecoder.program_counter
@ -245,7 +293,7 @@ class ZCpu:
"""The Magic Function that takes little bits and bytes, twirls """The Magic Function that takes little bits and bytes, twirls
them around, and brings the magic to your screen!""" them around, and brings the magic to your screen!"""
log("Execution started") log("Execution started")
while self.step(): while self.step_fast():
pass pass
## ##
@ -760,7 +808,13 @@ class ZCpu:
text_buffer_addr = args[0] text_buffer_addr = args[0]
parse_buffer_addr = args[1] if len(args) > 1 else 0 parse_buffer_addr = args[1] if len(args) > 1 else 0
# Read input from keyboard # Consume store byte BEFORE blocking in read_line(). This ensures
# the PC is past the entire instruction when MUD-level saves capture
# state during read_line(). Without this, saves point PC at the store
# byte, which gets misinterpreted as an opcode on restore.
store_addr = self._opdecoder.get_store_address()
# Read input from keyboard (blocks until player types something)
text = self._ui.keyboard_input.read_line() text = self._ui.keyboard_input.read_line()
text = text.lower().strip("\n\r") text = text.lower().strip("\n\r")
@ -792,7 +846,7 @@ class ZCpu:
offset = pos + word_len offset = pos + word_len
# Store terminating character (13 = newline) # Store terminating character (13 = newline)
self._write_result(13) self._write_result(13, store_addr=store_addr)
def op_print_char(self, char): def op_print_char(self, char):
"""Output the given ZSCII character.""" """Output the given ZSCII character."""
@ -906,7 +960,7 @@ class ZCpu:
"""Play sound effect (no-op - sound not supported in text MUD).""" """Play sound effect (no-op - sound not supported in text MUD)."""
pass pass
def op_read_char(self, unused, time, input_routine): def op_read_char(self, unused, time=0, input_routine=0):
"""Read a single character from input stream 0 (keyboard). """Read a single character from input stream 0 (keyboard).
Optionally, call a routine periodically to decide whether or Optionally, call a routine periodically to decide whether or
@ -920,8 +974,11 @@ class ZCpu:
if time != 0 or input_routine != 0: if time != 0 or input_routine != 0:
raise ZCpuNotImplemented raise ZCpuNotImplemented
# Consume store byte BEFORE blocking in read_char() — same reason
# as op_aread: PC must be past the full instruction for MUD saves.
store_addr = self._opdecoder.get_store_address()
char = self._ui.keyboard_input.read_char() char = self._ui.keyboard_input.read_char()
self._write_result(char) self._write_result(char, store_addr=store_addr)
def op_scan_table(self, x, table, length, *args): def op_scan_table(self, x, table, length, *args):
"""Search a table for a value, branch if found, store address (V4+). """Search a table for a value, branch if found, store address (V4+).
@ -1034,12 +1091,57 @@ class ZCpu:
## EXT opcodes (opcodes 256-284) ## EXT opcodes (opcodes 256-284)
def op_save_v5(self, *args): def op_save_v5(self, *args):
"""TODO: Write docstring here.""" """Save game state to file (V5+ - stores result).
raise ZCpuNotImplemented
Generates Quetzal save data and writes via filesystem.
Stores 1 on success, 0 on failure. On restore, the game
will see 2 stored in the same variable.
"""
if self._zmachine is None:
self._write_result(0)
return
from .quetzal import QuetzalWriter
try:
writer = QuetzalWriter(self._zmachine)
save_data = writer.generate_save_data()
success = self._ui.filesystem.save_game(save_data)
self._write_result(1 if success else 0)
except Exception as e:
log(f"Save failed with exception: {e}")
self._write_result(0)
def op_restore_v5(self, *args): def op_restore_v5(self, *args):
"""TODO: Write docstring here.""" """Restore game state from file (V5+ - stores result).
raise ZCpuNotImplemented
Loads Quetzal save data and restores memory/stack/PC.
The restored PC points at the store byte of the original save
instruction. We read it and write 2 (meaning "restored") to
the indicated variable.
Stores 0 on failure (in the current, un-restored state).
"""
if self._zmachine is None:
self._write_result(0)
return
from .quetzal import QuetzalParser
try:
save_data = self._ui.filesystem.restore_game()
if save_data is None:
self._write_result(0)
return
parser = QuetzalParser(self._zmachine)
parser.load_from_bytes(save_data)
# Restored PC points at the store byte of the save instruction.
# Read it and write 2 ("restored") to that variable.
self._write_result(2)
except Exception as e:
log(f"Restore failed with exception: {e}")
self._write_result(0)
def op_log_shift(self, number, places): def op_log_shift(self, number, places):
"""Logical shift: positive places = left, negative = right (V5+). """Logical shift: positive places = left, negative = right (V5+).

View file

@ -6,7 +6,6 @@
# root directory of this distribution. # root directory of this distribution.
# #
from .bitfield import BitField
from .zlogging import log from .zlogging import log
@ -74,15 +73,11 @@ class ZOpDecoder:
# Extended opcode # Extended opcode
return self._parse_opcode_extended() return self._parse_opcode_extended()
opcode = BitField(opcode) if not (opcode & 0x80):
if opcode[7] == 0:
# Long opcode
return self._parse_opcode_long(opcode) return self._parse_opcode_long(opcode)
elif opcode[6] == 0: elif not (opcode & 0x40):
# Short opcode
return self._parse_opcode_short(opcode) return self._parse_opcode_short(opcode)
else: else:
# Variable opcode
return self._parse_opcode_variable(opcode) return self._parse_opcode_variable(opcode)
def _parse_opcode_long(self, opcode): def _parse_opcode_long(self, opcode):
@ -92,35 +87,35 @@ class ZOpDecoder:
log("Opcode is long") log("Opcode is long")
LONG_OPERAND_TYPES = [SMALL_CONSTANT, VARIABLE] LONG_OPERAND_TYPES = [SMALL_CONSTANT, VARIABLE]
operands = [ operands = [
self._parse_operand(LONG_OPERAND_TYPES[opcode[6]]), self._parse_operand(LONG_OPERAND_TYPES[(opcode >> 6) & 1]),
self._parse_operand(LONG_OPERAND_TYPES[opcode[5]]), self._parse_operand(LONG_OPERAND_TYPES[(opcode >> 5) & 1]),
] ]
return (OPCODE_2OP, opcode[0:5], operands) return (OPCODE_2OP, opcode & 0x1F, operands)
def _parse_opcode_short(self, opcode): def _parse_opcode_short(self, opcode):
"""Parse an opcode of the short form.""" """Parse an opcode of the short form."""
# Short opcodes can have either 1 operand, or no operand. # Short opcodes can have either 1 operand, or no operand.
log("Opcode is short") log("Opcode is short")
operand_type = opcode[4:6] operand_type = (opcode >> 4) & 0x03
operand = self._parse_operand(operand_type) operand = self._parse_operand(operand_type)
if operand is None: # 0OP variant if operand is None: # 0OP variant
log("Opcode is 0OP variant") log("Opcode is 0OP variant")
return (OPCODE_0OP, opcode[0:4], []) return (OPCODE_0OP, opcode & 0x0F, [])
else: else:
log("Opcode is 1OP variant") log("Opcode is 1OP variant")
return (OPCODE_1OP, opcode[0:4], [operand]) return (OPCODE_1OP, opcode & 0x0F, [operand])
def _parse_opcode_variable(self, opcode): def _parse_opcode_variable(self, opcode):
"""Parse an opcode of the variable form.""" """Parse an opcode of the variable form."""
log("Opcode is variable") log("Opcode is variable")
if opcode[5]: if (opcode >> 5) & 1:
log("Variable opcode of VAR kind") log("Variable opcode of VAR kind")
opcode_type = OPCODE_VAR opcode_type = OPCODE_VAR
else: else:
log("Variable opcode of 2OP kind") log("Variable opcode of 2OP kind")
opcode_type = OPCODE_2OP opcode_type = OPCODE_2OP
opcode_num = opcode[0:5] opcode_num = opcode & 0x1F
# Read all type bytes FIRST, before parsing any operands. # Read all type bytes FIRST, before parsing any operands.
# call_vs2 (VAR:12) and call_vn2 (VAR:26) have two type bytes; # call_vs2 (VAR:12) and call_vn2 (VAR:26) have two type bytes;
@ -183,12 +178,12 @@ class ZOpDecoder:
def _read_type_byte(self): def _read_type_byte(self):
"""Read one operand type byte and return a list of type codes.""" """Read one operand type byte and return a list of type codes."""
operand_byte = BitField(self._get_pc()) operand_byte = self._get_pc()
return [ return [
operand_byte[6:8], (operand_byte >> 6) & 0x03,
operand_byte[4:6], (operand_byte >> 4) & 0x03,
operand_byte[2:4], (operand_byte >> 2) & 0x03,
operand_byte[0:2], operand_byte & 0x03,
] ]
def _parse_operand_list(self, operand_types): def _parse_operand_list(self, operand_types):
@ -214,12 +209,11 @@ class ZOpDecoder:
to by the PC. Increment PC just past the text.""" to by the PC. Increment PC just past the text."""
start_addr = self.program_counter start_addr = self.program_counter
bf = BitField(0)
while True: while True:
bf.__init__(self._memory[self.program_counter]) byte = self._memory[self.program_counter]
self.program_counter += 2 self.program_counter += 2
if bf[7] == 1: if (byte >> 7) & 1:
break break
return start_addr return start_addr
@ -236,10 +230,10 @@ class ZOpDecoder:
to branch if true or branch if false), and second, the address to to branch if true or branch if false), and second, the address to
jump to. Increment the PC as necessary.""" jump to. Increment the PC as necessary."""
bf = BitField(self._get_pc()) byte = self._get_pc()
branch_if_true = bool(bf[7]) branch_if_true = bool((byte >> 7) & 1)
if bf[6]: if (byte >> 6) & 1:
branch_offset = bf[0:6] branch_offset = byte & 0x3F
else: else:
# We need to do a little magic here. The branch offset is # We need to do a little magic here. The branch offset is
# written as a signed 14-bit number, with signed meaning '-n' is # written as a signed 14-bit number, with signed meaning '-n' is
@ -253,8 +247,8 @@ class ZOpDecoder:
# If the MSB is not set, we just extract the value and return it. # If the MSB is not set, we just extract the value and return it.
# #
# Can you spell "Weird" ? # Can you spell "Weird" ?
branch_offset = self._get_pc() + (bf[0:5] << 8) branch_offset = self._get_pc() + ((byte & 0x1F) << 8)
if bf[5]: if (byte >> 5) & 1:
branch_offset -= 8192 branch_offset -= 8192
log(f"Branch if {branch_if_true} to offset {branch_offset:+d}") log(f"Branch if {branch_if_true} to offset {branch_offset:+d}")

View file

@ -47,6 +47,37 @@ def test_mud_screen_flush_clears_buffer():
assert second == "" assert second == ""
def test_mud_screen_suppresses_upper_window_writes():
"""MudScreen discards writes to upper window (status line)."""
screen = MudScreen()
screen.write("before ")
screen.select_window(1) # upper window
screen.write("STATUS LINE")
screen.select_window(0) # back to lower window
screen.write("after")
output = screen.flush()
assert output == "before after"
def test_mud_screen_lower_window_is_default():
"""MudScreen starts with lower window active, writes are captured."""
screen = MudScreen()
screen.write("hello")
assert screen.flush() == "hello"
def test_mud_screen_upper_window_multiple_writes():
"""MudScreen discards all writes while upper window is active."""
screen = MudScreen()
screen.select_window(1)
screen.write("Room Name")
screen.write(" | Score: 0")
screen.select_window(0)
screen.write("You are in a dark room.\n")
output = screen.flush()
assert output == "You are in a dark room.\n"
def test_mud_input_stream_feed_and_read(): def test_mud_input_stream_feed_and_read():
"""MudInputStream feed and read_line work with threading.""" """MudInputStream feed and read_line work with threading."""
stream = MudInputStream() stream = MudInputStream()

View file

@ -67,10 +67,9 @@ async def test_play_enters_if_mode(player):
mock_session.save_path = Mock(spec=Path) mock_session.save_path = Mock(spec=Path)
mock_session.save_path.exists = Mock(return_value=False) mock_session.save_path.exists = Mock(return_value=False)
with patch("mudlib.commands.play.IFSession") as MockIFSession: with patch("mudlib.commands.play.EmbeddedIFSession") as MockSession:
MockIFSession.return_value = mock_session MockSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find: with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5" mock_find.return_value = "/fake/path/zork1.z5"
@ -108,11 +107,11 @@ async def test_play_handles_dfrotz_missing(player):
with patch("mudlib.commands.play.IFSession") as MockIFSession: with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session MockIFSession.return_value = mock_session
# Use .z5 to test dfrotz path # Use .zblorb to test dfrotz path (z3/z5/z8 go to embedded)
with patch("mudlib.commands.play._find_story") as mock_find: with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5" mock_find.return_value = "/fake/path/game.zblorb"
await cmd_play(player, "zork1") await cmd_play(player, "game")
# Verify error message was sent # Verify error message was sent
player.writer.write.assert_called() player.writer.write.assert_called()
@ -142,10 +141,9 @@ async def test_play_restores_save_if_exists(player):
) )
mock_session.start = AsyncMock(return_value=restored_output) mock_session.start = AsyncMock(return_value=restored_output)
with patch("mudlib.commands.play.IFSession") as MockIFSession: with patch("mudlib.commands.play.EmbeddedIFSession") as MockSession:
MockIFSession.return_value = mock_session MockSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find: with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5" mock_find.return_value = "/fake/path/zork1.z5"
@ -170,14 +168,13 @@ async def test_play_no_restore_if_no_save(player):
from mudlib.commands.play import cmd_play from mudlib.commands.play import cmd_play
# Mock IFSession # Mock EmbeddedIFSession
mock_session = Mock() mock_session = Mock()
mock_session.start = AsyncMock(return_value="Welcome to Zork!") mock_session.start = AsyncMock(return_value="Welcome to Zork!")
with patch("mudlib.commands.play.IFSession") as MockIFSession: with patch("mudlib.commands.play.EmbeddedIFSession") as MockSession:
MockIFSession.return_value = mock_session MockSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find: with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5" mock_find.return_value = "/fake/path/zork1.z5"