Compare commits

..

9 commits

Author SHA1 Message Date
1b3a3646d6
Pre-consume store byte in op_aread and op_read_char before blocking reads
Without this, MUD-level saves during read_line/read_char capture PC pointing
at the store byte, which gets misinterpreted as an opcode on restore.
2026-02-10 15:45:52 -05:00
ad47ee05bd
Document z-machine performance analysis and optimization roadmap 2026-02-10 15:45:52 -05:00
4f570ae1af
Add profiling and timing scripts for z-machine performance analysis 2026-02-10 15:45:52 -05:00
bb2f1989cb
Optimize z-machine hot loop: fast step, dispatch table, inline bit ops
Add step_fast() that skips trace/logging overhead (saves ~22% at 1M+
avoided log calls). Pre-resolve opcode dispatch table at init to
eliminate per-instruction version checks and isinstance calls. Replace
BitField allocations with direct bit masks in opcode decoder.

Cold start: 4720ms -> 786ms. Steady state: ~500ms -> ~460ms.
2026-02-10 15:05:34 -05:00
802c72819c
Fix op_read_char to accept optional timing arguments
Lost Pig calls read_char with only the required first operand.
The Z-machine spec says time and input_routine are optional.
2026-02-10 14:48:57 -05:00
4313941334
Update if-journey: mark V8 MUD wiring as done, note upper window fix 2026-02-10 14:36:57 -05:00
5f12a4f841
Suppress upper window writes in MudScreen to fix Lost Pig output
V5+ games write room names to the upper window (status line) via
select_window(1). Since select_window was a no-op, status line text
leaked into the main output buffer, causing ">Outside" on prompt lines.

Track the active window and only buffer writes to window 0 (lower).
2026-02-10 14:36:42 -05:00
e55294af78
Implement V5+ save/restore opcodes and handle in-game saves on restore
- op_save_v5: generates Quetzal save, stores 1 on success / 0 on failure
- op_restore_v5: loads Quetzal save, stores 2 ("restored") via store byte
- _try_restore: detect V5+ in-game saves (0xBE 0x00 before PC) and process
  the store byte with result 2, matching the V3 branch-on-restore pattern
2026-02-10 14:18:42 -05:00
602da45ac2
Fix IF bugs: case-insensitive story lookup, double prompt, phantom restore command
- _find_story() now compares path.stem.lower() so "lostpig" matches "LostPig.z8"
- Server no longer writes its own prompt in IF mode (game handles prompting)
- Suppress phantom game output on restore (saved PC past sread causes garbage)
- Route .z5/.z8 files to EmbeddedIFSession now that V5+ is supported
2026-02-10 14:16:19 -05:00
12 changed files with 654 additions and 74 deletions

View file

@ -270,7 +270,7 @@ Concrete next steps, roughly ordered. Update as items get done.
- [x] V8 support and Lost Pig: relaxed version gates (V8 = V5 with ×8 packed addresses), implemented extended opcode decoder, ported V5+ opcodes (aread, call_vn/vn2/vs2, save_undo stub, log/art shift, scan_table, tokenize, copy_table, print_unicode). found and fixed: insert_object wrong removal order, double-byte operand decoder reading type bytes interleaved with operands instead of all types first, opcode detection using 7-bit mask instead of 5-bit. Lost Pig now runs to completion (101K steps, 61 opcodes). (done — LostPig.z8 bundled in content/stories/, see ``scripts/trace_lostpig.py``)
- [ ] wire V8 games to MUD: ``EmbeddedIFSession`` currently only handles .z3 files. extend to .z5/.z8 so Lost Pig is playable via ``play lostpig`` in the MUD.
- [x] wire V8 games to MUD: ``play.py`` routes .z3/.z5/.z8 to ``EmbeddedIFSession``. Lost Pig playable via ``play lostpig``. fixed upper window leak: V5+ games write room names to window 1 (status line) via ``select_window``. ``MudScreen`` now tracks the active window and suppresses writes to the upper window, preventing status line text from appearing in game output.
- [ ] implement real save_undo: currently stubs returning -1 ("not available"). a proper implementation needs in-memory state snapshots (dynamic memory + call stack). Lost Pig works without undo but players expect it.
@ -338,6 +338,8 @@ What was needed:
Lost Pig trace: 101K instructions, 61 unique opcodes, full gameplay loop working (room descriptions, parser, object manipulation). ``scripts/trace_lostpig.py`` for tracing.
Performance profiling and optimization work is documented in ``docs/how/zmachine-performance.rst``. Baseline: ~30K opcodes/command at 60 ops/ms (500ms steady-state). Optimization brought cold-start from 4720ms to 786ms (6x improvement).
What this enables:
- modern IF games compiled with Inform 6/7 to Z-machine V5/V8 format are now playable

