# # A class which represents the CPU itself, the brain of the virtual # machine. It ties all the systems together and runs the story. # # For the license of this file, please consult the LICENSE file in the # root directory of this distribution. # import random import time from collections import deque from . import zopdecoder, zscreen from .zlogging import log, log_disasm class ZCpuError(Exception): "General exception for Zcpu class" class ZCpuOpcodeOverlap(ZCpuError): "Overlapping opcodes registered" class ZCpuIllegalInstruction(ZCpuError): "Illegal instruction encountered" class ZCpuDivideByZero(ZCpuError): "Divide by zero error" class ZCpuNotImplemented(ZCpuError): "Opcode not yet implemented" class ZCpuQuit(ZCpuError): "Quit opcode executed" class ZCpuRestart(ZCpuError): "Restart opcode executed" class ZCpu: def __init__( self, zmem, zopdecoder, zstack, zobjects, zstring, zstreammanager, zui, zlexer, zmachine=None, ): self._memory = zmem self._opdecoder = zopdecoder self._stackmanager = zstack self._objects = zobjects self._string = zstring self._streammanager = zstreammanager self._ui = zui self._lexer = zlexer self._zmachine = zmachine self._trace = deque(maxlen=20) self._undo_snapshot = None self._dispatch = self._build_dispatch_table() @property def _program_counter(self): """Return the current program counter value.""" return self._opdecoder.program_counter def _get_handler(self, opcode_class, opcode_number): try: opcode_decl = self.opcodes[opcode_class][opcode_number] except IndexError: opcode_decl = None if not opcode_decl: raise ZCpuIllegalInstruction # If the opcode declaration is a sequence, we have extra # thinking to do. if not isinstance(opcode_decl, (list, tuple)): opcode_func = opcode_decl else: # We have several different implementations for the # opcode, and we need to select the right one based on # version. if isinstance(opcode_decl[0], (list, tuple)): for func, version in opcode_decl: if version <= self._memory.version: opcode_func = func break # Only one implementation, check that our machine is # recent enough. elif opcode_decl[1] <= self._memory.version: opcode_func = opcode_decl[0] else: raise ZCpuIllegalInstruction # The following is a hack, based on our policy of only # documenting opcodes we implement. If we ever hit an # undocumented opcode, we crash with a not implemented # error. if not opcode_func.__doc__: return False, opcode_func else: return True, opcode_func def _make_signed(self, a): """Turn the given 16-bit value into a signed integer.""" assert a < (1 << 16) if (a >> 15) & 1: a = a - (1 << 16) return a def _unmake_signed(self, a): """Turn the given signed integer into a 16-bit value ready for storage.""" if a < 0: a = (1 << 16) + a return a def _read_variable(self, addr): """Return the value of the given variable, which can come from the stack, or from a local/global variable. If it comes from the stack, the value is popped from the stack.""" if addr == 0x0: return self._stackmanager.pop_stack() elif 0x0 < addr < 0x10: return self._stackmanager.get_local_variable(addr - 1) else: return self._memory.read_global(addr) def _write_result(self, result_value, store_addr=None): """Write the given result value to the stack or to a local/global variable. Write result_value to the store_addr variable, or if None, extract the destination variable from the opcode.""" if store_addr is None: result_addr = self._opdecoder.get_store_address() else: result_addr = store_addr if result_addr is not None: if result_addr == 0x0: log(f"Push {result_value} to stack") self._stackmanager.push_stack(result_value) elif 0x0 < result_addr < 0x10: log(f"Local variable {result_addr - 1} = {result_value}") self._stackmanager.set_local_variable(result_addr - 1, result_value) else: log(f"Global variable {result_addr} = {result_value}") self._memory.write_global(result_addr, result_value) def _call(self, routine_address, args, store_return_value): """Set up a function call to the given routine address, passing the given arguments. If store_return_value is True, the routine's return value will be stored.""" # Calling address 0 is a no-op that returns false (0). if routine_address == 0: if store_return_value: self._write_result(0) return addr = self._memory.packed_address(routine_address) if store_return_value: return_value = self._opdecoder.get_store_address() else: return_value = None current_addr = self._opdecoder.program_counter new_addr = self._stackmanager.start_routine( addr, return_value, current_addr, args ) self._opdecoder.program_counter = new_addr def _branch(self, test_result): """Retrieve the branch information, and set the instruction pointer according to the type of branch and the test_result.""" branch_cond, branch_offset = self._opdecoder.get_branch_offset() if test_result == branch_cond: if branch_offset == 0 or branch_offset == 1: log(f"Return from routine with {branch_offset}") addr = self._stackmanager.finish_routine(branch_offset) self._opdecoder.program_counter = addr else: log(f"Jump to offset {branch_offset:+d}") self._opdecoder.program_counter += branch_offset - 2 def _dump_trace(self): """Print the last N instructions for debugging.""" print("\n=== INSTRUCTION TRACE (last 20) ===") for entry in self._trace: print(entry) print("===================================\n") def _build_dispatch_table(self): """Pre-resolve all opcode handlers for current version.""" table = {} for opcode_class, opcode_class_list in self.opcodes.items(): class_table = [] for opcode_decl in opcode_class_list: if not opcode_decl: class_table.append(None) continue if not isinstance(opcode_decl, (list, tuple)): func = opcode_decl else: func = None if isinstance(opcode_decl[0], (list, tuple)): for f, version in opcode_decl: # type: ignore if version <= self._memory.version: func = f break elif opcode_decl[1] <= self._memory.version: func = opcode_decl[0] if func is None: class_table.append(None) continue implemented = bool(func.__doc__) class_table.append((implemented, func)) table[opcode_class] = class_table return table def step_fast(self): """Execute a single instruction with lightweight tracing. Returns True if execution should continue. """ current_pc = self._opdecoder.program_counter (opcode_class, opcode_number, operands) = self._opdecoder.get_next_instruction() entry = self._dispatch[opcode_class][opcode_number] if entry is None: self._trace.append(f" {current_pc:06x} ILLEGAL") self._dump_trace() raise ZCpuIllegalInstruction( f"illegal opcode class={opcode_class} num={opcode_number}" f" at PC={current_pc:#x}" ) implemented, func = entry if not implemented: return False self._trace.append( f" {current_pc:06x} {func.__name__}" f"({', '.join(str(x) for x in operands)})" ) try: func(self, *operands) except (ZCpuQuit, ZCpuRestart): raise except Exception: self._dump_trace() raise return True def step(self): """Execute a single instruction. Returns True if execution should continue.""" current_pc = self._opdecoder.program_counter log(f"Reading next opcode at address {current_pc:x}") (opcode_class, opcode_number, operands) = self._opdecoder.get_next_instruction() # Record raw byte at this PC for trace cls_str = zopdecoder.OPCODE_STRINGS.get(opcode_class, f"?{opcode_class}") trace_entry = f" {current_pc:06x} {cls_str}:{opcode_number:02x}" try: implemented, func = self._get_handler(opcode_class, opcode_number) except ZCpuIllegalInstruction: trace_entry += f" ILLEGAL (raw byte: {self._memory[current_pc]:02x})" self._trace.append(trace_entry) self._dump_trace() raise trace_entry += f" {func.__name__}({', '.join(str(x) for x in operands)})" self._trace.append(trace_entry) log_disasm( current_pc, zopdecoder.OPCODE_STRINGS[opcode_class], opcode_number, func.__name__, ", ".join([str(x) for x in operands]), ) if not implemented: log(f"Unimplemented opcode {func.__name__}, halting execution") return False # The returned function is unbound, so we must pass # self to it ourselves. try: func(self, *operands) except (ZCpuQuit, ZCpuRestart): # Normal control flow - don't dump trace raise except Exception: self._dump_trace() raise return True def run(self): """The Magic Function that takes little bits and bytes, twirls them around, and brings the magic to your screen!""" log("Execution started") while self.step_fast(): pass ## ## Opcode implementation functions start here. ## ## 2OP opcodes (opcodes 1-127 and 192-223) def op_je(self, a, *others): """Branch if the first argument is equal to any subsequent arguments. Note that the second operand may be absent, in which case there is no jump.""" for b in others: if a == b: self._branch(True) return # Fallthrough: No args were equal to a. self._branch(False) def op_jl(self, a, b): """Branch if the first argument is less than the second.""" a = self._make_signed(a) b = self._make_signed(b) if a < b: self._branch(True) else: self._branch(False) def op_jg(self, a, b): """Branch if the first argument is greater than the second.""" a = self._make_signed(a) b = self._make_signed(b) if a > b: self._branch(True) else: self._branch(False) def op_dec_chk(self, variable, test_value): """Decrement the variable, and branch if the value becomes less than test_value.""" val = self._read_variable(variable) val = (val - 1) % 65536 self._write_result(val, store_addr=variable) self._branch(self._make_signed(val) < self._make_signed(test_value)) def op_inc_chk(self, variable, test_value): """Increment the variable, and branch if the value becomes greater than the test value.""" val = self._read_variable(variable) val = (val + 1) % 65536 self._write_result(val, store_addr=variable) self._branch(self._make_signed(val) > self._make_signed(test_value)) def op_jin(self, obj1, obj2): """Branch if obj1's parent equals obj2.""" parent = self._objects.get_parent(obj1) self._branch(parent == obj2) def op_test(self, bitmap, flags): """Test if all bits in flags are set in bitmap, branch if true.""" self._branch((bitmap & flags) == flags) def op_or(self, a, b): """Bitwise OR between the two arguments.""" self._write_result(a | b) def op_and(self, a, b): """Bitwise AND between the two arguments.""" self._write_result(a & b) def op_test_attr(self, obj, attr): """Test if object has attribute, branch if true.""" has_attr = self._objects.get_attribute(obj, attr) self._branch(has_attr != 0) def op_set_attr(self, obj, attr): """Set attribute on object.""" self._objects.set_attribute(obj, attr) def op_clear_attr(self, obj, attr): """Clear attribute on object.""" self._objects.clear_attribute(obj, attr) def op_store(self, variable, value): """Store the given value to the given variable.""" self._write_result(value, store_addr=variable) def op_insert_obj(self, object, dest): """Move object OBJECT to become the first child of object DEST. After the move, the prior first child of DEST is now the OBJECT's sibling.""" self._objects.insert_object(dest, object) def op_loadw(self, base, offset): """Store in the given result register the word value at (base+2*offset).""" val = self._memory.read_word(base + 2 * offset) self._write_result(val) def op_loadb(self, base, offset): """Store in the given result register the byte value at (base+offset).""" val = self._memory[base + offset] self._write_result(val) def op_get_prop(self, objectnum, propnum): """Store in the given result an object's property value (either a byte or word).""" val = self._objects.get_prop(objectnum, propnum) self._write_result(val) def op_get_prop_addr(self, obj, prop): """Get property data address, store 0 if not found.""" addr = self._objects.get_property_data_address(obj, prop) self._write_result(addr) def op_get_next_prop(self, obj, prop): """Get next property number, store result.""" next_prop = self._objects.get_next_property(obj, prop) self._write_result(next_prop) def op_add(self, a, b): """Signed 16-bit addition.""" result = self._unmake_signed(self._make_signed(a) + self._make_signed(b)) self._write_result(result) def op_sub(self, a, b): """Signed 16-bit subtraction""" result = self._unmake_signed(self._make_signed(a) - self._make_signed(b)) self._write_result(result) def op_mul(self, a, b): """Signed 16-bit multiplication.""" result = self._unmake_signed(self._make_signed(a) * self._make_signed(b)) self._write_result(result) def op_div(self, a, b): """Signed 16-bit division.""" a = self._make_signed(a) b = self._make_signed(b) if b == 0: raise ZCpuDivideByZero self._write_result(self._unmake_signed(a // b)) def op_mod(self, a, b): """Signed 16-bit modulo (remainder after division).""" a = self._make_signed(a) b = self._make_signed(b) if b == 0: raise ZCpuDivideByZero # Z-machine uses truncation toward zero, not Python's floor division quotient = int(a / b) if quotient < 0 and quotient < a / b: quotient += 1 if quotient > 0 and quotient > a / b: quotient -= 1 remainder = a - (quotient * b) self._write_result(self._unmake_signed(remainder)) def op_call_2s(self, routine_addr, arg1): """Call routine(arg1) and store the result.""" self._call(routine_addr, [arg1], True) def op_call_2n(self, routine_addr, arg1): """Call routine(arg1) and throw away the result.""" self._call(routine_addr, [arg1], False) def op_set_colour(self, *args): """Set foreground and background colors (no-op for text MUD).""" pass def op_throw(self, *args): """TODO: Write docstring here.""" raise ZCpuNotImplemented ## 1OP opcodes (opcodes 128-175) def op_jz(self, val): """Branch if the val is zero.""" self._branch(val == 0) def op_get_sibling(self, obj): """Get sibling of object, store it, branch if nonzero.""" sibling = self._objects.get_sibling(obj) self._write_result(sibling) self._branch(sibling != 0) def op_get_child(self, object_num): """Get first child of object, store it, branch if nonzero.""" child = self._objects.get_child(object_num) self._write_result(child) self._branch(child != 0) def op_get_parent(self, object_num): """Get and store the parent of the given object.""" self._write_result(self._objects.get_parent(object_num)) def op_get_prop_len(self, data_addr): """Get property data length from data address, store result.""" length = self._objects.get_property_length(data_addr) self._write_result(length) def op_inc(self, variable): """Increment the given value.""" val = self._read_variable(variable) val = (val + 1) % 65536 self._write_result(val, store_addr=variable) def op_dec(self, variable): """Decrement the given variable.""" val = self._read_variable(variable) val = self._make_signed(val) val = val - 1 val = self._unmake_signed(val) self._write_result(val, store_addr=variable) def op_print_addr(self, string_byte_address): """Print the z-encoded string at the given byte address.""" text = self._string.get(string_byte_address) self._ui.screen.write(text) def op_call_1s(self, routine_address): """Call the given routine and store the return value.""" self._call(routine_address, [], True) def op_remove_obj(self, obj): """Remove object from its parent.""" self._objects.remove_object(obj) def op_print_obj(self, obj): """Print object's short name.""" shortname = self._objects.get_shortname(obj) self._ui.screen.write(shortname) def op_ret(self, value): """Return from the current routine with the given value.""" pc = self._stackmanager.finish_routine(value) self._opdecoder.program_counter = pc def op_jump(self, offset): """Jump unconditionally to the given branch offset. This opcode does not follow the usual branch decision algorithm, and so we do not call the _branch method to dispatch the call.""" old_pc = self._opdecoder.program_counter # The offset to the jump instruction is known to be a 2-byte # signed integer. We need to make it signed before applying # the offset. if offset >= (1 << 15): offset = -(1 << 16) + offset log(f"Jump unconditionally to relative offset {offset}") # Apparently reading the 2 bytes of operand *isn't* supposed # to increment the PC, thus we need to apply this offset to PC # that's still pointing at the 'jump' opcode. Hence the -2 # modifier below. new_pc = self._opdecoder.program_counter + offset - 2 self._opdecoder.program_counter = new_pc log(f"PC has changed from from {old_pc:x} to {new_pc:x}") def op_print_paddr(self, string_paddr): """Print the string at the given packed address.""" zstr_address = self._memory.packed_address(string_paddr) text = self._string.get(zstr_address) self._ui.screen.write(text) def op_load(self, variable): """Load the value of the given variable and store it.""" value = self._read_variable(variable) self._write_result(value) def op_not(self, value): """Bitwise NOT of the given value.""" result = ~value & 0xFFFF self._write_result(result) def op_call_1n(self, routine_addr): """Call the given routine, and discard the return value.""" self._call(routine_addr, [], False) ## 0OP opcodes (opcodes 176-191) def op_rtrue(self, *args): """Make the current routine return true (1).""" pc = self._stackmanager.finish_routine(1) self._opdecoder.program_counter = pc def op_rfalse(self, *args): """Make the current routine return false (0).""" pc = self._stackmanager.finish_routine(0) self._opdecoder.program_counter = pc def op_print(self): """Print the embedded ZString.""" zstr_address = self._opdecoder.get_zstring() text = self._string.get(zstr_address) self._ui.screen.write(text) def op_print_ret(self): """TODO: Write docstring here.""" self.op_print() self.op_rtrue() def op_nop(self, *args): """Do nothing.""" pass def op_save(self, *args): """Save game state to file (V3 - branch on success). Uses QuetzalWriter to generate save data in IFF/FORM/IFZS format, then calls the filesystem to write it. Branches true on success, false on failure. """ if self._zmachine is None: # Can't save without zmachine reference self._branch(False) return from .quetzal import QuetzalWriter try: writer = QuetzalWriter(self._zmachine) save_data = writer.generate_save_data() success = self._ui.filesystem.save_game(save_data) self._branch(success) except Exception as e: # Any error during save process = failure log(f"Save failed with exception: {e}") self._branch(False) def op_save_v4(self, *args): """TODO: Write docstring here.""" raise ZCpuNotImplemented def op_restore(self, *args): """Restore game state from file (V3 - branch on success). Uses QuetzalParser to load save data from filesystem, validates it matches current story, and restores memory/stack/PC. Branches true on success, false on failure. """ if self._zmachine is None: # Can't restore without zmachine reference self._branch(False) return from .quetzal import QuetzalParser try: # Get save data from filesystem save_data = self._ui.filesystem.restore_game() if save_data is None: # User cancelled or no save file available self._branch(False) return # Parse the save data parser = QuetzalParser(self._zmachine) parser.load_from_bytes(save_data) # QuetzalParser already: # - Validated IFhd matches current story (release/serial/checksum) # - Replaced dynamic memory via _parse_cmem or _parse_umem # - Replaced stack manager via _parse_stks # - Set program counter via _parse_ifhd # Success! self._branch(True) except Exception as e: # Any error during restore process = failure log(f"Restore failed with exception: {e}") self._branch(False) def op_restore_v4(self, *args): """TODO: Write docstring here.""" raise ZCpuNotImplemented def op_restart(self, *args): """Restart the game from the beginning. Raises ZCpuRestart exception for the run loop to handle. The ZMachine should catch this and reset to initial state. """ raise ZCpuRestart def op_ret_popped(self, *args): """Pop a value from the stack and return it from the current routine.""" value = self._stackmanager.pop_stack() pc = self._stackmanager.finish_routine(value) self._opdecoder.program_counter = pc def op_pop(self, *args): """Pop and discard the top value from the stack.""" self._stackmanager.pop_stack() def op_catch(self, *args): """Store the current stack frame index (for throw opcode).""" frame_index = self._stackmanager.get_stack_frame_index() self._write_result(frame_index) def op_quit(self, *args): """Quit the game.""" raise ZCpuQuit def op_new_line(self, *args): """Print a newline.""" self._ui.screen.write("\n") def op_show_status(self, *args): """Update status line (V3 only). No-op in this implementation.""" pass def op_verify(self, *args): """Verify story file checksum. Branch if checksum matches.""" expected_checksum = self._memory.read_word(0x1C) actual_checksum = self._memory.generate_checksum() self._branch(expected_checksum == actual_checksum) def op_piracy(self, *args): """Anti-piracy check. Always branches true (all interpreters pass this).""" self._branch(True) ## VAR opcodes (opcodes 224-255) # call in v1-3 def op_call(self, routine_addr, *args): """Call the routine r1, passing it any of r2, r3, r4 if defined.""" self._call(routine_addr, args, True) def op_call_vs(self, routine_addr, *args): """See op_call.""" self.op_call(routine_addr, *args) def op_storew(self, array, offset, value): """Store the given 16-bit value at array+2*byte_index.""" store_address = array + 2 * offset self._memory.write_word(store_address, value) def op_storeb(self, array, byte_index, value): """Store the given byte value at array+byte_index.""" self._memory[array + byte_index] = value & 0xFF def op_put_prop(self, object_number, property_number, value): """Set an object's property to the given value.""" self._objects.set_property(object_number, property_number, value) def op_sread(self, *args): """Read text input from keyboard (V3). Args: args[0]: text buffer address (byte 0 = max length, text starts at byte 1) args[1]: parse buffer address (optional, 0 = no parsing) """ text_buffer_addr = args[0] parse_buffer_addr = args[1] if len(args) > 1 else 0 # V3: show status line first if self._memory.version <= 3: self.op_show_status() # Read input from keyboard text = self._ui.keyboard_input.read_line() text = text.lower().strip("\n\r") # Store in text buffer max_len = self._memory[text_buffer_addr] text = text[:max_len] for i, ch in enumerate(text): self._memory[text_buffer_addr + 1 + i] = ord(ch) self._memory[text_buffer_addr + 1 + len(text)] = 0 # Tokenize if parse buffer provided if parse_buffer_addr != 0: max_words = self._memory[parse_buffer_addr] tokens = self._lexer.parse_input(text) num_words = min(len(tokens), max_words) self._memory[parse_buffer_addr + 1] = num_words offset = 0 for i in range(num_words): word_str, dict_addr = tokens[i] # Find word position in text pos = text.find(word_str, offset) if pos == -1: pos = offset # fallback: best guess position word_len = len(word_str) base = parse_buffer_addr + 2 + (i * 4) self._memory.write_word(base, dict_addr) self._memory[base + 2] = word_len self._memory[base + 3] = pos + 1 # 1-indexed from start of text buffer offset = pos + word_len def op_sread_v4(self, *args): """TODO: Write docstring here.""" raise ZCpuNotImplemented def op_aread(self, *args): """Read text input from keyboard (V5+). V5 text buffer: byte 0 = max chars, byte 1 = num chars written, text starts at byte 2. Stores the terminating ZSCII character (13 for newline) as result. Does not lowercase (game handles it). Args: args[0]: text buffer address args[1]: parse buffer address (0 = skip tokenization) args[2]: optional timer interval (tenths of a second, 0 = none) args[3]: optional timer routine address """ text_buffer_addr = args[0] parse_buffer_addr = args[1] if len(args) > 1 else 0 # Consume store byte BEFORE blocking in read_line(). This ensures # the PC is past the entire instruction when MUD-level saves capture # state during read_line(). Without this, saves point PC at the store # byte, which gets misinterpreted as an opcode on restore. store_addr = self._opdecoder.get_store_address() # Read input from keyboard (blocks until player types something) text = self._ui.keyboard_input.read_line() text = text.lower().strip("\n\r") # Store in text buffer (V5 format: text at byte 2, count at byte 1) max_len = self._memory[text_buffer_addr] text = text[:max_len] self._memory[text_buffer_addr + 1] = len(text) for i, ch in enumerate(text): self._memory[text_buffer_addr + 2 + i] = ord(ch) # Tokenize if parse buffer provided if parse_buffer_addr != 0: max_words = self._memory[parse_buffer_addr] tokens = self._lexer.parse_input(text) num_words = min(len(tokens), max_words) self._memory[parse_buffer_addr + 1] = num_words offset = 0 for i in range(num_words): word_str, dict_addr = tokens[i] pos = text.find(word_str, offset) if pos == -1: pos = offset word_len = len(word_str) base = parse_buffer_addr + 2 + (i * 4) self._memory.write_word(base, dict_addr) self._memory[base + 2] = word_len self._memory[base + 3] = pos + 2 # offset from start of text buffer offset = pos + word_len # Store terminating character (13 = newline) self._write_result(13, store_addr=store_addr) def op_print_char(self, char): """Output the given ZSCII character.""" self._ui.screen.write(self._string.zscii.get([char])) def op_print_num(self, value): """Print a signed 16-bit number as text.""" signed_value = self._make_signed(value) self._ui.screen.write(str(signed_value)) def op_random(self, n): """Generate a random number, or seed the PRNG. If the input is positive, generate a uniformly random number in the range [1:input]. If the input is negative, seed the PRNG with that value. If the input is zero, seed the PRNG with the current time. """ result = 0 if n > 0: log(f"Generate random number in [1:{n}]") result = random.randint(1, n) elif n < 0: log(f"Seed PRNG with {n}") random.seed(n) else: log("Seed PRNG with time") random.seed(time.time()) self._write_result(result) def op_push(self, value): """Push a value onto the current routine's game stack.""" self._stackmanager.push_stack(value) def op_pull(self, variable): """Pop a value from the stack and store it in the given variable.""" value = self._stackmanager.pop_stack() self._write_result(value, store_addr=variable) def op_split_window(self, height): """Split or unsplit the window horizontally.""" self._ui.screen.split_window(height) def op_set_window(self, window_num): """Set the given window as the active window.""" self._ui.screen.select_window(window_num) def op_call_vs2(self, routine_addr, *args): """Call routine with up to 7 arguments and store the result.""" self._call(routine_addr, args, True) def op_erase_window(self, window_number): """Clear the window with the given number. If # is -1, unsplit all and clear (full reset). If # is -2, clear all but don't unsplit.""" if window_number == -1: self.op_split_window(0) self._ui.screen.erase_window(zscreen.WINDOW_LOWER) elif window_number == -2: self._ui.screen.erase_window(zscreen.WINDOW_LOWER) self._ui.screen.erase_window(zscreen.WINDOW_UPPER) else: self._ui.screen.erase_window(window_number) def op_erase_line(self, *args): """Erase current line on screen (no-op for text MUD).""" pass def op_set_cursor(self, x, y): """Set the cursor position within the active window.""" self._ui.screen.set_cursor_position(x, y) def op_get_cursor(self, table_addr): """Get cursor position into table. For MUD, always write row=1, col=1.""" self._memory.write_word(table_addr, 1) # row self._memory.write_word(table_addr + 2, 1) # col def op_set_text_style(self, text_style): """Set the text style.""" self._ui.screen.set_text_style(text_style) def op_buffer_mode(self, flag): """If set to 1, text output on the lower window in stream 1 is buffered up so that it can be word-wrapped properly. If set to 0, it isn't.""" self._ui.screen.buffer_mode = bool(flag) def op_output_stream(self, stream_num): """Enable or disable the given stream. This is the v3/4 implementation of the opcode, which just delegates to the backwards compatible v5 implementation. """ self.op_output_stream_v5(stream_num) def op_output_stream_v5(self, stream_num, table=None): """Enable or disable the given output stream.""" stream_num = self._make_signed(stream_num) if stream_num < 0: self._streammanager.output.unselect(-stream_num) else: self._streammanager.output.select(stream_num) def op_input_stream(self, stream_num): """Select input stream (stub - not yet implemented for MUD integration).""" pass # This one may have been used prematurely in v3 stories. Keep an # eye out for it if we ever get bug reports. def op_sound_effect(self, *args): """Play sound effect (no-op - sound not supported in text MUD).""" pass def op_read_char(self, unused, time=0, input_routine=0): """Read a single character from input stream 0 (keyboard). Optionally, call a routine periodically to decide whether or not to interrupt user input. """ # According to the spec, the first argument is always one, and # exists only for Historical Reasons(tm) assert unused == 1 # TODO: shiny timer stuff not implemented yet. if time != 0 or input_routine != 0: raise ZCpuNotImplemented # Consume store byte BEFORE blocking in read_char() — same reason # as op_aread: PC must be past the full instruction for MUD saves. store_addr = self._opdecoder.get_store_address() char = self._ui.keyboard_input.read_char() self._write_result(char, store_addr=store_addr) def op_scan_table(self, x, table, length, *args): """Search a table for a value, branch if found, store address (V4+). Searches length entries starting at table. Each entry is form bytes wide (default 2). Compares against byte 0-1 (word) or byte 0 (if form bit 7 is set, compare bytes not words). form & 0x7f = entry size in bytes. """ form = args[0] if len(args) > 0 else 0x82 # default: word entries, 2 bytes wide entry_size = form & 0x7F compare_word = not (form & 0x80) for i in range(length): addr = table + (i * entry_size) val = self._memory.read_word(addr) if compare_word else self._memory[addr] if val == x: self._write_result(addr) self._branch(True) return self._write_result(0) self._branch(False) def op_not_v5(self, value): """Bitwise NOT (VAR form). Same as op_not.""" result = ~value & 0xFFFF self._write_result(result) def op_call_vn(self, routine_addr, *args): """Call routine with up to 3 arguments and discard the result.""" self._call(routine_addr, args, False) def op_call_vn2(self, routine_addr, *args): """Call routine with up to 7 arguments and discard the result.""" self._call(routine_addr, args, False) def op_tokenize(self, text_buffer, parse_buffer, *args): """Tokenize text in text_buffer into parse_buffer (V5+). Uses V5 text buffer format (count at byte 1, text at byte 2+). Optional args[0] = dictionary address, args[1] = flag. """ _dictionary = args[0] if len(args) > 0 else 0 # custom dict, not yet used flag = args[1] if len(args) > 1 else 0 # Read text from V5 text buffer num_chars = self._memory[text_buffer + 1] text = "" for i in range(num_chars): text += chr(self._memory[text_buffer + 2 + i]) # Tokenize max_words = self._memory[parse_buffer] tokens = self._lexer.parse_input(text) num_words = min(len(tokens), max_words) if not flag: self._memory[parse_buffer + 1] = num_words offset = 0 for i in range(num_words): word_str, dict_addr = tokens[i] pos = text.find(word_str, offset) if pos == -1: pos = offset word_len = len(word_str) base = parse_buffer + 2 + (i * 4) # When flag is set, only fill in entries that have dict matches if not flag or dict_addr != 0: self._memory.write_word(base, dict_addr) self._memory[base + 2] = word_len self._memory[base + 3] = pos + 2 offset = pos + word_len def op_encode_text(self, *args): """Encode ZSCII text to Z-encoded string (V5+). This opcode converts ZSCII text into Z-machine's packed text format (3 characters per 2 bytes). Complex operation, rarely used. Not implemented - will raise ZCpuNotImplemented if any game calls it. """ raise ZCpuNotImplemented def op_copy_table(self, first, second, size): """Copy a block of memory, or zero-fill (V5+). If second is 0, zero-fill first for size bytes. If size is positive, copy forward (safe for non-overlapping). If size is negative, copy backward (safe for overlapping). """ if second == 0: for i in range(abs(self._make_signed(size))): self._memory[first + i] = 0 else: signed_size = self._make_signed(size) count = abs(signed_size) if signed_size >= 0: # Forward copy (may corrupt overlapping regions) for i in range(count): self._memory[second + i] = self._memory[first + i] else: # Backward copy (safe for overlapping) for i in range(count - 1, -1, -1): self._memory[second + i] = self._memory[first + i] def op_print_table(self, zscii_text, width, height=1, skip=0): """Formatted table printing (no-op for text MUD). Spec: print width chars per line for height lines from zscii_text. Skip bytes between rows. For now, no-op to avoid crashes. """ pass def op_check_arg_count(self, arg_number): """Branch if the Nth argument was passed to the current routine.""" current_frame = self._stackmanager._call_stack[-1] self._branch(arg_number <= current_frame.arg_count) ## EXT opcodes (opcodes 256-284) def op_save_v5(self, *args): """Save game state to file (V5+ - stores result). Generates Quetzal save data and writes via filesystem. Stores 1 on success, 0 on failure. On restore, the game will see 2 stored in the same variable. """ if self._zmachine is None: self._write_result(0) return from .quetzal import QuetzalWriter try: writer = QuetzalWriter(self._zmachine) save_data = writer.generate_save_data() success = self._ui.filesystem.save_game(save_data) self._write_result(1 if success else 0) except Exception as e: log(f"Save failed with exception: {e}") self._write_result(0) def op_restore_v5(self, *args): """Restore game state from file (V5+ - stores result). Loads Quetzal save data and restores memory/stack/PC. The restored PC points at the store byte of the original save instruction. We read it and write 2 (meaning "restored") to the indicated variable. Stores 0 on failure (in the current, un-restored state). """ if self._zmachine is None: self._write_result(0) return from .quetzal import QuetzalParser try: save_data = self._ui.filesystem.restore_game() if save_data is None: self._write_result(0) return parser = QuetzalParser(self._zmachine) parser.load_from_bytes(save_data) # Restored PC points at the store byte of the save instruction. # Read it and write 2 ("restored") to that variable. self._write_result(2) except Exception as e: log(f"Restore failed with exception: {e}") self._write_result(0) def op_log_shift(self, number, places): """Logical shift: positive places = left, negative = right (V5+). Right shift fills with zeros (unsigned/logical shift). """ places = self._make_signed(places) result = ( (number << places) & 0xFFFF if places >= 0 else (number >> (-places)) & 0xFFFF ) self._write_result(result) def op_art_shift(self, number, places): """Arithmetic shift: positive places = left, negative = right (V5+). Right shift preserves the sign bit (signed/arithmetic shift). """ signed_number = self._make_signed(number) places = self._make_signed(places) if places >= 0: result = (signed_number << places) & 0xFFFF else: result = self._unmake_signed(signed_number >> (-places)) self._write_result(result) def op_set_font(self, font_id): """Set the current font. Returns the previous font, or 0 if the requested font is unavailable (V5+). Font 1 is the normal font. We only support font 1. """ if font_id == 1: self._write_result(1) # was font 1, now font 1 else: self._write_result(0) # unsupported font def op_save_undo(self, *args): """Save undo state (V5+, EXT:9). Captures a snapshot of dynamic memory, call stack, and PC. Stores 1 on success. After restore_undo, execution resumes here with result 2 (like fork() returning different values). """ from .zstackmanager import ZRoutine, ZStackBottom # Read store address first — advances PC past the store byte store_addr = self._opdecoder.get_store_address() # Capture dynamic memory mem = self._memory dynamic_copy = bytearray(mem._memory[mem._dynamic_start : mem._dynamic_end + 1]) # Deep copy call stack stack_copy = [] for frame in self._stackmanager._call_stack: if isinstance(frame, ZStackBottom): bottom = ZStackBottom() bottom.program_counter = frame.program_counter bottom.stack = frame.stack[:] bottom.local_vars = frame.local_vars[:] stack_copy.append(bottom) else: new_frame = ZRoutine( frame.start_addr, frame.return_addr, self._memory, [], local_vars=frame.local_vars[:], stack=frame.stack[:], ) new_frame.program_counter = frame.program_counter new_frame.arg_count = frame.arg_count stack_copy.append(new_frame) self._undo_snapshot = ( self._opdecoder.program_counter, store_addr, dynamic_copy, stack_copy, ) # Store 1 = success self._write_result(1, store_addr=store_addr) def op_restore_undo(self, *args): """Restore undo state (V5+, EXT:10). Restores dynamic memory, call stack, and PC from snapshot. Stores 0 on failure. On success, execution resumes at the save_undo call site with result 2. """ if self._undo_snapshot is None: self._write_result(0) return pc, store_addr, dynamic_copy, stack_copy = self._undo_snapshot self._undo_snapshot = None # Restore dynamic memory mem = self._memory mem._memory[mem._dynamic_start : mem._dynamic_end + 1] = dynamic_copy # Restore call stack: keep the live ZStackBottom identity, # but restore its state from the snapshot live_bottom = self._stackmanager._stackbottom saved_bottom = stack_copy[0] live_bottom.program_counter = saved_bottom.program_counter live_bottom.stack = saved_bottom.stack[:] live_bottom.local_vars = saved_bottom.local_vars[:] self._stackmanager._call_stack[:] = [live_bottom] + stack_copy[1:] # Restore PC self._opdecoder.program_counter = pc # Store 2 at save_undo's store location (not restore_undo's) self._write_result(2, store_addr=store_addr) def op_print_unicode(self, char_code): """Print a Unicode character (V5+, EXT:11).""" self._ui.screen.write(chr(char_code)) def op_check_unicode(self, char_code): """Check if Unicode character can be printed/read (V5+, EXT:12). Bit 0 = can print, bit 1 = can read. We support printing only. """ self._write_result(1) # can print, can't read # Declaration of the opcode tables. In a Z-Machine, opcodes are # divided into tables based on the operand type. Within each # table, the operand is then indexed by its number. We preserve # that organization in this opcode table. # # The opcode table is a dictionary mapping an operand type to a # list of opcodes definitions. Each opcode definition's index in # the table is the opcode number within that opcode table. # # The opcodes are in one of three forms: # # - If the opcode is available and unchanging in all versions, # then the definition is simply the function implementing the # opcode. # # - If the opcode is only available as of a certain version # upwards, then the definition is the tuple (opcode_func, # first_version), where first_version is the version of the # Z-machine where the opcode appeared. # # - If the opcode changes meaning with successive revisions of the # Z-machine, then the definition is a list of the above tuples, # sorted in descending order (tuple with the highest # first_version comes first). If an instruction became illegal # after a given version, it should have a tuple with the opcode # function set to None. opcodes = { # 2OP opcodes zopdecoder.OPCODE_2OP: [ None, op_je, op_jl, op_jg, op_dec_chk, op_inc_chk, op_jin, op_test, op_or, op_and, op_test_attr, op_set_attr, op_clear_attr, op_store, op_insert_obj, op_loadw, op_loadb, op_get_prop, op_get_prop_addr, op_get_next_prop, op_add, op_sub, op_mul, op_div, op_mod, (op_call_2s, 4), (op_call_2n, 5), (op_set_colour, 5), (op_throw, 5), ], # 1OP opcodes zopdecoder.OPCODE_1OP: [ op_jz, op_get_sibling, op_get_child, op_get_parent, op_get_prop_len, op_inc, op_dec, op_print_addr, (op_call_1s, 4), op_remove_obj, op_print_obj, op_ret, op_jump, op_print_paddr, op_load, [(op_call_1n, 5), (op_not, 1)], ], # 0OP opcodes zopdecoder.OPCODE_0OP: [ op_rtrue, op_rfalse, op_print, op_print_ret, op_nop, [(None, 5), (op_save_v4, 4), (op_save, 1)], [(None, 5), (op_restore_v4, 4), (op_restore, 1)], op_restart, op_ret_popped, [(op_catch, 5), (op_pop, 1)], op_quit, op_new_line, [(None, 4), (op_show_status, 3)], (op_verify, 3), None, # Padding. Opcode 0OP:E is the extended opcode marker. (op_piracy, 5), ], # VAR opcodes zopdecoder.OPCODE_VAR: [ [(op_call_vs, 4), (op_call, 1)], op_storew, op_storeb, op_put_prop, [(op_aread, 5), (op_sread_v4, 4), (op_sread, 1)], op_print_char, op_print_num, op_random, op_push, op_pull, (op_split_window, 3), (op_set_window, 3), (op_call_vs2, 4), (op_erase_window, 4), (op_erase_line, 4), (op_set_cursor, 4), (op_get_cursor, 4), (op_set_text_style, 4), (op_buffer_mode, 4), [(op_output_stream_v5, 5), (op_output_stream, 3)], (op_input_stream, 3), (op_sound_effect, 5), (op_read_char, 4), (op_scan_table, 4), (op_not, 5), (op_call_vn, 5), (op_call_vn2, 5), (op_tokenize, 5), (op_encode_text, 5), (op_copy_table, 5), (op_print_table, 5), (op_check_arg_count, 5), ], # EXT opcodes zopdecoder.OPCODE_EXT: [ (op_save_v5, 5), (op_restore_v5, 5), (op_log_shift, 5), (op_art_shift, 5), (op_set_font, 5), None, None, None, None, (op_save_undo, 5), (op_restore_undo, 5), (op_print_unicode, 5), (op_check_unicode, 5), ], }