#!/usr/bin/env -S uv run --script """Time Lost Pig V8 per-command execution. Measures wall-clock time and opcode count for each command to diagnose whether slowness is in z-machine processing or MUD async plumbing. Usage: time_lostpig.py # default commands time_lostpig.py look look inventory # custom commands """ # ruff: noqa: E402 import sys import time from pathlib import Path project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root / "src")) from mudlib.zmachine import ZMachine, zstream, zui from mudlib.zmachine.quetzal import QuetzalParser from mudlib.zmachine.trivialzui import ( TrivialAudio, TrivialFilesystem, TrivialScreen, ) from mudlib.zmachine.zcpu import ( ZCpuNotImplemented, ZCpuQuit, ZCpuRestart, ) story_path = project_root / "content" / "stories" / "LostPig.z8" if not story_path.exists(): print(f"ERROR: {story_path} not found") sys.exit(1) story_bytes = story_path.read_bytes() print(f"Loaded LostPig.z8: {len(story_bytes)} bytes, version {story_bytes[0]}") # Parse command line arguments for custom commands default_commands = ["look", "look", "inventory", "x me", "n"] test_commands = sys.argv[1:] if len(sys.argv) > 1 else default_commands class TimedInputStream(zstream.ZInputStream): """Input stream that tracks timing and feeds commands.""" def __init__(self, commands): super().__init__() self._commands = commands self._command_index = 0 self._command_start_time: float | None = None self._command_start_opcodes: int | None = None self._timings: list[tuple[int, int, int]] = [] self.total_opcodes: int = 0 def read_line(self, *args, **kwargs): # Mark end of previous command processing if ( self._command_start_time is not None and self._command_start_opcodes is not None ): elapsed_ms = int((time.perf_counter() - self._command_start_time) * 1000) opcodes = self.total_opcodes - self._command_start_opcodes self._timings.append((self._command_index - 1, elapsed_ms, opcodes)) # Return next command or quit if self._command_index >= len(self._commands): raise ZCpuQuit command = self._commands[self._command_index] self._command_index += 1 # Mark start of new command processing self._command_start_time = time.perf_counter() self._command_start_opcodes = self.total_opcodes return command def read_char(self, *args, **kwargs): # For simple timing, just return enter key # Mark timing same as read_line if ( self._command_start_time is not None and self._command_start_opcodes is not None ): elapsed_ms = int((time.perf_counter() - self._command_start_time) * 1000) opcodes = self.total_opcodes - self._command_start_opcodes self._timings.append((self._command_index - 1, elapsed_ms, opcodes)) if self._command_index >= len(self._commands): raise ZCpuQuit # Just return enter self._command_start_time = time.perf_counter() self._command_start_opcodes = self.total_opcodes return ord("\r") def get_timings(self): return self._timings # Set up UI audio = TrivialAudio() screen = TrivialScreen() keyboard = TimedInputStream(test_commands) filesystem = TrivialFilesystem() ui = zui.ZUI(audio, screen, keyboard, filesystem) zm = ZMachine(story_bytes, ui) print(f"Testing {len(test_commands)} commands: {test_commands}") print() # Try to restore from save file data_dir = project_root / "data" save_dir = data_dir / "if_saves" restored = False if save_dir.exists(): # Look for any save file matching LostPig for save_path in save_dir.glob("*/LostPig.qzl"): print(f"Found save file: {save_path}") try: save_data = save_path.read_bytes() parser = QuetzalParser(zm) parser.load_from_bytes(save_data) pc = zm._opdecoder.program_counter mem = zm._mem # Handle V5+ in-game save restore if ( mem.version >= 5 and pc >= 3 and mem[pc - 3] == 0xBE and mem[pc - 2] == 0x00 ): zm._cpu._write_result(2) restored = True print(f"Restored from {save_path}") break except Exception as e: print(f"Could not restore from {save_path}: {e}") if not restored: print("No save file found, starting from beginning") print() print("Running timed commands...") print() # Run interpreter with stepping to count opcodes step_count = 0 max_steps = 2_000_000 try: while step_count < max_steps: try: zm._cpu.step() step_count += 1 keyboard.total_opcodes = step_count except ZCpuQuit: print(f"Game quit after {step_count:,} steps") break except ZCpuRestart: print(f"Game restart after {step_count:,} steps") break except ZCpuNotImplemented as e: print(f"NOT IMPLEMENTED at step {step_count:,}: {e}") break except Exception as e: print(f"ERROR at step {step_count:,}: {type(e).__name__}: {e}") import traceback traceback.print_exc() zm._cpu._dump_trace() break except KeyboardInterrupt: print(f"\nInterrupted at step {step_count:,}") print() print("=" * 60) print("TIMING RESULTS") print("=" * 60) print() timings = keyboard.get_timings() if timings: total_ms = 0 total_cmd_opcodes = 0 for idx, elapsed_ms, opcodes in timings: if idx < len(keyboard._commands): cmd = keyboard._commands[idx] rate = opcodes / elapsed_ms if elapsed_ms > 0 else 0 print( f'command {idx + 1} "{cmd}": ' f"{opcodes:,} opcodes in {elapsed_ms}ms ({rate:,.0f} ops/ms)" ) total_ms += elapsed_ms total_cmd_opcodes += opcodes if total_ms > 0: avg_rate = total_cmd_opcodes / total_ms print() print( f"Average rate: {avg_rate:,.0f} opcodes/ms " f"({avg_rate * 1000:,.0f} opcodes/sec)" ) else: print("No timing data collected") print() print(f"Total opcodes executed: {step_count:,}")