View file

@ -0,0 +1,120 @@
Z-Machine Interpreter Performance
==================================
The Problem
-----------
Lost Pig (V8, ~30K opcodes per command) was taking ~500ms steady-state and
~4700ms cold-start. Zork (V3) felt fast because it uses dfrotz (C subprocess),
but Lost Pig uses the embedded Python interpreter. Pure Python was interpreting
~30K opcodes at 60 ops/ms.
Profiling Results (Baseline)
-----------------------------
103K steps for 3 commands, 9.6M function calls total.
Top time consumers from cProfile:
- Logging/tracing: ~1.14s (22%)
log(), debug(), isEnabledFor(), log_disasm(), str.join for trace strings.
Even with logging disabled, 1M+ calls to log() that check isEnabledFor()
and return.
- BitField: ~0.67s (13%)
878K __getitem__ calls, 304K __init__ allocations, 1.8M isinstance() calls.
- Memory bounds checking: ~0.55s (11%)
512K _check_bounds() calls on every memory read.
Total overhead: ~2.36s out of 5.1s interpreter time (46%).
What We Optimized
-----------------
- step_fast(): hot loop without trace/log string building. Main run() uses
this by default.
- Pre-resolved dispatch table: opcode handlers resolved once at init, no
per-instruction isinstance/version checks.
- Inline bit operations: replaced BitField object allocation + slice ops with
direct & and >> in opcode decoder, branch offset, _make_signed, type byte
parsing.
Results: cold start 4720ms -> 786ms (6x), steady state ~500ms -> ~460ms (~8%).
Future Work (Prioritized)
--------------------------
1. Strip log() calls from opcode decoder internals (zopdecoder.py)
974K calls still executing in fast path. These are inside _parse_operand,
get_branch_offset, _parse_opcode_*, _get_pc, _write_result, etc. Even with
logging disabled, the function call + isEnabledFor check costs ~0.5s.
2. Remove memory bounds checking on internal access
_check_bounds in zmemory.__getitem__ — 512K calls, ~0.28s. Trust story
file for internal decode path, keep bounds checks only at API boundary.
3. Local variable caching in hot methods
Cache self._memory, self._stack as locals in tight loops to avoid repeated
attribute lookups.
4. BitField still used in zobjectparser.py
get_all_properties, get_prop_addr_len, get_attribute — 200K+ calls remain.
5. Operand parsing local bindings
Cache pop_stack, get_local_variable, read_global as locals.
6. Consider bytearray/memoryview for memory access
Instead of custom __getitem__.
Measurement Tools
-----------------
- scripts/time_lostpig.py
Per-command wall clock + opcode count, shows ops/ms rate.
- scripts/profile_lostpig.py
cProfile wrapper, shows top functions by cumtime and tottime.
- scripts/trace_lostpig.py
Step-by-step opcode tracer with frequency counter.
Architecture Notes
------------------
- step() (debug) vs step_fast() (production)
step() still exists with full trace/logging for debugging.
- _build_dispatch_table() runs at init
Pre-resolves version-specific handlers.
- Dispatch table structure
Dict of lists: dispatch[opcode_class][opcode_number] = (implemented, func)
or None.
- Threading overhead
The embedded interpreter runs in a thread; async plumbing (Event.wait) adds
negligible overhead — the bottleneck is pure z-machine execution time.

97
scripts/profile_lostpig.py Executable file
View file

