"""Embedded z-machine interactive-fiction session for MUD players."""

import asyncio
import logging
import re
import threading
import traceback
from pathlib import Path
from typing import TYPE_CHECKING

from mudlib.if_session import IFResponse
from mudlib.zmachine.mud_ui import create_mud_ui
from mudlib.zmachine.quetzal import QuetzalParser, QuetzalWriter
from mudlib.zmachine.zcpu import ZCpuQuit, ZCpuRestart
from mudlib.zmachine.zmachine import ZMachine

if TYPE_CHECKING:
    from mudlib.player import Player

logger = logging.getLogger(__name__)


class EmbeddedIFSession:
    """Wraps z-machine interpreter for MUD integration.

    The interpreter runs on a daemon thread started by ``start()``. The
    async methods hand input to it through ``self._keyboard`` and collect
    its output from ``self._screen``; ``self._keyboard._waiting`` is used as
    the "interpreter is blocked waiting for input" signal.
    """

    # Message returned by _do_save() on success; handle_input() compares
    # against it to decide what to tell the player on "::quit".
    _SAVE_OK_MESSAGE = "saved."

    def __init__(self, player: "Player", story_path: str, game_name: str = ""):
        """Load the story file and build the UI and z-machine around it.

        Args:
            player: Player the session belongs to; ``player.name`` is used
                to derive the save-file location.
            story_path: Path to the story file to run.
            game_name: Name used for the save file. Defaults to the stem of
                ``story_path`` when empty.

        Raises:
            OSError: If the story file cannot be read.
        """
        self.player = player
        self.story_path = story_path
        self.game_name = game_name or Path(story_path).stem
        self._data_dir = Path(__file__).resolve().parents[2] / "data"
        self._thread: threading.Thread | None = None
        self._done = False
        self._error: str | None = None

        story_bytes = Path(story_path).read_bytes()
        save_path = self.save_path
        self._ui, self._screen, self._keyboard = create_mud_ui(save_path)
        self._zmachine = ZMachine(story_bytes, self._ui)
        self._filesystem = self._ui.filesystem

    def _strip_prompt(self, output: str) -> str:
        """Strip trailing > prompt from game output (matches dfrotz behavior).

        Returns ``output`` unchanged when no trailing prompt is recognised.
        """
        has_prompt = output.endswith(("> ", ">\n", ">\r\n"))
        text = output.rstrip()
        # A prompt on its own final line is always removed.
        if text.endswith("\n>"):
            return text[:-2].rstrip()
        # Output that is nothing but the prompt collapses to empty.
        if text == ">":
            return ""
        # A prompt glued to the end of the last line is removed only when
        # it was followed by the usual trailing space/newline, so a bare
        # ">" that is part of the text is left alone.
        if has_prompt and text.endswith(">"):
            return text[:-1].rstrip()
        return output

    @property
    def save_path(self) -> Path:
        """Path of this player's save file for the current game.

        Characters in the player name other than ASCII letters, digits,
        ``_`` and ``-`` are replaced with ``_`` before the name is used as
        a directory component.
        """
        safe_name = re.sub(r"[^a-zA-Z0-9_-]", "_", self.player.name)
        return self._data_dir / "if_saves" / safe_name / f"{self.game_name}.qzl"

    def _try_restore(self) -> bool:
        """Try to restore from save file before interpreter starts.

        Must be called before the interpreter thread is launched.
        Returns True if state was restored successfully.

        Handles two save origins:
        - In-game save (V3: opcode 0xB5, V5+: EXT 0xBE/0x00): PC points at
          branch data (V3) or store byte (V5+). Process them so execution
          resumes at the next instruction.
        - MUD-level _do_save during sread/aread: PC points past the read
          instruction (store byte pre-consumed in op_aread/op_read_char).
          No post-processing needed (phantom output suppressed in start()).
        """
        if not self.save_path.exists():
            return False
        try:
            save_data = self.save_path.read_bytes()
            parser = QuetzalParser(self._zmachine)
            parser.load_from_bytes(save_data)

            pc = self._zmachine._opdecoder.program_counter
            mem = self._zmachine._mem
            if mem.version <= 3 and pc > 0 and mem[pc - 1] == 0xB5:
                # V3 in-game save: PC at branch data after save opcode (0xB5).
                # Process the branch as "save succeeded".
                self._zmachine._cpu._branch(True)
            elif (
                mem.version >= 5
                and pc >= 3
                and mem[pc - 3] == 0xBE
                and mem[pc - 2] == 0x00
            ):
                # V5+ in-game save: PC at store byte after EXT save opcode.
                # Read store byte and write 2 ("restored") to that variable.
                self._zmachine._cpu._write_result(2)
            return True
        except Exception as e:
            # Best-effort: any failure means "start a fresh game instead".
            logger.debug("Restore failed: %s", e)
            return False

    async def start(self) -> str:
        """Start the z-machine interpreter, restoring from save if available.

        Returns:
            The game's opening output with the trailing prompt stripped, or
            a fixed restore confirmation when a save file was restored.
        """
        restored = self._try_restore()
        self._thread = threading.Thread(target=self._run_interpreter, daemon=True)
        self._thread.start()

        # Block (off the event loop) until the interpreter first asks for
        # input, or exits: _run_interpreter sets the flag in its finally.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._keyboard._waiting.wait)

        output = self._screen.flush()
        if restored:
            # After restore, the game processes phantom input (garbage in text
            # buffer), producing unwanted output. Suppress it and only show the
            # restore confirmation.
            return "restoring saved game...\r\nrestored."
        return self._strip_prompt(output)

    async def handle_input(self, text: str) -> IFResponse:
        """Handle one line of player input.

        ``::quit``, ``::help`` and ``::save`` (case-insensitive) are handled
        here; anything else is fed to the interpreter and its output up to
        the next input request is returned.

        Returns:
            An ``IFResponse`` whose ``done`` flag is True when the session
            has ended (``::quit``, or the interpreter thread finished).
        """
        command = text.lower()

        if command == "::quit":
            confirmation = await self._do_save()
            if confirmation == self._SAVE_OK_MESSAGE:
                return IFResponse(output="game saved.", done=True)
            # The quit is still honoured, but a failed save must not be
            # reported to the player as a successful one.
            return IFResponse(output=confirmation, done=True)

        if command == "::help":
            # NOTE(review): line breaks and the two-space indent are
            # reconstructed from a whitespace-collapsed triple-quoted
            # literal; confirm against the original text.
            help_text = (
                "escape commands:\n"
                "  ::quit - exit the game\n"
                "  ::save - save game progress\n"
                "  ::help - show this help"
            )
            return IFResponse(output=help_text, done=False)

        if command == "::save":
            confirmation = await self._do_save()
            return IFResponse(output=confirmation, done=False)

        # Clear the flag before feeding so the wait below observes the
        # interpreter's *next* input request, not the current one.
        self._keyboard._waiting.clear()
        self._keyboard.feed(text)

        loop = asyncio.get_running_loop()

        def wait_for_next_input():
            # Poll with a timeout so a session marked done is noticed even
            # if the waiting flag is never set again.
            while not self._done and not self._keyboard._waiting.is_set():
                self._keyboard._waiting.wait(timeout=0.1)

        await loop.run_in_executor(None, wait_for_next_input)

        output = self._strip_prompt(self._screen.flush())
        if self._done and self._error:
            output = f"{output}\r\n{self._error}" if output else self._error
        return IFResponse(output=output, done=self._done)

    async def stop(self):
        """Mark the session done and feed empty input if one is awaited."""
        self._done = True
        if self._keyboard._waiting.is_set():
            self._keyboard.feed("")

    def _run_interpreter(self):
        """Thread target: run the z-machine until it quits or crashes.

        Unexpected exceptions are logged with their traceback and recorded
        in ``self._error``. In every case the session is marked done and
        the waiting flag is set so pending waiters wake up.
        """
        try:
            self._zmachine.run()
        except ZCpuQuit:
            logger.debug("Interpreter quit normally")
        except ZCpuRestart:
            logger.debug("Interpreter restart requested")
        except Exception as e:
            tb = traceback.format_exc()
            logger.error("Interpreter crashed:\n%s", tb)
            msg = str(e) or type(e).__name__
            self._error = f"interpreter error: {msg}"
        finally:
            self._done = True
            self._keyboard._waiting.set()

    async def _do_save(self) -> str:
        """Write the current z-machine state through the UI filesystem.

        Returns:
            ``"saved."`` on success, otherwise a message starting with
            ``"error: save failed"``. Exceptions are reported in the
            returned message rather than raised.
        """
        try:
            writer = QuetzalWriter(self._zmachine)
            save_data = writer.generate_save_data()
            success = self._filesystem.save_game(save_data)
            if success:
                return self._SAVE_OK_MESSAGE
            return "error: save failed"
        except Exception as e:
            return f"error: save failed ({e})"

    def _current_location(self) -> int:
        """Return the value of global variable 0, used here as the location."""
        return self._zmachine._mem.read_global(0)

    def get_location_name(self) -> str | None:
        """Return the short name of the current location object.

        Returns None when global 0 is 0 or when any lookup fails.
        """
        try:
            location_obj = self._current_location()
            if location_obj == 0:
                return None
            return self._zmachine._objectparser.get_shortname(location_obj)
        except Exception:
            # Best-effort query; callers get None rather than an exception.
            return None

    def get_room_objects(self) -> list[str]:
        """Return short names of the direct children of the location object.

        Returns an empty list when global 0 is 0 or when any lookup fails.
        """
        try:
            location_obj = self._current_location()
            if location_obj == 0:
                return []
            object_parser = self._zmachine._objectparser
            objects = []
            # Walk the child -> sibling chain; object number 0 ends it.
            child = object_parser.get_child(location_obj)
            while child != 0:
                objects.append(object_parser.get_shortname(child))
                child = object_parser.get_sibling(child)
            return objects
        except Exception:
            # Best-effort query; callers get [] rather than an exception.
            return []