Port 5 complex opcodes to hybrid z-machine interpreter

Implement op_sread (text input), op_save/restore (file I/O stubs),
op_restart (exception-based reset), op_input_stream and op_sound_effect
(no-op stubs). Add ZCpuRestart exception. All implementations follow TDD
with comprehensive unit tests.
This commit is contained in:
Jared Miller 2026-02-09 20:18:57 -05:00
parent c76ee337d3
commit 72dd047b7b
Signed by: shmup
GPG key ID: 22B5C6D66A38B06C
2 changed files with 198 additions and 14 deletions

View file

@ -37,6 +37,10 @@ class ZCpuQuit(ZCpuError):
"Quit opcode executed"
class ZCpuRestart(ZCpuError):
"Restart opcode executed"
class ZCpu:
def __init__(
self, zmem, zopdecoder, zstack, zobjects, zstring, zstreammanager, zui
@ -495,24 +499,35 @@ class ZCpu:
pass
def op_save(self, *args):
"""TODO: Write docstring here."""
raise ZCpuNotImplemented
"""Save game state to file (V3 - branch on success).
Currently always fails because QuetzalWriter is not yet functional.
"""
self._branch(False)
def op_save_v4(self, *args):
"""TODO: Write docstring here."""
raise ZCpuNotImplemented
def op_restore(self, *args):
"""TODO: Write docstring here."""
raise ZCpuNotImplemented
"""Restore game state from file (V3 - branch on success).
Currently always fails because QuetzalWriter is not yet functional,
so there are no valid save files to restore.
"""
self._branch(False)
def op_restore_v4(self, *args):
"""TODO: Write docstring here."""
raise ZCpuNotImplemented
def op_restart(self, *args):
"""TODO: Write docstring here."""
raise ZCpuNotImplemented
"""Restart the game from the beginning.
Raises ZCpuRestart exception for the run loop to handle.
The ZMachine should catch this and reset to initial state.
"""
raise ZCpuRestart
def op_ret_popped(self, *args):
"""Pop a value from the stack and return it from the current routine."""
@ -579,8 +594,34 @@ class ZCpu:
self._objects.set_property(object_number, property_number, value)
def op_sread(self, *args):
"""Not implemented yet, but documented so that the detection
code will be foiled."""
"""Read text input from keyboard (V3).
Args:
args[0]: text buffer address (byte 0 = max length, text starts at byte 1)
args[1]: parse buffer address (optional, 0 = no parsing)
"""
text_buffer_addr = args[0]
parse_buffer_addr = args[1] if len(args) > 1 else 0
# V3: show status line first
if self._memory.version <= 3:
self.op_show_status()
# Read input from keyboard
text = self._ui.keyboard.readline()
text = text.lower().strip("\n\r")
# Store in text buffer
max_len = self._memory[text_buffer_addr]
text = text[:max_len]
for i, ch in enumerate(text):
self._memory[text_buffer_addr + 1 + i] = ord(ch)
self._memory[text_buffer_addr + 1 + len(text)] = 0
# Tokenize if parse buffer provided
# Note: ZLexer not yet wired up - tokenization will be added when needed
if parse_buffer_addr != 0:
pass # TODO: add tokenization when ZLexer is integrated
def op_sread_v4(self, *args):
"""TODO: Write docstring here."""
@ -692,15 +733,15 @@ class ZCpu:
else:
self._streammanager.output.select(stream_num)
def op_input_stream(self, *args):
"""TODO: Write docstring here."""
raise ZCpuNotImplemented
def op_input_stream(self, stream_num):
"""Select input stream (stub - not yet implemented for MUD integration)."""
pass
# This one may have been used prematurely in v3 stories. Keep an
# eye out for it if we ever get bug reports.
def op_sound_effect(self, *args):
"""TODO: Write docstring here."""
raise ZCpuNotImplemented
"""Play sound effect (no-op - sound not supported in text MUD)."""
pass
def op_read_char(self, unused, time, input_routine):
"""Read a single character from input stream 0 (keyboard).

View file

@ -8,7 +8,7 @@ required dependencies (memory, stack, decoder, etc).
from unittest import TestCase
from unittest.mock import Mock
from mudlib.zmachine.zcpu import ZCpu, ZCpuDivideByZero, ZCpuQuit
from mudlib.zmachine.zcpu import ZCpu, ZCpuDivideByZero, ZCpuQuit, ZCpuRestart
class MockMemory:
@ -85,6 +85,7 @@ class MockUI:
def __init__(self):
self.screen = Mock()
self.screen.write = Mock()
self.keyboard = Mock()
class ZMachineOpcodeTests(TestCase):
@ -516,6 +517,148 @@ class ZMachineObjectOpcodeTests(TestCase):
self.assertEqual(self.stack.pop_stack(), 0)
class ZMachineComplexOpcodeTests(TestCase):
"""Test suite for complex Z-machine opcodes (input, save/restore, restart)."""
def setUp(self):
"""Create a CPU with all necessary mocks."""
self.memory = MockMemory()
self.stack = MockStackManager()
self.decoder = MockOpDecoder()
self.ui = MockUI()
# Create CPU with mocked dependencies
self.cpu = ZCpu(
self.memory,
self.decoder,
self.stack,
Mock(), # objects
Mock(), # string
Mock(), # stream manager
self.ui,
)
def test_op_sread_v3_basic_input(self):
"""Test sread (V3) reads text into buffer and null-terminates."""
# Setup: text buffer at 0x1000, max length 20
text_buffer_addr = 0x1000
self.memory[text_buffer_addr] = 20 # max length
# Mock keyboard input
self.ui.keyboard.readline = Mock(return_value="hello world\n")
# Call sread with text buffer only (no parse buffer)
self.cpu.op_sread(text_buffer_addr, 0)
# Verify text is stored lowercased starting at offset 1
expected = "hello world"
for i, ch in enumerate(expected):
self.assertEqual(self.memory[text_buffer_addr + 1 + i], ord(ch))
# Verify null termination
self.assertEqual(self.memory[text_buffer_addr + 1 + len(expected)], 0)
def test_op_sread_v3_truncates_to_max_length(self):
"""Test sread respects max length in buffer."""
text_buffer_addr = 0x1000
self.memory[text_buffer_addr] = 5 # max length of 5
self.ui.keyboard.readline = Mock(return_value="hello world\n")
self.cpu.op_sread(text_buffer_addr, 0)
# Should only store first 5 characters
expected = "hello"
for i, ch in enumerate(expected):
self.assertEqual(self.memory[text_buffer_addr + 1 + i], ord(ch))
self.assertEqual(self.memory[text_buffer_addr + 1 + 5], 0)
def test_op_sread_v3_lowercases_input(self):
"""Test sread converts input to lowercase."""
text_buffer_addr = 0x1000
self.memory[text_buffer_addr] = 20
self.ui.keyboard.readline = Mock(return_value="HELLO World\n")
self.cpu.op_sread(text_buffer_addr, 0)
expected = "hello world"
for i, ch in enumerate(expected):
self.assertEqual(self.memory[text_buffer_addr + 1 + i], ord(ch))
def test_op_sread_v3_strips_newlines(self):
"""Test sread strips newlines and carriage returns."""
text_buffer_addr = 0x1000
self.memory[text_buffer_addr] = 20
self.ui.keyboard.readline = Mock(return_value="test\r\n")
self.cpu.op_sread(text_buffer_addr, 0)
expected = "test"
for i, ch in enumerate(expected):
self.assertEqual(self.memory[text_buffer_addr + 1 + i], ord(ch))
self.assertEqual(self.memory[text_buffer_addr + 1 + 4], 0)
def test_op_sread_v3_calls_show_status(self):
"""Test sread calls op_show_status for V3."""
text_buffer_addr = 0x1000
self.memory[text_buffer_addr] = 20
self.ui.keyboard.readline = Mock(return_value="test\n")
# Track if show_status was called
call_count = [0]
original = self.cpu.op_show_status
def counted_show_status(*args):
call_count[0] += 1
return original(*args)
self.cpu.op_show_status = counted_show_status # type: ignore
self.cpu.op_sread(text_buffer_addr, 0)
# Should have called show_status once
self.assertEqual(call_count[0], 1)
def test_op_save_v3_branches_false(self):
"""Test save (V3) branches false (QuetzalWriter not functional)."""
self.decoder.branch_condition = True
self.decoder.branch_offset = 100
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_save()
# Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_restore_v3_branches_false(self):
"""Test restore (V3) branches false (no valid save files)."""
self.decoder.branch_condition = True
self.decoder.branch_offset = 100
old_pc = self.cpu._opdecoder.program_counter
self.cpu.op_restore()
# Should not have branched (test is false)
self.assertEqual(self.cpu._opdecoder.program_counter, old_pc)
def test_op_input_stream_is_noop(self):
"""Test input_stream is a no-op stub."""
# Should not raise
self.cpu.op_input_stream(1)
def test_op_sound_effect_is_noop(self):
"""Test sound_effect is a no-op stub."""
# Should not raise
self.cpu.op_sound_effect(1, 2, 3)
def test_op_restart_raises_exception(self):
"""Test restart raises ZCpuRestart exception for run loop to handle."""
with self.assertRaises(ZCpuRestart):
self.cpu.op_restart()
# Note: ZObjectParser methods are tested through integration tests
# with real story files, not unit tests with mock memory, as the
# interaction with ZStringFactory makes mocking complex.