@ -0,0 +1,97 @@
#!/usr/bin/env -S uv run --script
"""Profile Lost Pig V8 execution — find performance bottlenecks.
Runs the interpreter under cProfile for a few commands, then reports
the top functions by cumulative time and total time, plus callers of
expensive functions.
"""
# ruff: noqa: E402
import contextlib
import cProfile
import pstats
import sys
from pathlib import Path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from mudlib.zmachine import ZMachine, zstream, zui
from mudlib.zmachine.trivialzui import TrivialAudio, TrivialFilesystem, TrivialScreen
from mudlib.zmachine.zcpu import ZCpuQuit, ZCpuRestart
story_path = project_root / "content" / "stories" / "LostPig.z8"
if not story_path.exists():
print(f"ERROR: {story_path} not found")
sys.exit(1)
story_bytes = story_path.read_bytes()
print(f"Loaded LostPig.z8: {len(story_bytes)} bytes, version {story_bytes[0]}")
class AutoInputStream(zstream.ZInputStream):
"""Input stream that auto-feeds commands then quits."""
def __init__(self):
super().__init__()
self._commands = ["look", "look", "inventory"]
self._command_index = 0
def read_line(self, *args, **kwargs):
if self._command_index >= len(self._commands):
raise ZCpuQuit
cmd = self._commands[self._command_index]
self._command_index += 1
print(f"> {cmd}")
return cmd
def read_char(self, timed_input_routine=None, timed_input_interval=0):
# Return enter key (13) when read_char is called
return 13
audio = TrivialAudio()
screen = TrivialScreen()
keyboard = AutoInputStream()
filesystem = TrivialFilesystem()
ui = zui.ZUI(audio, screen, keyboard, filesystem)
zm = ZMachine(story_bytes, ui)
print("Running under cProfile...\n")
profiler = cProfile.Profile()
profiler.enable()
with contextlib.suppress(ZCpuQuit, ZCpuRestart):
zm.run()
profiler.disable()
print("\n" + "=" * 80)
print("PROFILING RESULTS")
print("=" * 80 + "\n")
# Create stats object
stats = pstats.Stats(profiler)
stats.strip_dirs()
stats.sort_stats("cumulative")
print("TOP 40 FUNCTIONS BY CUMULATIVE TIME:")
print("-" * 80)
stats.print_stats(40)
print("\n" + "=" * 80)
print("TOP 40 FUNCTIONS BY TOTAL TIME:")
print("-" * 80)
stats.sort_stats("time")
stats.print_stats(40)
print("\n" + "=" * 80)
print("TOP 40 CALLERS OF MOST EXPENSIVE FUNCTIONS:")
print("-" * 80)
stats.sort_stats("cumulative")
stats.print_callers(40)
print("\n" + "=" * 80)
print("PROFILING COMPLETE")
print("=" * 80)

217
scripts/time_lostpig.py Executable file
View file

