mud/scripts/time_lostpig.py

217 lines
6.4 KiB
Python
Executable file

#!/usr/bin/env -S uv run --script
"""Time Lost Pig V8 per-command execution.
Measures wall-clock time and opcode count for each command to diagnose
whether slowness is in z-machine processing or MUD async plumbing.
Usage:
time_lostpig.py # default commands
time_lostpig.py look look inventory # custom commands
"""
# ruff: noqa: E402
import sys
import time
from pathlib import Path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from mudlib.zmachine import ZMachine, zstream, zui
from mudlib.zmachine.quetzal import QuetzalParser
from mudlib.zmachine.trivialzui import (
TrivialAudio,
TrivialFilesystem,
TrivialScreen,
)
from mudlib.zmachine.zcpu import (
ZCpuNotImplemented,
ZCpuQuit,
ZCpuRestart,
)
story_path = project_root / "content" / "stories" / "LostPig.z8"
if not story_path.exists():
print(f"ERROR: {story_path} not found")
sys.exit(1)
story_bytes = story_path.read_bytes()
print(f"Loaded LostPig.z8: {len(story_bytes)} bytes, version {story_bytes[0]}")
# Parse command line arguments for custom commands
default_commands = ["look", "look", "inventory", "x me", "n"]
test_commands = sys.argv[1:] if len(sys.argv) > 1 else default_commands
class TimedInputStream(zstream.ZInputStream):
"""Input stream that tracks timing and feeds commands."""
def __init__(self, commands):
super().__init__()
self._commands = commands
self._command_index = 0
self._command_start_time: float | None = None
self._command_start_opcodes: int | None = None
self._timings: list[tuple[int, int, int]] = []
self.total_opcodes: int = 0
def read_line(self, *args, **kwargs):
# Mark end of previous command processing
if (
self._command_start_time is not None
and self._command_start_opcodes is not None
):
elapsed_ms = int((time.perf_counter() - self._command_start_time) * 1000)
opcodes = self.total_opcodes - self._command_start_opcodes
self._timings.append((self._command_index - 1, elapsed_ms, opcodes))
# Return next command or quit
if self._command_index >= len(self._commands):
raise ZCpuQuit
command = self._commands[self._command_index]
self._command_index += 1
# Mark start of new command processing
self._command_start_time = time.perf_counter()
self._command_start_opcodes = self.total_opcodes
return command
def read_char(self, *args, **kwargs):
# For simple timing, just return enter key
# Mark timing same as read_line
if (
self._command_start_time is not None
and self._command_start_opcodes is not None
):
elapsed_ms = int((time.perf_counter() - self._command_start_time) * 1000)
opcodes = self.total_opcodes - self._command_start_opcodes
self._timings.append((self._command_index - 1, elapsed_ms, opcodes))
if self._command_index >= len(self._commands):
raise ZCpuQuit
# Just return enter
self._command_start_time = time.perf_counter()
self._command_start_opcodes = self.total_opcodes
return ord("\r")
def get_timings(self):
return self._timings
# Set up UI
audio = TrivialAudio()
screen = TrivialScreen()
keyboard = TimedInputStream(test_commands)
filesystem = TrivialFilesystem()
ui = zui.ZUI(audio, screen, keyboard, filesystem)
zm = ZMachine(story_bytes, ui)
print(f"Testing {len(test_commands)} commands: {test_commands}")
print()
# Try to restore from save file
data_dir = project_root / "data"
save_dir = data_dir / "if_saves"
restored = False
if save_dir.exists():
# Look for any save file matching LostPig
for save_path in save_dir.glob("*/LostPig.qzl"):
print(f"Found save file: {save_path}")
try:
save_data = save_path.read_bytes()
parser = QuetzalParser(zm)
parser.load_from_bytes(save_data)
pc = zm._opdecoder.program_counter
mem = zm._mem
# Handle V5+ in-game save restore
if (
mem.version >= 5
and pc >= 3
and mem[pc - 3] == 0xBE
and mem[pc - 2] == 0x00
):
zm._cpu._write_result(2)
restored = True
print(f"Restored from {save_path}")
break
except Exception as e:
print(f"Could not restore from {save_path}: {e}")
if not restored:
print("No save file found, starting from beginning")
print()
print("Running timed commands...")
print()
# Run interpreter with stepping to count opcodes
step_count = 0
max_steps = 2_000_000
try:
while step_count < max_steps:
try:
zm._cpu.step()
step_count += 1
keyboard.total_opcodes = step_count
except ZCpuQuit:
print(f"Game quit after {step_count:,} steps")
break
except ZCpuRestart:
print(f"Game restart after {step_count:,} steps")
break
except ZCpuNotImplemented as e:
print(f"NOT IMPLEMENTED at step {step_count:,}: {e}")
break
except Exception as e:
print(f"ERROR at step {step_count:,}: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
zm._cpu._dump_trace()
break
except KeyboardInterrupt:
print(f"\nInterrupted at step {step_count:,}")
print()
print("=" * 60)
print("TIMING RESULTS")
print("=" * 60)
print()
timings = keyboard.get_timings()
if timings:
total_ms = 0
total_cmd_opcodes = 0
for idx, elapsed_ms, opcodes in timings:
if idx < len(keyboard._commands):
cmd = keyboard._commands[idx]
rate = opcodes / elapsed_ms if elapsed_ms > 0 else 0
print(
f'command {idx + 1} "{cmd}": '
f"{opcodes:,} opcodes in {elapsed_ms}ms ({rate:,.0f} ops/ms)"
)
total_ms += elapsed_ms
total_cmd_opcodes += opcodes
if total_ms > 0:
avg_rate = total_cmd_opcodes / total_ms
print()
print(
f"Average rate: {avg_rate:,.0f} opcodes/ms "
f"({avg_rate * 1000:,.0f} opcodes/sec)"
)
else:
print("No timing data collected")
print()
print(f"Total opcodes executed: {step_count:,}")