mud/src/mudlib/embedded_if_session.py
Jared Miller 5a98adb6ee
Add instruction tracing to step_fast and improve error messages
step_fast() never recorded trace entries, so crash dumps always showed
an empty trace. Now records PC + opcode info in the same deque as
step(). Also includes exception type in player-facing error messages
when the exception string is empty.
2026-02-10 18:29:27 -05:00

208 lines
7.4 KiB
Python

import asyncio
import logging
import re
import threading
import traceback
from pathlib import Path
from typing import TYPE_CHECKING
from mudlib.if_session import IFResponse
from mudlib.zmachine.mud_ui import create_mud_ui
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zcpu import ZCpuQuit, ZCpuRestart
from mudlib.zmachine.zmachine import ZMachine
if TYPE_CHECKING:
from mudlib.player import Player
logger = logging.getLogger(__name__)
class EmbeddedIFSession:
"""Wraps z-machine interpreter for MUD integration."""
def __init__(self, player: "Player", story_path: str, game_name: str = ""):
self.player = player
self.story_path = story_path
self.game_name = game_name or Path(story_path).stem
self._data_dir = Path(__file__).resolve().parents[2] / "data"
self._thread: threading.Thread | None = None
self._done = False
self._error: str | None = None
story_bytes = Path(story_path).read_bytes()
save_path = self.save_path
self._ui, self._screen, self._keyboard = create_mud_ui(save_path)
self._zmachine = ZMachine(story_bytes, self._ui)
self._filesystem = self._ui.filesystem
def _strip_prompt(self, output: str) -> str:
"""Strip trailing > prompt from game output (matches dfrotz behavior)."""
has_prompt = (
output.endswith("> ") or output.endswith(">\n") or output.endswith(">\r\n")
)
text = output.rstrip()
if text.endswith("\n>"):
return text[:-2].rstrip()
if text == ">":
return ""
if has_prompt and text.endswith(">"):
return text[:-1].rstrip()
return output
@property
def save_path(self) -> Path:
safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", self.player.name)
return self._data_dir / "if_saves" / safe_name / f"{self.game_name}.qzl"
def _try_restore(self) -> bool:
"""Try to restore from save file before interpreter starts.
Must be called before the interpreter thread is launched.
Returns True if state was restored successfully.
Handles two save origins:
- In-game save (V3: opcode 0xB5, V5+: EXT 0xBE/0x00): PC points at
branch data (V3) or store byte (V5+). Process them so execution
resumes at the next instruction.
- MUD-level _do_save during sread/aread: PC points past the read
instruction (store byte pre-consumed in op_aread/op_read_char).
No post-processing needed (phantom output suppressed in start()).
"""
if not self.save_path.exists():
return False
try:
save_data = self.save_path.read_bytes()
parser = QuetzalParser(self._zmachine)
parser.load_from_bytes(save_data)
pc = self._zmachine._opdecoder.program_counter
mem = self._zmachine._mem
if mem.version <= 3 and pc > 0 and mem[pc - 1] == 0xB5:
# V3 in-game save: PC at branch data after save opcode (0xB5).
# Process the branch as "save succeeded".
self._zmachine._cpu._branch(True)
elif (
mem.version >= 5
and pc >= 3
and mem[pc - 3] == 0xBE
and mem[pc - 2] == 0x00
):
# V5+ in-game save: PC at store byte after EXT save opcode.
# Read store byte and write 2 ("restored") to that variable.
self._zmachine._cpu._write_result(2)
return True
except Exception as e:
logger.debug(f"Restore failed: {e}")
return False
async def start(self) -> str:
"""Start the z-machine interpreter, restoring from save if available."""
restored = self._try_restore()
self._thread = threading.Thread(target=self._run_interpreter, daemon=True)
self._thread.start()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._keyboard._waiting.wait)
output = self._screen.flush()
if restored:
# After restore, the game processes phantom input (garbage in text
# buffer), producing unwanted output. Suppress it and only show the
# restore confirmation.
return "restoring saved game...\r\nrestored."
output = self._strip_prompt(output)
return output
async def handle_input(self, text: str) -> IFResponse:
if text.lower() == "::quit":
await self._do_save()
return IFResponse(output="game saved.", done=True)
if text.lower() == "::help":
help_text = """escape commands:
::quit - exit the game
::save - save game progress
::help - show this help"""
return IFResponse(output=help_text, done=False)
if text.lower() == "::save":
confirmation = await self._do_save()
return IFResponse(output=confirmation, done=False)
self._keyboard._waiting.clear()
self._keyboard.feed(text)
loop = asyncio.get_running_loop()
def wait_for_next_input():
while not self._done and not self._keyboard._waiting.is_set():
self._keyboard._waiting.wait(timeout=0.1)
await loop.run_in_executor(None, wait_for_next_input)
output = self._screen.flush()
output = self._strip_prompt(output)
if self._done and self._error:
output = f"{output}\r\n{self._error}" if output else self._error
return IFResponse(output=output, done=self._done)
async def stop(self):
self._done = True
if self._keyboard._waiting.is_set():
self._keyboard.feed("")
def _run_interpreter(self):
try:
self._zmachine.run()
except ZCpuQuit:
logger.debug("Interpreter quit normally")
except ZCpuRestart:
logger.debug("Interpreter restart requested")
except Exception as e:
tb = traceback.format_exc()
logger.error(f"Interpreter crashed:\n{tb}")
msg = str(e) or type(e).__name__
self._error = f"interpreter error: {msg}"
finally:
self._done = True
self._keyboard._waiting.set()
async def _do_save(self) -> str:
try:
writer = QuetzalWriter(self._zmachine)
save_data = writer.generate_save_data()
success = self._filesystem.save_game(save_data)
if success:
return "saved."
return "error: save failed"
except Exception as e:
return f"error: save failed ({e})"
def get_location_name(self) -> str | None:
try:
location_obj = self._zmachine._mem.read_global(0)
if location_obj == 0:
return None
return self._zmachine._objectparser.get_shortname(location_obj)
except Exception:
return None
def get_room_objects(self) -> list[str]:
try:
location_obj = self._zmachine._mem.read_global(0)
if location_obj == 0:
return []
objects = []
child = self._zmachine._objectparser.get_child(location_obj)
while child != 0:
name = self._zmachine._objectparser.get_shortname(child)
objects.append(name)
child = self._zmachine._objectparser.get_sibling(child)
return objects
except Exception:
return []