@ -0,0 +1,217 @@
#!/usr/bin/env -S uv run --script
"""Time Lost Pig V8 per-command execution.
Measures wall-clock time and opcode count for each command to diagnose
whether slowness is in z-machine processing or MUD async plumbing.
Usage:
time_lostpig.py # default commands
time_lostpig.py look look inventory # custom commands
"""
# ruff: noqa: E402
import sys
import time
from pathlib import Path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from mudlib.zmachine import ZMachine, zstream, zui
from mudlib.zmachine.quetzal import QuetzalParser
from mudlib.zmachine.trivialzui import (
TrivialAudio,
TrivialFilesystem,
TrivialScreen,
)
from mudlib.zmachine.zcpu import (
ZCpuNotImplemented,
ZCpuQuit,
ZCpuRestart,
)
story_path = project_root / "content" / "stories" / "LostPig.z8"
if not story_path.exists():
print(f"ERROR: {story_path} not found")
sys.exit(1)
story_bytes = story_path.read_bytes()
print(f"Loaded LostPig.z8: {len(story_bytes)} bytes, version {story_bytes[0]}")
# Parse command line arguments for custom commands
default_commands = ["look", "look", "inventory", "x me", "n"]
test_commands = sys.argv[1:] if len(sys.argv) > 1 else default_commands
class TimedInputStream(zstream.ZInputStream):
"""Input stream that tracks timing and feeds commands."""
def __init__(self, commands):
super().__init__()
self._commands = commands
self._command_index = 0
self._command_start_time: float | None = None
self._command_start_opcodes: int | None = None
self._timings: list[tuple[int, int, int]] = []
self.total_opcodes: int = 0
def read_line(self, *args, **kwargs):
# Mark end of previous command processing
if (
self._command_start_time is not None
and self._command_start_opcodes is not None
):
elapsed_ms = int((time.perf_counter() - self._command_start_time) * 1000)
opcodes = self.total_opcodes - self._command_start_opcodes
self._timings.append((self._command_index - 1, elapsed_ms, opcodes))
# Return next command or quit
if self._command_index >= len(self._commands):
raise ZCpuQuit
command = self._commands[self._command_index]
self._command_index += 1
# Mark start of new command processing
self._command_start_time = time.perf_counter()
self._command_start_opcodes = self.total_opcodes
return command
def read_char(self, *args, **kwargs):
# For simple timing, just return enter key
# Mark timing same as read_line
if (
self._command_start_time is not None
and self._command_start_opcodes is not None
):
elapsed_ms = int((time.perf_counter() - self._command_start_time) * 1000)
opcodes = self.total_opcodes - self._command_start_opcodes
self._timings.append((self._command_index - 1, elapsed_ms, opcodes))
if self._command_index >= len(self._commands):
raise ZCpuQuit
# Just return enter
self._command_start_time = time.perf_counter()
self._command_start_opcodes = self.total_opcodes
return ord("\r")
def get_timings(self):
return self._timings
# Set up UI
audio = TrivialAudio()
screen = TrivialScreen()
keyboard = TimedInputStream(test_commands)
filesystem = TrivialFilesystem()
ui = zui.ZUI(audio, screen, keyboard, filesystem)
zm = ZMachine(story_bytes, ui)
print(f"Testing {len(test_commands)} commands: {test_commands}")
print()
# Try to restore from save file
data_dir = project_root / "data"
save_dir = data_dir / "if_saves"
restored = False
if save_dir.exists():
# Look for any save file matching LostPig
for save_path in save_dir.glob("*/LostPig.qzl"):
print(f"Found save file: {save_path}")
try:
save_data = save_path.read_bytes()
parser = QuetzalParser(zm)
parser.load_from_bytes(save_data)
pc = zm._opdecoder.program_counter
mem = zm._mem
# Handle V5+ in-game save restore
if (
mem.version >= 5
and pc >= 3
and mem[pc - 3] == 0xBE
and mem[pc - 2] == 0x00
):
zm._cpu._write_result(2)
restored = True
print(f"Restored from {save_path}")
break
except Exception as e:
print(f"Could not restore from {save_path}: {e}")
if not restored:
print("No save file found, starting from beginning")
print()
print("Running timed commands...")
print()
# Run interpreter with stepping to count opcodes
step_count = 0
max_steps = 2_000_000
try:
while step_count < max_steps:
try:
zm._cpu.step()
step_count += 1
keyboard.total_opcodes = step_count
except ZCpuQuit:
print(f"Game quit after {step_count:,} steps")
break
except ZCpuRestart:
print(f"Game restart after {step_count:,} steps")
break
except ZCpuNotImplemented as e:
print(f"NOT IMPLEMENTED at step {step_count:,}: {e}")
break
except Exception as e:
print(f"ERROR at step {step_count:,}: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
zm._cpu._dump_trace()
break
except KeyboardInterrupt:
print(f"\nInterrupted at step {step_count:,}")
print()
print("=" * 60)
print("TIMING RESULTS")
print("=" * 60)
print()
timings = keyboard.get_timings()
if timings:
total_ms = 0
total_cmd_opcodes = 0
for idx, elapsed_ms, opcodes in timings:
if idx < len(keyboard._commands):
cmd = keyboard._commands[idx]
rate = opcodes / elapsed_ms if elapsed_ms > 0 else 0
print(
f'command {idx + 1} "{cmd}": '
f"{opcodes:,} opcodes in {elapsed_ms}ms ({rate:,.0f} ops/ms)"
)
total_ms += elapsed_ms
total_cmd_opcodes += opcodes
if total_ms > 0:
avg_rate = total_cmd_opcodes / total_ms
print()
print(
f"Average rate: {avg_rate:,.0f} opcodes/ms "
f"({avg_rate * 1000:,.0f} opcodes/sec)"
)
else:
print("No timing data collected")
print()
print(f"Total opcodes executed: {step_count:,}")

View file

@ -24,7 +24,7 @@ def _find_story(name: str) -> pathlib.Path | None:
# prefix match (e.g. "zork" matches "zork1.z3")
if _stories_dir.exists():
for path in sorted(_stories_dir.iterdir()):
if path.stem.startswith(name) and path.suffix in _STORY_EXTENSIONS:
if path.stem.lower().startswith(name) and path.suffix in _STORY_EXTENSIONS:
return path
return None
@ -65,8 +65,8 @@ async def cmd_play(player: Player, args: str) -> None:
if not isinstance(story_path, pathlib.Path):
story_path = pathlib.Path(story_path)
# Use embedded interpreter for z3 files, dfrotz for others
if story_path.suffix == ".z3":
# Use embedded interpreter for z-machine files, dfrotz for others
if story_path.suffix in (".z3", ".z5", ".z8"):
try:
session = EmbeddedIFSession(player, str(story_path), game_name)
except (FileNotFoundError, OSError) as e:

View file

@ -46,6 +46,14 @@ class EmbeddedIFSession:
Must be called before the interpreter thread is launched.
Returns True if state was restored successfully.
Handles two save origins:
- In-game save (V3: opcode 0xB5, V5+: EXT 0xBE/0x00): PC points at
branch data (V3) or store byte (V5+). Process them so execution
resumes at the next instruction.
- MUD-level _do_save during sread/aread: PC points past the read
instruction (store byte pre-consumed in op_aread/op_read_char).
No post-processing needed (phantom output suppressed in start()).
"""
if not self.save_path.exists():
return False
@ -53,17 +61,24 @@ class EmbeddedIFSession:
save_data = self.save_path.read_bytes()
parser = QuetzalParser(self._zmachine)
parser.load_from_bytes(save_data)
# In V1-3, the saved PC points to branch data after the save
# instruction. Process the branch as "save succeeded" so the
# PC advances past it. Detect by checking for save opcode (0xB5)
# immediately before the restored PC.
pc = self._zmachine._opdecoder.program_counter
if (
self._zmachine._mem.version <= 3
and pc > 0
and self._zmachine._mem[pc - 1] == 0xB5
):
mem = self._zmachine._mem
if mem.version <= 3 and pc > 0 and mem[pc - 1] == 0xB5:
# V3 in-game save: PC at branch data after save opcode (0xB5).
# Process the branch as "save succeeded".
self._zmachine._cpu._branch(True)
elif (
mem.version >= 5
and pc >= 3
and mem[pc - 3] == 0xBE
and mem[pc - 2] == 0x00
):
# V5+ in-game save: PC at store byte after EXT save opcode.
# Read store byte and write 2 ("restored") to that variable.
self._zmachine._cpu._write_result(2)
return True
except Exception as e:
logger.debug(f"Restore failed: {e}")
@ -81,8 +96,10 @@ class EmbeddedIFSession:
output = self._screen.flush()
if restored:
prefix = "restoring saved game...\r\nrestored."
return f"{prefix}\r\n\r\n{output}" if output else prefix
# After restore, the game processes phantom input (garbage in text
# buffer), producing unwanted output. Suppress it and only show the
# restore confirmation.
return "restoring saved game...\r\nrestored."
return output
async def handle_input(self, text: str) -> IFResponse:

View file

@ -315,7 +315,8 @@ async def shell(
if player.mode == "editor" and player.editor:
_writer.write(f" {player.editor.cursor + 1}> ")
elif player.mode == "if" and player.if_session:
_writer.write("\r\n> ")
# IF mode: game writes its own prompt, don't add another
pass
else:
_writer.write("mud> ")
await _writer.drain()

View file

@ -21,6 +21,7 @@ class MudScreen(zscreen.ZScreen):
def __init__(self):
super().__init__()
self._buffer = []
self._active_window = 0 # 0=lower (main text), 1=upper (status line)
self._columns = 80
self._rows = zscreen.INFINITE_ROWS
self.features = {
@ -31,6 +32,7 @@ class MudScreen(zscreen.ZScreen):
}
def write(self, string):
if self._active_window == 0:
self._buffer.append(string)
def flush(self) -> str:
@ -39,10 +41,10 @@ class MudScreen(zscreen.ZScreen):
return result
def split_window(self, height):
logger.debug(f"split_window({height}) - no-op")
logger.debug(f"split_window({height})")
def select_window(self, window):
logger.debug(f"select_window({window}) - no-op")
self._active_window = window
def set_cursor_position(self, x, y):
logger.debug(f"set_cursor_position({x}, {y}) - no-op")

View file

@ -10,7 +10,7 @@ import random
import time
from collections import deque
from . import bitfield, zopdecoder, zscreen
from . import zopdecoder, zscreen
from .zlogging import log, log_disasm
@ -65,6 +65,7 @@ class ZCpu:
self._lexer = zlexer
self._zmachine = zmachine
self._trace = deque(maxlen=20)
self._dispatch = self._build_dispatch_table()
@property
def _program_counter(self):
@ -111,9 +112,7 @@ class ZCpu:
def _make_signed(self, a):
"""Turn the given 16-bit value into a signed integer."""
assert a < (1 << 16)
# This is a little ugly.
bf = bitfield.BitField(a)
if bf[15]:
if (a >> 15) & 1:
a = a - (1 << 16)
return a
@ -197,6 +196,55 @@ class ZCpu:
print(entry)
print("===================================\n")
def _build_dispatch_table(self):
"""Pre-resolve all opcode handlers for current version."""
table = {}
for opcode_class, opcode_class_list in self.opcodes.items():
class_table = []
for opcode_decl in opcode_class_list:
if not opcode_decl:
class_table.append(None)
continue
if not isinstance(opcode_decl, (list, tuple)):
func = opcode_decl
else:
func = None
if isinstance(opcode_decl[0], (list, tuple)):
for f, version in opcode_decl: # type: ignore
if version <= self._memory.version:
func = f
break
elif opcode_decl[1] <= self._memory.version:
func = opcode_decl[0]
if func is None:
class_table.append(None)
continue
implemented = bool(func.__doc__)
class_table.append((implemented, func))
table[opcode_class] = class_table
return table
def step_fast(self):
"""Execute a single instruction without tracing.
Returns True if execution should continue.
"""
(opcode_class, opcode_number, operands) = self._opdecoder.get_next_instruction()
entry = self._dispatch[opcode_class][opcode_number]
if entry is None:
raise ZCpuIllegalInstruction
implemented, func = entry
if not implemented:
return False
try:
func(self, *operands)
except (ZCpuQuit, ZCpuRestart):
raise
except Exception:
self._dump_trace()
raise
return True
def step(self):
"""Execute a single instruction. Returns True if execution should continue."""
current_pc = self._opdecoder.program_counter
@ -245,7 +293,7 @@ class ZCpu:
"""The Magic Function that takes little bits and bytes, twirls
them around, and brings the magic to your screen!"""
log("Execution started")
while self.step():
while self.step_fast():
pass
##
@ -760,7 +808,13 @@ class ZCpu:
text_buffer_addr = args[0]
parse_buffer_addr = args[1] if len(args) > 1 else 0
# Read input from keyboard
# Consume store byte BEFORE blocking in read_line(). This ensures
# the PC is past the entire instruction when MUD-level saves capture
# state during read_line(). Without this, saves point PC at the store
# byte, which gets misinterpreted as an opcode on restore.
store_addr = self._opdecoder.get_store_address()
# Read input from keyboard (blocks until player types something)
text = self._ui.keyboard_input.read_line()
text = text.lower().strip("\n\r")
@ -792,7 +846,7 @@ class ZCpu:
offset = pos + word_len
# Store terminating character (13 = newline)
self._write_result(13)
self._write_result(13, store_addr=store_addr)
def op_print_char(self, char):
"""Output the given ZSCII character."""
@ -906,7 +960,7 @@ class ZCpu:
"""Play sound effect (no-op - sound not supported in text MUD)."""
pass
def op_read_char(self, unused, time, input_routine):
def op_read_char(self, unused, time=0, input_routine=0):
"""Read a single character from input stream 0 (keyboard).
Optionally, call a routine periodically to decide whether or
@ -920,8 +974,11 @@ class ZCpu:
if time != 0 or input_routine != 0:
raise ZCpuNotImplemented
# Consume store byte BEFORE blocking in read_char() — same reason
# as op_aread: PC must be past the full instruction for MUD saves.
store_addr = self._opdecoder.get_store_address()
char = self._ui.keyboard_input.read_char()
self._write_result(char)
self._write_result(char, store_addr=store_addr)
def op_scan_table(self, x, table, length, *args):
"""Search a table for a value, branch if found, store address (V4+).
@ -1034,12 +1091,57 @@ class ZCpu:
## EXT opcodes (opcodes 256-284)
def op_save_v5(self, *args):
"""TODO: Write docstring here."""
raise ZCpuNotImplemented
"""Save game state to file (V5+ - stores result).
Generates Quetzal save data and writes via filesystem.
Stores 1 on success, 0 on failure. On restore, the game
will see 2 stored in the same variable.
"""
if self._zmachine is None:
self._write_result(0)
return
from .quetzal import QuetzalWriter
try:
writer = QuetzalWriter(self._zmachine)
save_data = writer.generate_save_data()
success = self._ui.filesystem.save_game(save_data)
self._write_result(1 if success else 0)
except Exception as e:
log(f"Save failed with exception: {e}")
self._write_result(0)
def op_restore_v5(self, *args):
"""TODO: Write docstring here."""
raise ZCpuNotImplemented
"""Restore game state from file (V5+ - stores result).
Loads Quetzal save data and restores memory/stack/PC.
The restored PC points at the store byte of the original save
instruction. We read it and write 2 (meaning "restored") to
the indicated variable.
Stores 0 on failure (in the current, un-restored state).
"""
if self._zmachine is None:
self._write_result(0)
return
from .quetzal import QuetzalParser
try:
save_data = self._ui.filesystem.restore_game()
if save_data is None:
self._write_result(0)
return
parser = QuetzalParser(self._zmachine)
parser.load_from_bytes(save_data)
# Restored PC points at the store byte of the save instruction.
# Read it and write 2 ("restored") to that variable.
self._write_result(2)
except Exception as e:
log(f"Restore failed with exception: {e}")
self._write_result(0)
def op_log_shift(self, number, places):
"""Logical shift: positive places = left, negative = right (V5+).

View file

@ -6,7 +6,6 @@
# root directory of this distribution.
#
from .bitfield import BitField
from .zlogging import log
@ -74,15 +73,11 @@ class ZOpDecoder:
# Extended opcode
return self._parse_opcode_extended()
opcode = BitField(opcode)
if opcode[7] == 0:
# Long opcode
if not (opcode & 0x80):
return self._parse_opcode_long(opcode)
elif opcode[6] == 0:
# Short opcode
elif not (opcode & 0x40):
return self._parse_opcode_short(opcode)
else:
# Variable opcode
return self._parse_opcode_variable(opcode)
def _parse_opcode_long(self, opcode):
@ -92,35 +87,35 @@ class ZOpDecoder:
log("Opcode is long")
LONG_OPERAND_TYPES = [SMALL_CONSTANT, VARIABLE]
operands = [
self._parse_operand(LONG_OPERAND_TYPES[opcode[6]]),
self._parse_operand(LONG_OPERAND_TYPES[opcode[5]]),
self._parse_operand(LONG_OPERAND_TYPES[(opcode >> 6) & 1]),
self._parse_operand(LONG_OPERAND_TYPES[(opcode >> 5) & 1]),
]
return (OPCODE_2OP, opcode[0:5], operands)
return (OPCODE_2OP, opcode & 0x1F, operands)
def _parse_opcode_short(self, opcode):
"""Parse an opcode of the short form."""
# Short opcodes can have either 1 operand, or no operand.
log("Opcode is short")
operand_type = opcode[4:6]
operand_type = (opcode >> 4) & 0x03
operand = self._parse_operand(operand_type)
if operand is None: # 0OP variant
log("Opcode is 0OP variant")
return (OPCODE_0OP, opcode[0:4], [])
return (OPCODE_0OP, opcode & 0x0F, [])
else:
log("Opcode is 1OP variant")
return (OPCODE_1OP, opcode[0:4], [operand])
return (OPCODE_1OP, opcode & 0x0F, [operand])
def _parse_opcode_variable(self, opcode):
"""Parse an opcode of the variable form."""
log("Opcode is variable")
if opcode[5]:
if (opcode >> 5) & 1:
log("Variable opcode of VAR kind")
opcode_type = OPCODE_VAR
else:
log("Variable opcode of 2OP kind")
opcode_type = OPCODE_2OP
opcode_num = opcode[0:5]
opcode_num = opcode & 0x1F
# Read all type bytes FIRST, before parsing any operands.
# call_vs2 (VAR:12) and call_vn2 (VAR:26) have two type bytes;
@ -183,12 +178,12 @@ class ZOpDecoder:
def _read_type_byte(self):
"""Read one operand type byte and return a list of type codes."""
operand_byte = BitField(self._get_pc())
operand_byte = self._get_pc()
return [
operand_byte[6:8],
operand_byte[4:6],
operand_byte[2:4],
operand_byte[0:2],
(operand_byte >> 6) & 0x03,
(operand_byte >> 4) & 0x03,
(operand_byte >> 2) & 0x03,
operand_byte & 0x03,
]
def _parse_operand_list(self, operand_types):
@ -214,12 +209,11 @@ class ZOpDecoder:
to by the PC. Increment PC just past the text."""
start_addr = self.program_counter
bf = BitField(0)
while True:
bf.__init__(self._memory[self.program_counter])
byte = self._memory[self.program_counter]
self.program_counter += 2
if bf[7] == 1:
if (byte >> 7) & 1:
break
return start_addr
@ -236,10 +230,10 @@ class ZOpDecoder:
to branch if true or branch if false), and second, the address to
jump to. Increment the PC as necessary."""
bf = BitField(self._get_pc())
branch_if_true = bool(bf[7])
if bf[6]:
branch_offset = bf[0:6]
byte = self._get_pc()
branch_if_true = bool((byte >> 7) & 1)
if (byte >> 6) & 1:
branch_offset = byte & 0x3F
else:
# We need to do a little magic here. The branch offset is
# written as a signed 14-bit number, with signed meaning '-n' is
@ -253,8 +247,8 @@ class ZOpDecoder:
# If the MSB is not set, we just extract the value and return it.
#
# Can you spell "Weird" ?
branch_offset = self._get_pc() + (bf[0:5] << 8)
if bf[5]:
branch_offset = self._get_pc() + ((byte & 0x1F) << 8)
if (byte >> 5) & 1:
branch_offset -= 8192
log(f"Branch if {branch_if_true} to offset {branch_offset:+d}")

View file

@ -47,6 +47,37 @@ def test_mud_screen_flush_clears_buffer():
assert second == ""
def test_mud_screen_suppresses_upper_window_writes():
"""MudScreen discards writes to upper window (status line)."""
screen = MudScreen()
screen.write("before ")
screen.select_window(1) # upper window
screen.write("STATUS LINE")
screen.select_window(0) # back to lower window
screen.write("after")
output = screen.flush()
assert output == "before after"
def test_mud_screen_lower_window_is_default():
"""MudScreen starts with lower window active, writes are captured."""
screen = MudScreen()
screen.write("hello")
assert screen.flush() == "hello"
def test_mud_screen_upper_window_multiple_writes():
"""MudScreen discards all writes while upper window is active."""
screen = MudScreen()
screen.select_window(1)
screen.write("Room Name")
screen.write(" | Score: 0")
screen.select_window(0)
screen.write("You are in a dark room.\n")
output = screen.flush()
assert output == "You are in a dark room.\n"
def test_mud_input_stream_feed_and_read():
"""MudInputStream feed and read_line work with threading."""
stream = MudInputStream()

View file

@ -67,10 +67,9 @@ async def test_play_enters_if_mode(player):
mock_session.save_path = Mock(spec=Path)
mock_session.save_path.exists = Mock(return_value=False)
with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session
with patch("mudlib.commands.play.EmbeddedIFSession") as MockSession:
MockSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5"
@ -108,11 +107,11 @@ async def test_play_handles_dfrotz_missing(player):
with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session
# Use .z5 to test dfrotz path
# Use .zblorb to test dfrotz path (z3/z5/z8 go to embedded)
with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5"
mock_find.return_value = "/fake/path/game.zblorb"
await cmd_play(player, "zork1")
await cmd_play(player, "game")
# Verify error message was sent
player.writer.write.assert_called()
@ -142,10 +141,9 @@ async def test_play_restores_save_if_exists(player):
)
mock_session.start = AsyncMock(return_value=restored_output)
with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session
with patch("mudlib.commands.play.EmbeddedIFSession") as MockSession:
MockSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5"
@ -170,14 +168,13 @@ async def test_play_no_restore_if_no_save(player):
from mudlib.commands.play import cmd_play
# Mock IFSession
# Mock EmbeddedIFSession
mock_session = Mock()
mock_session.start = AsyncMock(return_value="Welcome to Zork!")
with patch("mudlib.commands.play.IFSession") as MockIFSession:
MockIFSession.return_value = mock_session
with patch("mudlib.commands.play.EmbeddedIFSession") as MockSession:
MockSession.return_value = mock_session
# Use .z5 to test dfrotz path
with patch("mudlib.commands.play._find_story") as mock_find:
mock_find.return_value = "/fake/path/zork1.z5"