From 72dd047b7b4181ab4897d0007a7ab3f67e5de07b Mon Sep 17 00:00:00 2001 From: Jared Miller Date: Mon, 9 Feb 2026 20:18:57 -0500 Subject: [PATCH] Port 5 complex opcodes to hybrid z-machine interpreter Implement op_sread (text input), op_save/restore (file I/O stubs), op_restart (exception-based reset), op_input_stream and op_sound_effect (no-op stubs). Add ZCpuRestart exception. All implementations follow TDD with comprehensive unit tests. --- src/mudlib/zmachine/zcpu.py | 67 ++++++++++++--- tests/test_zmachine_opcodes.py | 145 ++++++++++++++++++++++++++++++++- 2 files changed, 198 insertions(+), 14 deletions(-) diff --git a/src/mudlib/zmachine/zcpu.py b/src/mudlib/zmachine/zcpu.py index 7f277d5..e19f5ad 100644 --- a/src/mudlib/zmachine/zcpu.py +++ b/src/mudlib/zmachine/zcpu.py @@ -37,6 +37,10 @@ class ZCpuQuit(ZCpuError): "Quit opcode executed" +class ZCpuRestart(ZCpuError): + "Restart opcode executed" + + class ZCpu: def __init__( self, zmem, zopdecoder, zstack, zobjects, zstring, zstreammanager, zui @@ -495,24 +499,35 @@ class ZCpu: pass def op_save(self, *args): - """TODO: Write docstring here.""" - raise ZCpuNotImplemented + """Save game state to file (V3 - branch on success). + + Currently always fails because QuetzalWriter is not yet functional. + """ + self._branch(False) def op_save_v4(self, *args): """TODO: Write docstring here.""" raise ZCpuNotImplemented def op_restore(self, *args): - """TODO: Write docstring here.""" - raise ZCpuNotImplemented + """Restore game state from file (V3 - branch on success). + + Currently always fails because QuetzalWriter is not yet functional, + so there are no valid save files to restore. + """ + self._branch(False) def op_restore_v4(self, *args): """TODO: Write docstring here.""" raise ZCpuNotImplemented def op_restart(self, *args): - """TODO: Write docstring here.""" - raise ZCpuNotImplemented + """Restart the game from the beginning. + + Raises ZCpuRestart exception for the run loop to handle. + The ZMachine should catch this and reset to initial state. + """ + raise ZCpuRestart def op_ret_popped(self, *args): """Pop a value from the stack and return it from the current routine.""" @@ -579,8 +594,34 @@ class ZCpu: self._objects.set_property(object_number, property_number, value) def op_sread(self, *args): - """Not implemented yet, but documented so that the detection - code will be foiled.""" + """Read text input from keyboard (V3). + + Args: + args[0]: text buffer address (byte 0 = max length, text starts at byte 1) + args[1]: parse buffer address (optional, 0 = no parsing) + """ + text_buffer_addr = args[0] + parse_buffer_addr = args[1] if len(args) > 1 else 0 + + # V3: show status line first + if self._memory.version <= 3: + self.op_show_status() + + # Read input from keyboard + text = self._ui.keyboard.readline() + text = text.lower().strip("\n\r") + + # Store in text buffer + max_len = self._memory[text_buffer_addr] + text = text[:max_len] + for i, ch in enumerate(text): + self._memory[text_buffer_addr + 1 + i] = ord(ch) + self._memory[text_buffer_addr + 1 + len(text)] = 0 + + # Tokenize if parse buffer provided + # Note: ZLexer not yet wired up - tokenization will be added when needed + if parse_buffer_addr != 0: + pass # TODO: add tokenization when ZLexer is integrated def op_sread_v4(self, *args): """TODO: Write docstring here.""" @@ -692,15 +733,15 @@ class ZCpu: else: self._streammanager.output.select(stream_num) - def op_input_stream(self, *args): - """TODO: Write docstring here.""" - raise ZCpuNotImplemented + def op_input_stream(self, stream_num): + """Select input stream (stub - not yet implemented for MUD integration).""" + pass # This one may have been used prematurely in v3 stories. Keep an # eye out for it if we ever get bug reports. def op_sound_effect(self, *args): - """TODO: Write docstring here.""" - raise ZCpuNotImplemented + """Play sound effect (no-op - sound not supported in text MUD).""" + pass def op_read_char(self, unused, time, input_routine): """Read a single character from input stream 0 (keyboard). diff --git a/tests/test_zmachine_opcodes.py b/tests/test_zmachine_opcodes.py index bf08d3c..18aa369 100644 --- a/tests/test_zmachine_opcodes.py +++ b/tests/test_zmachine_opcodes.py @@ -8,7 +8,7 @@ required dependencies (memory, stack, decoder, etc). from unittest import TestCase from unittest.mock import Mock -from mudlib.zmachine.zcpu import ZCpu, ZCpuDivideByZero, ZCpuQuit +from mudlib.zmachine.zcpu import ZCpu, ZCpuDivideByZero, ZCpuQuit, ZCpuRestart class MockMemory: @@ -85,6 +85,7 @@ class MockUI: def __init__(self): self.screen = Mock() self.screen.write = Mock() + self.keyboard = Mock() class ZMachineOpcodeTests(TestCase): @@ -516,6 +517,148 @@ class ZMachineObjectOpcodeTests(TestCase): self.assertEqual(self.stack.pop_stack(), 0) +class ZMachineComplexOpcodeTests(TestCase): + """Test suite for complex Z-machine opcodes (input, save/restore, restart).""" + + def setUp(self): + """Create a CPU with all necessary mocks.""" + self.memory = MockMemory() + self.stack = MockStackManager() + self.decoder = MockOpDecoder() + self.ui = MockUI() + + # Create CPU with mocked dependencies + self.cpu = ZCpu( + self.memory, + self.decoder, + self.stack, + Mock(), # objects + Mock(), # string + Mock(), # stream manager + self.ui, + ) + + def test_op_sread_v3_basic_input(self): + """Test sread (V3) reads text into buffer and null-terminates.""" + # Setup: text buffer at 0x1000, max length 20 + text_buffer_addr = 0x1000 + self.memory[text_buffer_addr] = 20 # max length + + # Mock keyboard input + self.ui.keyboard.readline = Mock(return_value="hello world\n") + + # Call sread with text buffer only (no parse buffer) + self.cpu.op_sread(text_buffer_addr, 0) + + # Verify text is stored lowercased starting at offset 1 + expected = "hello world" + for i, ch in enumerate(expected): + self.assertEqual(self.memory[text_buffer_addr + 1 + i], ord(ch)) + + # Verify null termination + self.assertEqual(self.memory[text_buffer_addr + 1 + len(expected)], 0) + + def test_op_sread_v3_truncates_to_max_length(self): + """Test sread respects max length in buffer.""" + text_buffer_addr = 0x1000 + self.memory[text_buffer_addr] = 5 # max length of 5 + + self.ui.keyboard.readline = Mock(return_value="hello world\n") + + self.cpu.op_sread(text_buffer_addr, 0) + + # Should only store first 5 characters + expected = "hello" + for i, ch in enumerate(expected): + self.assertEqual(self.memory[text_buffer_addr + 1 + i], ord(ch)) + self.assertEqual(self.memory[text_buffer_addr + 1 + 5], 0) + + def test_op_sread_v3_lowercases_input(self): + """Test sread converts input to lowercase.""" + text_buffer_addr = 0x1000 + self.memory[text_buffer_addr] = 20 + + self.ui.keyboard.readline = Mock(return_value="HELLO World\n") + + self.cpu.op_sread(text_buffer_addr, 0) + + expected = "hello world" + for i, ch in enumerate(expected): + self.assertEqual(self.memory[text_buffer_addr + 1 + i], ord(ch)) + + def test_op_sread_v3_strips_newlines(self): + """Test sread strips newlines and carriage returns.""" + text_buffer_addr = 0x1000 + self.memory[text_buffer_addr] = 20 + + self.ui.keyboard.readline = Mock(return_value="test\r\n") + + self.cpu.op_sread(text_buffer_addr, 0) + + expected = "test" + for i, ch in enumerate(expected): + self.assertEqual(self.memory[text_buffer_addr + 1 + i], ord(ch)) + self.assertEqual(self.memory[text_buffer_addr + 1 + 4], 0) + + def test_op_sread_v3_calls_show_status(self): + """Test sread calls op_show_status for V3.""" + text_buffer_addr = 0x1000 + self.memory[text_buffer_addr] = 20 + self.ui.keyboard.readline = Mock(return_value="test\n") + + # Track if show_status was called + call_count = [0] + original = self.cpu.op_show_status + + def counted_show_status(*args): + call_count[0] += 1 + return original(*args) + + self.cpu.op_show_status = counted_show_status # type: ignore + + self.cpu.op_sread(text_buffer_addr, 0) + + # Should have called show_status once + self.assertEqual(call_count[0], 1) + + def test_op_save_v3_branches_false(self): + """Test save (V3) branches false (QuetzalWriter not functional).""" + self.decoder.branch_condition = True + self.decoder.branch_offset = 100 + old_pc = self.cpu._opdecoder.program_counter + + self.cpu.op_save() + + # Should not have branched (test is false) + self.assertEqual(self.cpu._opdecoder.program_counter, old_pc) + + def test_op_restore_v3_branches_false(self): + """Test restore (V3) branches false (no valid save files).""" + self.decoder.branch_condition = True + self.decoder.branch_offset = 100 + old_pc = self.cpu._opdecoder.program_counter + + self.cpu.op_restore() + + # Should not have branched (test is false) + self.assertEqual(self.cpu._opdecoder.program_counter, old_pc) + + def test_op_input_stream_is_noop(self): + """Test input_stream is a no-op stub.""" + # Should not raise + self.cpu.op_input_stream(1) + + def test_op_sound_effect_is_noop(self): + """Test sound_effect is a no-op stub.""" + # Should not raise + self.cpu.op_sound_effect(1, 2, 3) + + def test_op_restart_raises_exception(self): + """Test restart raises ZCpuRestart exception for run loop to handle.""" + with self.assertRaises(ZCpuRestart): + self.cpu.op_restart() + + # Note: ZObjectParser methods are tested through integration tests # with real story files, not unit tests with mock memory, as the # interaction with ZStringFactory makes mocking complex.