1040 lines
35 KiB
Python
1040 lines
35 KiB
Python
#
# A class which represents the CPU itself, the brain of the virtual
# machine. It ties all the systems together and runs the story.
#
# For the license of this file, please consult the LICENSE file in the
# root directory of this distribution.
#
|
|
|
|
import random
|
|
import time
|
|
from collections import deque
|
|
|
|
from . import bitfield, zopdecoder, zscreen
|
|
from .zlogging import log, log_disasm
|
|
|
|
|
|
class ZCpuError(Exception):
    """Base class for every exception defined for the Z-machine CPU."""
|
|
|
|
|
|
class ZCpuOpcodeOverlap(ZCpuError):
    """Signals that overlapping opcodes were registered."""
|
|
|
|
|
|
class ZCpuIllegalInstruction(ZCpuError):
    """Signals that an illegal instruction was encountered."""
|
|
|
|
|
|
class ZCpuDivideByZero(ZCpuError):
    """Signals a division (or modulo) by zero."""
|
|
|
|
|
|
class ZCpuNotImplemented(ZCpuError):
    """Signals that an opcode has not been implemented yet."""
|
|
|
|
|
|
class ZCpuQuit(ZCpuError):
    """Signals that the quit opcode was executed."""
|
|
|
|
|
|
class ZCpuRestart(ZCpuError):
    """Signals that the restart opcode was executed."""
|
|
|
|
|
|
class ZCpu:
|
|
def __init__(
    self, zmem, zopdecoder, zstack, zobjects, zstring, zstreammanager, zui, zlexer
):
    """Attach the CPU to the subsystems it drives.

    Every collaborator is only stored on a private attribute here;
    none of them is called during construction.
    """
    # Rolling record of the most recent instructions, printed by
    # _dump_trace when something goes wrong.
    self._trace = deque(maxlen=20)

    # Core execution state: story memory, instruction decoder, call stack.
    self._memory, self._opdecoder, self._stackmanager = zmem, zopdecoder, zstack

    # Story data helpers: object tree and string decoding.
    self._objects, self._string = zobjects, zstring

    # Outside world: I/O streams, user interface, input lexer.
    self._streammanager, self._ui, self._lexer = zstreammanager, zui, zlexer
|
|
|
|
def _get_handler(self, opcode_class, opcode_number):
    """Find the handler for an opcode on the running Z-machine version.

    Args:
        opcode_class: key into the ``opcodes`` table.
        opcode_number: index inside that class's list of declarations.

    Returns:
        A pair ``(implemented, func)``. ``func`` is the unbound handler
        function; ``implemented`` is False when that function has no
        docstring.

    Raises:
        ZCpuIllegalInstruction: the slot does not exist or is empty, no
            declaration applies to this version, or the applicable
            declaration names None as its handler.
    """
    try:
        opcode_decl = self.opcodes[opcode_class][opcode_number]
    except (KeyError, IndexError):
        # Unknown opcode class or number past the end of the table.
        opcode_decl = None
    if not opcode_decl:
        raise ZCpuIllegalInstruction

    version = self._memory.version
    if not isinstance(opcode_decl, (list, tuple)):
        # A bare function is valid in every version.
        opcode_func = opcode_decl
    elif isinstance(opcode_decl[0], (list, tuple)):
        # Several (func, first_version) pairs: the first one this
        # machine is recent enough for wins. Falls back to None when
        # none applies, instead of leaving the name unbound.
        opcode_func = next(
            (func for func, first in opcode_decl if first <= version), None
        )
    elif opcode_decl[1] <= version:
        # A single (func, first_version) pair that applies.
        opcode_func = opcode_decl[0]
    else:
        opcode_func = None

    # None here means "nothing applies" or "explicitly withdrawn in this
    # version"; both are illegal instructions, not handlers.
    if opcode_func is None:
        raise ZCpuIllegalInstruction

    # The following is a hack, based on our policy of only documenting
    # opcodes we implement: a handler without a docstring is reported
    # as not implemented.
    return bool(opcode_func.__doc__), opcode_func
|
|
|
|
def _make_signed(self, a):
    """Reinterpret a 16-bit value as a signed integer.

    Values from 0x8000 upward (bit 15 set) map to -32768..-1; smaller
    values are returned unchanged.
    """
    assert a < (1 << 16)
    # A plain comparison replaces building a BitField just to read bit 15.
    return a - (1 << 16) if a >= (1 << 15) else a
|
|
|
|
def _unmake_signed(self, a):
    """Fold a Python integer into a 16-bit value ready for storage.

    Negative numbers become their two's-complement form, and results
    that overflow 16 bits (possible after multiplication) wrap modulo
    0x10000 instead of being stored out of range.
    """
    return a & 0xFFFF
|
|
|
|
def _read_variable(self, addr):
    """Fetch the value of variable number ``addr``.

    Variable 0 is the stack, and reading it pops the value. Variables
    1-15 are locals of the current routine (passed on zero-based).
    Every other number is read as a global.
    """
    stack = self._stackmanager
    if addr == 0x0:
        return stack.pop_stack()

    is_local = 0x0 < addr < 0x10
    if is_local:
        return stack.get_local_variable(addr - 1)

    return self._memory.read_global(addr)
|
|
|
|
def _write_result(self, result_value, store_addr=None):
    """Store ``result_value`` in a variable.

    The destination is ``store_addr`` when one is given; otherwise it
    is fetched from the opcode decoder. Destination 0 pushes onto the
    stack, 1-15 set a local of the current routine (zero-based), and
    any other number writes a global. A destination of None stores
    nothing.
    """
    target = (
        self._opdecoder.get_store_address() if store_addr is None else store_addr
    )
    if target is None:
        return

    if target == 0x0:
        log(f"Push {result_value} to stack")
        self._stackmanager.push_stack(result_value)
        return

    if 0x0 < target < 0x10:
        local_index = target - 1
        log(f"Local variable {local_index} = {result_value}")
        self._stackmanager.set_local_variable(local_index, result_value)
        return

    log(f"Global variable {target} = {result_value}")
    self._memory.write_global(target, result_value)
|
|
|
|
def _call(self, routine_address, args, store_return_value):
    """Begin executing the routine at packed address ``routine_address``.

    ``args`` are handed to the stack manager together with the current
    program counter (the return point). When ``store_return_value`` is
    true the store variable is read from the decoder so the routine's
    result can be saved; otherwise None is recorded and the result is
    dropped.
    """
    if routine_address == 0:
        # Calling address 0 does nothing and yields false (0).
        if store_return_value:
            self._write_result(0)
        return

    routine_start = self._memory.packed_address(routine_address)
    store_variable = (
        self._opdecoder.get_store_address() if store_return_value else None
    )
    return_pc = self._opdecoder.program_counter
    self._opdecoder.program_counter = self._stackmanager.start_routine(
        routine_start, store_variable, return_pc, args
    )
|
|
|
|
def _branch(self, test_result):
    """Act on the branch data attached to the current instruction.

    Nothing happens unless ``test_result`` equals the condition read
    from the decoder. When it does, an offset of 0 or 1 returns that
    value from the current routine; any other offset moves the program
    counter by ``offset - 2``.
    """
    branch_cond, branch_offset = self._opdecoder.get_branch_offset()
    if test_result != branch_cond:
        return

    if branch_offset in (0, 1):
        log(f"Return from routine with {branch_offset}")
        return_pc = self._stackmanager.finish_routine(branch_offset)
        self._opdecoder.program_counter = return_pc
        return

    log(f"Jump to offset {branch_offset:+d}")
    self._opdecoder.program_counter += branch_offset - 2
|
|
|
|
def _dump_trace(self):
    """Print the recorded instruction trace, oldest entry first."""
    header = "\n=== INSTRUCTION TRACE (last 20) ==="
    footer = "===================================\n"
    # One entry per line between the two banner lines.
    print(header, *self._trace, footer, sep="\n")
|
|
|
|
def step(self):
    """Decode and execute one instruction.

    Returns True when the caller should keep stepping and False when
    the decoded opcode's handler is flagged as unimplemented. Each
    instruction is appended to the rolling trace. The trace is printed
    before re-raising an illegal instruction or any ZCpuError other
    than ZCpuQuit/ZCpuRestart.
    """
    decoder = self._opdecoder
    current_pc = decoder.program_counter
    log(f"Reading next opcode at address {current_pc:x}")
    opcode_class, opcode_number, operands = decoder.get_next_instruction()

    # Start the trace line with the address and opcode identity.
    cls_str = zopdecoder.OPCODE_STRINGS.get(opcode_class, f"?{opcode_class}")
    trace_entry = f" {current_pc:06x} {cls_str}:{opcode_number:02x}"

    try:
        implemented, func = self._get_handler(opcode_class, opcode_number)
    except ZCpuIllegalInstruction:
        raw_byte = self._memory[current_pc]
        self._trace.append(f"{trace_entry} ILLEGAL (raw byte: {raw_byte:02x})")
        self._dump_trace()
        raise

    operand_text = ", ".join(str(x) for x in operands)
    self._trace.append(f"{trace_entry} {func.__name__}({operand_text})")

    log_disasm(
        current_pc,
        zopdecoder.OPCODE_STRINGS[opcode_class],
        opcode_number,
        func.__name__,
        operand_text,
    )
    if not implemented:
        log(f"Unimplemented opcode {func.__name__}, halting execution")
        return False

    # The handler comes out of the table unbound, so self is passed
    # explicitly.
    try:
        func(self, *operands)
    except ZCpuError as err:
        # Quit and restart are normal control flow; anything else gets
        # the trace dumped for debugging.
        if not isinstance(err, (ZCpuQuit, ZCpuRestart)):
            self._dump_trace()
        raise
    return True
|
|
|
|
def run(self):
    """Execute instructions until step() reports that it should stop.

    Exceptions raised by step() are not caught here.
    """
    log("Execution started")
    keep_going = True
    while keep_going:
        keep_going = self.step()
|
|
|
|
##
|
|
## Opcode implementation functions start here.
|
|
##
|
|
|
|
## 2OP opcodes (opcodes 1-127 and 192-223)
|
|
def op_je(self, a, *others):
    """Branch if ``a`` equals any of the following operands.

    With no further operands nothing can match, so the test is false.
    """
    matched = any(a == candidate for candidate in others)
    self._branch(matched)
|
|
|
|
def op_jl(self, a, b):
    """Branch if ``a`` is less than ``b``, both taken as signed 16-bit."""
    lhs, rhs = self._make_signed(a), self._make_signed(b)
    self._branch(lhs < rhs)
|
|
|
|
def op_jg(self, a, b):
    """Branch if ``a`` is greater than ``b``, both taken as signed 16-bit."""
    lhs, rhs = self._make_signed(a), self._make_signed(b)
    self._branch(lhs > rhs)
|
|
|
|
def op_dec_chk(self, variable, test_value):
    """Decrement ``variable`` and branch if it is now below ``test_value``.

    The decrement wraps modulo 65536; the comparison is signed.
    """
    current = self._read_variable(variable)
    decremented = (current - 1) % 65536
    self._write_result(decremented, store_addr=variable)
    is_below = self._make_signed(decremented) < self._make_signed(test_value)
    self._branch(is_below)
|
|
|
|
def op_inc_chk(self, variable, test_value):
    """Increment ``variable`` and branch if it is now above ``test_value``.

    The increment wraps modulo 65536; the comparison is signed.
    """
    current = self._read_variable(variable)
    incremented = (current + 1) % 65536
    self._write_result(incremented, store_addr=variable)
    is_above = self._make_signed(incremented) > self._make_signed(test_value)
    self._branch(is_above)
|
|
|
|
def op_jin(self, obj1, obj2):
    """Branch if the parent of ``obj1`` is ``obj2``."""
    is_child = self._objects.get_parent(obj1) == obj2
    self._branch(is_child)
|
|
|
|
def op_test(self, bitmap, flags):
    """Branch if every bit set in ``flags`` is also set in ``bitmap``."""
    common = bitmap & flags
    self._branch(common == flags)
|
|
|
|
def op_or(self, a, b):
    """Store the bitwise OR of the two operands."""
    combined = a | b
    self._write_result(combined)
|
|
|
|
def op_and(self, a, b):
    """Store the bitwise AND of the two operands."""
    masked = a & b
    self._write_result(masked)
|
|
|
|
def op_test_attr(self, obj, attr):
    """Branch if the object table reports a non-zero value for ``attr`` on ``obj``."""
    attr_value = self._objects.get_attribute(obj, attr)
    self._branch(attr_value != 0)
|
|
|
|
def op_set_attr(self, obj, attr):
    """Turn attribute ``attr`` on for object ``obj``."""
    object_table = self._objects
    object_table.set_attribute(obj, attr)
|
|
|
|
def op_clear_attr(self, obj, attr):
    """Turn attribute ``attr`` off for object ``obj``."""
    object_table = self._objects
    object_table.clear_attribute(obj, attr)
|
|
|
|
def op_store(self, variable, value):
    """Write ``value`` into the variable numbered ``variable``."""
    # The destination is the operand itself, not the opcode's store byte.
    destination = variable
    self._write_result(value, store_addr=destination)
|
|
|
|
def op_insert_obj(self, object, dest):
    """Make OBJECT the first child of DEST.

    The object table is called with the destination first and the
    moved object second. Per the opcode's contract, DEST's previous
    first child ends up as OBJECT's sibling.
    """
    new_parent, moved = dest, object
    self._objects.insert_object(new_parent, moved)
|
|
|
|
def op_loadw(self, base, offset):
    """Store the word found at ``base + 2 * offset``.

    The address is computed in 16-bit arithmetic, so an index such as
    0xFFFF (that is, -1) reaches the word just before ``base`` instead
    of an address far outside the 64K that this opcode can reach.
    """
    address = (base + 2 * offset) & 0xFFFF
    self._write_result(self._memory.read_word(address))
|
|
|
|
def op_loadb(self, base, offset):
    """Store the byte found at ``base + offset``.

    The address is computed in 16-bit arithmetic, so an index such as
    0xFFFF (that is, -1) reaches the byte just before ``base`` instead
    of an address far outside the 64K that this opcode can reach.
    """
    address = (base + offset) & 0xFFFF
    self._write_result(self._memory[address])
|
|
|
|
def op_get_prop(self, objectnum, propnum):
    """Store the value of property ``propnum`` of object ``objectnum``.

    The value (either a byte or a word) comes from the object table.
    """
    prop_value = self._objects.get_prop(objectnum, propnum)
    self._write_result(prop_value)
|
|
|
|
def op_get_prop_addr(self, obj, prop):
    """Store the address of the property's data, as reported by the object table.

    The original notes that 0 is stored when the property is not found.
    """
    data_address = self._objects.get_property_data_address(obj, prop)
    self._write_result(data_address)
|
|
|
|
def op_get_next_prop(self, obj, prop):
    """Store the number of the property that follows ``prop`` on ``obj``."""
    following = self._objects.get_next_property(obj, prop)
    self._write_result(following)
|
|
|
|
def op_add(self, a, b):
    """Store the signed 16-bit sum of the operands."""
    total = self._make_signed(a) + self._make_signed(b)
    self._write_result(self._unmake_signed(total))
|
|
|
|
def op_sub(self, a, b):
    """Store the signed 16-bit difference ``a - b``."""
    difference = self._make_signed(a) - self._make_signed(b)
    self._write_result(self._unmake_signed(difference))
|
|
|
|
def op_mul(self, a, b):
    """Store the signed 16-bit product of the operands."""
    product = self._make_signed(a) * self._make_signed(b)
    self._write_result(self._unmake_signed(product))
|
|
|
|
def op_div(self, a, b):
    """Store the signed 16-bit quotient ``a / b``, rounded toward zero.

    Raises:
        ZCpuDivideByZero: if ``b`` is zero.
    """
    dividend = self._make_signed(a)
    divisor = self._make_signed(b)
    if divisor == 0:
        raise ZCpuDivideByZero

    # Python's // rounds toward negative infinity (-11 // 2 == -6), but
    # the Z-machine truncates (-11 / 2 == -5). Divide the magnitudes and
    # put the sign back afterwards.
    quotient = abs(dividend) // abs(divisor)
    if (dividend < 0) != (divisor < 0):
        quotient = -quotient
    self._write_result(self._unmake_signed(quotient))
|
|
|
|
def op_mod(self, a, b):
    """Store the signed 16-bit remainder of ``a / b``.

    The remainder matches division that truncates toward zero, so it
    takes the sign of the dividend (-13 mod 5 is -3).

    Raises:
        ZCpuDivideByZero: if ``b`` is zero.
    """
    dividend = self._make_signed(a)
    divisor = self._make_signed(b)
    if divisor == 0:
        raise ZCpuDivideByZero

    # Exact integer arithmetic: take the remainder of the magnitudes,
    # then give it the dividend's sign. No float division is involved.
    remainder = abs(dividend) % abs(divisor)
    if dividend < 0:
        remainder = -remainder
    self._write_result(self._unmake_signed(remainder))
|
|
|
|
def op_call_2s(self, routine_addr, arg1):
    """Call the routine with the single argument ``arg1``, keeping its result."""
    arguments = [arg1]
    self._call(routine_addr, arguments, True)
|
|
|
|
def op_call_2n(self, routine_addr, arg1):
    """Call the routine with the single argument ``arg1``, discarding its result."""
    arguments = [arg1]
    self._call(routine_addr, arguments, False)
|
|
|
|
def op_set_colour(self, *args):
    """Placeholder for the set_colour opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_throw(self, *args):
    """Placeholder for the throw opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
## 1OP opcodes (opcodes 128-175)
|
|
|
|
def op_jz(self, val):
    """Branch if ``val`` equals zero."""
    is_zero = val == 0
    self._branch(is_zero)
|
|
|
|
def op_get_sibling(self, obj):
    """Store the sibling of ``obj``, then branch if it is non-zero."""
    next_sibling = self._objects.get_sibling(obj)
    # Store first, branch second: both read from the instruction stream.
    self._write_result(next_sibling)
    self._branch(next_sibling != 0)
|
|
|
|
def op_get_child(self, object_num):
    """Store the first child of the object, then branch if it is non-zero."""
    first_child = self._objects.get_child(object_num)
    # Store first, branch second: both read from the instruction stream.
    self._write_result(first_child)
    self._branch(first_child != 0)
|
|
|
|
def op_get_parent(self, object_num):
    """Store the parent of the given object."""
    parent = self._objects.get_parent(object_num)
    self._write_result(parent)
|
|
|
|
def op_get_prop_len(self, data_addr):
    """Store the length of the property whose data starts at ``data_addr``."""
    prop_length = self._objects.get_property_length(data_addr)
    self._write_result(prop_length)
|
|
|
|
def op_inc(self, variable):
    """Add one to the given variable, wrapping modulo 65536."""
    incremented = (self._read_variable(variable) + 1) % 65536
    self._write_result(incremented, store_addr=variable)
|
|
|
|
def op_dec(self, variable):
    """Subtract one from the given variable, using signed 16-bit arithmetic."""
    signed_value = self._make_signed(self._read_variable(variable))
    decremented = self._unmake_signed(signed_value - 1)
    self._write_result(decremented, store_addr=variable)
|
|
|
|
def op_print_addr(self, string_byte_address):
    """Decode the z-string at the given byte address and write it to the screen."""
    decoded = self._string.get(string_byte_address)
    screen = self._ui.screen
    screen.write(decoded)
|
|
|
|
def op_call_1s(self, routine_address):
    """Call the routine without arguments, keeping its result."""
    no_arguments = []
    self._call(routine_address, no_arguments, True)
|
|
|
|
def op_remove_obj(self, obj):
    """Detach ``obj`` from its parent via the object table."""
    object_table = self._objects
    object_table.remove_object(obj)
|
|
|
|
def op_print_obj(self, obj):
    """Write the short name of ``obj`` to the screen."""
    name = self._objects.get_shortname(obj)
    screen = self._ui.screen
    screen.write(name)
|
|
|
|
def op_ret(self, value):
    """Finish the current routine with ``value`` and resume at the returned address."""
    resume_at = self._stackmanager.finish_routine(value)
    self._opdecoder.program_counter = resume_at
|
|
|
|
def op_jump(self, offset):
    """Jump unconditionally by the given signed 16-bit offset.

    This opcode does not follow the usual branch decision algorithm,
    so it moves the program counter itself instead of going through
    _branch.
    """
    old_pc = self._opdecoder.program_counter

    # The operand is a 2-byte signed integer delivered as an unsigned
    # value; fold the upper half of the range down to negatives.
    if offset >= (1 << 15):
        offset -= 1 << 16
    log(f"Jump unconditionally to relative offset {offset}")

    # The original notes that reading the 2 operand bytes apparently is
    # not supposed to advance the PC for this purpose, so the offset is
    # applied with a -2 correction.
    new_pc = old_pc + offset - 2
    self._opdecoder.program_counter = new_pc
    log(f"PC has changed from {old_pc:x} to {new_pc:x}")
|
|
|
|
def op_print_paddr(self, string_paddr):
    """Write the string stored at the given packed address to the screen."""
    byte_address = self._memory.packed_address(string_paddr)
    decoded = self._string.get(byte_address)
    self._ui.screen.write(decoded)
|
|
|
|
def op_load(self, variable):
    """Read the given variable and store its value as the result."""
    # NOTE(review): variable 0 is read through _read_variable, which pops
    # the stack; confirm that popping is intended for this opcode.
    loaded = self._read_variable(variable)
    self._write_result(loaded)
|
|
|
|
def op_not(self, value):
    """Store the bitwise complement of ``value``, limited to 16 bits."""
    inverted = 0xFFFF & ~value
    self._write_result(inverted)
|
|
|
|
def op_call_1n(self, routine_addr):
    """Call the routine without arguments, discarding its result."""
    no_arguments = []
    self._call(routine_addr, no_arguments, False)
|
|
|
|
## 0OP opcodes (opcodes 176-191)
|
|
|
|
def op_rtrue(self, *args):
    """Return true (1) from the current routine."""
    resume_at = self._stackmanager.finish_routine(1)
    self._opdecoder.program_counter = resume_at
|
|
|
|
def op_rfalse(self, *args):
    """Return false (0) from the current routine."""
    resume_at = self._stackmanager.finish_routine(0)
    self._opdecoder.program_counter = resume_at
|
|
|
|
def op_print(self):
    """Write the z-string embedded after this instruction to the screen."""
    embedded_at = self._opdecoder.get_zstring()
    decoded = self._string.get(embedded_at)
    self._ui.screen.write(decoded)
|
|
|
|
def op_print_ret(self):
    """Print the embedded z-string and a newline, then return true.

    The Z-machine defines print_ret as print, then new_line, then
    rtrue, so the three existing handlers are chained in that order.
    """
    self.op_print()
    # The newline is part of the opcode's contract; without it the next
    # output runs on from the end of the printed string.
    self.op_new_line()
    self.op_rtrue()
|
|
|
|
def op_nop(self, *args):
    """Do nothing, whatever operands are supplied."""
    return None
|
|
|
|
def op_save(self, *args):
    """Save game state to file (V3 form, which branches on success).

    Nothing is written: the branch is always taken with a false test
    result, because QuetzalWriter is not yet functional.
    """
    saved = False
    self._branch(saved)
|
|
|
|
def op_save_v4(self, *args):
    """Placeholder for the version 4 save opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_restore(self, *args):
    """Restore game state from file (V3 form, which branches on success).

    Nothing is read: the branch is always taken with a false test
    result, because QuetzalWriter is not yet functional and so no
    valid save files exist.
    """
    restored = False
    self._branch(restored)
|
|
|
|
def op_restore_v4(self, *args):
    """Placeholder for the version 4 restore opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_restart(self, *args):
    """Request a restart of the game from the beginning.

    No state is reset here. ZCpuRestart is raised so that whoever
    drives the run loop (the ZMachine) can catch it and reset to the
    initial state.
    """
    raise ZCpuRestart()
|
|
|
|
def op_ret_popped(self, *args):
    """Return from the current routine with the value popped off the stack."""
    stack = self._stackmanager
    top = stack.pop_stack()
    self._opdecoder.program_counter = stack.finish_routine(top)
|
|
|
|
def op_pop(self, *args):
    """Remove the top value from the stack and throw it away."""
    stack = self._stackmanager
    stack.pop_stack()
|
|
|
|
def op_catch(self, *args):
    """Placeholder for the catch opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_quit(self, *args):
    """Quit the game by raising ZCpuQuit."""
    raise ZCpuQuit()
|
|
|
|
def op_new_line(self, *args):
    """Write a newline character to the screen."""
    newline = "\n"
    self._ui.screen.write(newline)
|
|
|
|
def op_show_status(self, *args):
    """Update the status line (V3 only); deliberately does nothing here."""
    return None
|
|
|
|
def op_verify(self, *args):
    """Branch if the story file's checksum is correct.

    The word stored at 0x1C is compared with the checksum that the
    memory object computes.
    """
    memory = self._memory
    stored = memory.read_word(0x1C)
    computed = memory.generate_checksum()
    self._branch(stored == computed)
|
|
|
|
def op_piracy(self, *args):
    """Placeholder for the piracy opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
## VAR opcodes (opcodes 224-255)
|
|
|
|
# call in v1-3
|
|
def op_call(self, routine_addr, *args):
    """Call the routine with whatever arguments were supplied, keeping its result."""
    keep_result = True
    self._call(routine_addr, args, keep_result)
|
|
|
|
def op_call_vs(self, routine_addr, *args):
    """Call a routine and keep its result; forwards unchanged to op_call."""
    forwarded = (routine_addr, *args)
    self.op_call(*forwarded)
|
|
|
|
def op_storew(self, array, offset, value):
    """Write the 16-bit ``value`` at ``array + 2 * offset``.

    The address is computed in 16-bit arithmetic, so an index such as
    0xFFFF (that is, -1) reaches the word just before ``array`` instead
    of an address far outside the 64K that this opcode can reach.
    """
    store_address = (array + 2 * offset) & 0xFFFF
    self._memory.write_word(store_address, value)
|
|
|
|
def op_storeb(self, array, byte_index, value):
    """Write the low byte of ``value`` at ``array + byte_index``.

    The address is computed in 16-bit arithmetic, so an index such as
    0xFFFF (that is, -1) reaches the byte just before ``array`` instead
    of an address far outside the 64K that this opcode can reach.
    """
    store_address = (array + byte_index) & 0xFFFF
    self._memory[store_address] = value & 0xFF
|
|
|
|
def op_put_prop(self, object_number, property_number, value):
    """Assign ``value`` to the given property of the given object."""
    object_table = self._objects
    object_table.set_property(object_number, property_number, value)
|
|
|
|
def op_sread(self, *args):
    """Read a line of keyboard input into story memory (V3).

    Args:
        args[0]: address of the text buffer. Byte 0 holds the maximum
            length; the typed text is written from byte 1 onward and is
            followed by a zero byte.
        args[1]: address of the parse buffer (optional; when omitted or
            0 the input is not tokenized).
    """
    memory = self._memory
    text_addr = args[0]
    parse_addr = args[1] if len(args) > 1 else 0

    # V3: show status line first
    if memory.version <= 3:
        self.op_show_status()

    # Lower-case the line and drop any trailing/leading line breaks.
    line = self._ui.keyboard_input.read_line().lower().strip("\n\r")

    # Clip to the capacity announced in byte 0, then copy the text in
    # and terminate it with a zero byte.
    line = line[: memory[text_addr]]
    cursor = text_addr + 1
    for ch in line:
        memory[cursor] = ord(ch)
        cursor += 1
    memory[cursor] = 0

    if parse_addr == 0:
        return

    # Byte 0 of the parse buffer caps the number of words recorded;
    # byte 1 receives the number actually written.
    capacity = memory[parse_addr]
    tokens = self._lexer.parse_input(line)
    word_count = min(len(tokens), capacity)
    memory[parse_addr + 1] = word_count

    # Each word takes a 4-byte entry: dictionary address (word), length,
    # and position in the text buffer.
    search_from = 0
    entry_addr = parse_addr + 2
    for word, dict_addr in tokens[:word_count]:
        found = line.find(word, search_from)
        # Fall back to the current search position as a best guess.
        start = search_from if found == -1 else found
        memory.write_word(entry_addr, dict_addr)
        memory[entry_addr + 2] = len(word)
        memory[entry_addr + 3] = start + 1  # 1-indexed from start of text buffer
        search_from = start + len(word)
        entry_addr += 4
|
|
|
|
def op_sread_v4(self, *args):
    """Placeholder for the version 4 sread opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_aread(self, *args):
    """Placeholder for the aread opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_print_char(self, char):
    """Write the given ZSCII character to the screen."""
    # The ZSCII decoder is handed a one-element list.
    text = self._string.zscii.get([char])
    self._ui.screen.write(text)
|
|
|
|
def op_print_num(self, value):
    """Write ``value`` to the screen as a signed 16-bit decimal number."""
    text = str(self._make_signed(value))
    self._ui.screen.write(text)
|
|
|
|
def op_random(self, n):
    """Generate a random number, or seed the PRNG.

    ``n`` is interpreted as a signed 16-bit value. If it is positive,
    a uniformly random number in the range [1:n] is stored. If it is
    negative, the PRNG is seeded with that value; if it is zero, the
    PRNG is seeded with the current time. Seeding stores 0.
    """
    # Other handlers in this class convert their operands with
    # _make_signed, so operands arrive as unsigned 16-bit words. Without
    # this fold the "negative" branch below could never run. Values that
    # are already negative pass through untouched.
    if n >= 0x8000:
        n -= 0x10000

    result = 0
    if n > 0:
        log(f"Generate random number in [1:{n}]")
        result = random.randint(1, n)
    elif n < 0:
        log(f"Seed PRNG with {n}")
        random.seed(n)
    else:
        log("Seed PRNG with time")
        random.seed(time.time())
    self._write_result(result)
|
|
|
|
def op_push(self, value):
    """Put ``value`` on top of the current routine's game stack."""
    stack = self._stackmanager
    stack.push_stack(value)
|
|
|
|
def op_pull(self, variable):
    """Pop the top of the stack into the given variable."""
    popped = self._stackmanager.pop_stack()
    self._write_result(popped, store_addr=variable)
|
|
|
|
def op_split_window(self, height):
    """Split or unsplit the window horizontally; ``height`` is passed to the screen."""
    screen = self._ui.screen
    screen.split_window(height)
|
|
|
|
def op_set_window(self, window_num):
    """Make the given window the active one."""
    screen = self._ui.screen
    screen.select_window(window_num)
|
|
|
|
def op_call_vs2(self, *args):
    """Placeholder for the call_vs2 opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_erase_window(self, window_number):
    """Clear the window with the given number.

    ``window_number`` is interpreted as a signed 16-bit value. -1
    unsplits the screen and clears the lower window (full reset). -2
    clears both windows without unsplitting. Any other value is passed
    to the screen as the window to erase.
    """
    # Other handlers in this class convert their operands with
    # _make_signed, so operands arrive as unsigned 16-bit words; fold
    # 0xFFFF/0xFFFE down to -1/-2 so the special cases can match.
    # Already-negative values pass through untouched.
    if window_number >= 0x8000:
        window_number -= 0x10000

    screen = self._ui.screen
    if window_number == -1:
        self.op_split_window(0)
        screen.erase_window(zscreen.WINDOW_LOWER)
    elif window_number == -2:
        # elif, not if: the -1 case must not also fall through to the
        # generic branch and erase "window -1".
        screen.erase_window(zscreen.WINDOW_LOWER)
        screen.erase_window(zscreen.WINDOW_UPPER)
    else:
        screen.erase_window(window_number)
|
|
|
|
def op_erase_line(self, *args):
    """Placeholder for the erase_line opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_set_cursor(self, x, y):
    """Move the cursor within the active window.

    Both operands are forwarded to the screen unchanged and in order.
    """
    # NOTE(review): the Z-machine opcode is documented as taking
    # (line, column); confirm set_cursor_position expects its arguments
    # in the order they are forwarded here under the names x, y.
    screen = self._ui.screen
    screen.set_cursor_position(x, y)
|
|
|
|
def op_get_cursor(self, *args):
    """Placeholder for the get_cursor opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_set_text_style(self, text_style):
    """Apply the given text style to the screen."""
    screen = self._ui.screen
    screen.set_text_style(text_style)
|
|
|
|
def op_buffer_mode(self, flag):
    """Switch output buffering on the screen on or off.

    With a flag of 1, text sent to the lower window on stream 1 is
    buffered so that it can be word-wrapped properly; with 0 it is
    not. The flag is stored on the screen as a boolean.
    """
    buffering = bool(flag)
    self._ui.screen.buffer_mode = buffering
|
|
|
|
def op_output_stream(self, stream_num):
    """Enable or disable the given stream (v3/4 form).

    All the work is done by the backwards compatible v5
    implementation, called here without a table.
    """
    v5_handler = self.op_output_stream_v5
    v5_handler(stream_num)
|
|
|
|
def op_output_stream_v5(self, stream_num, table=None):
    """Enable or disable the given output stream.

    ``stream_num`` is taken as a signed 16-bit value: a negative
    number unselects the stream with that magnitude, anything else
    selects the stream. ``table`` is accepted but not used.
    """
    stream = self._make_signed(stream_num)
    outputs = self._streammanager.output
    if stream >= 0:
        outputs.select(stream)
    else:
        outputs.unselect(-stream)
|
|
|
|
def op_input_stream(self, stream_num):
    """Select the input stream (stub; not yet implemented for MUD integration)."""
    return None
|
|
|
|
# This one may have been used prematurely in v3 stories. Keep an
|
|
# eye out for it if we ever get bug reports.
|
|
def op_sound_effect(self, *args):
    """Play a sound effect (no-op; sound is not supported in the text MUD)."""
    return None
|
|
|
|
def op_read_char(self, unused, time=0, input_routine=0):
    """Read a single character from input stream 0 (keyboard).

    Args:
        unused: always 1 according to the spec; it exists only for
            historical reasons.
        time: optional timeout. Defaults to 0 so the common
            one-operand form of the opcode can be dispatched.
        input_routine: optional routine to call periodically to decide
            whether to interrupt user input. Defaults to 0.

    Raises:
        ZCpuNotImplemented: if a non-zero ``time`` or ``input_routine``
            is supplied, since timed input is not implemented yet.
    """
    assert unused == 1

    # TODO: shiny timer stuff not implemented yet.
    if time != 0 or input_routine != 0:
        raise ZCpuNotImplemented

    char = self._ui.keyboard_input.read_char()
    self._write_result(char)
|
|
|
|
def op_scan_table(self, *args):
    """Placeholder for the scan_table opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_not_v5(self, *args):
    """Placeholder for a version 5 form of not; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_call_vn(self, *args):
    """Placeholder for the call_vn opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_call_vn2(self, *args):
    """Placeholder for the call_vn2 opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_tokenize(self, *args):
    """Placeholder for the tokenize opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_encode_text(self, *args):
    """Placeholder for the encode_text opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_copy_table(self, *args):
    """Placeholder for the copy_table opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_print_table(self, *args):
    """Placeholder for the print_table opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_check_arg_count(self, *args):
    """Placeholder for the check_arg_count opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
## EXT opcodes (opcodes 256-284)
|
|
|
|
def op_save_v5(self, *args):
    """Placeholder for the version 5 save opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_restore_v5(self, *args):
    """Placeholder for the version 5 restore opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_log_shift(self, *args):
    """Placeholder for the log_shift opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_art_shift(self, *args):
    """Placeholder for the art_shift opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_set_font(self, *args):
    """Placeholder for the set_font opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_save_undo(self, *args):
    """Placeholder for the save_undo opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_restore_undo(self, *args):
    """Placeholder for the restore_undo opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_print_unicode(self, *args):
    """Placeholder for the print_unicode opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
def op_check_unicode(self, *args):
    """Placeholder for the check_unicode opcode; always raises ZCpuNotImplemented."""
    raise ZCpuNotImplemented()
|
|
|
|
# Declaration of the opcode tables. In a Z-Machine, opcodes are
# divided into tables based on the operand type. Within each
# table, the opcode is then indexed by its number. We preserve
# that organization in this opcode table.
#
# The opcode table is a dictionary mapping an operand type to a
# list of opcode definitions. Each opcode definition's index in
# the list is the opcode number within that opcode table. A None
# entry marks a number that has no opcode.
#
# The opcodes are in one of three forms:
#
#  - If the opcode is available and unchanging in all versions,
#    then the definition is simply the function implementing the
#    opcode.
#
#  - If the opcode is only available as of a certain version
#    upwards, then the definition is the tuple (opcode_func,
#    first_version), where first_version is the version of the
#    Z-machine where the opcode appeared.
#
#  - If the opcode changes meaning with successive revisions of the
#    Z-machine, then the definition is a list of the above tuples,
#    sorted in descending order (tuple with the highest
#    first_version comes first). Lookup takes the first tuple whose
#    first_version is not newer than the running machine, which is
#    why the order matters. If an instruction became illegal
#    after a given version, it should have a tuple with the opcode
#    function set to None.

opcodes = {
    # 2OP opcodes
    zopdecoder.OPCODE_2OP: [
        None,
        op_je,
        op_jl,
        op_jg,
        op_dec_chk,
        op_inc_chk,
        op_jin,
        op_test,
        op_or,
        op_and,
        op_test_attr,
        op_set_attr,
        op_clear_attr,
        op_store,
        op_insert_obj,
        op_loadw,
        op_loadb,
        op_get_prop,
        op_get_prop_addr,
        op_get_next_prop,
        op_add,
        op_sub,
        op_mul,
        op_div,
        op_mod,
        (op_call_2s, 4),
        (op_call_2n, 5),
        (op_set_colour, 5),
        (op_throw, 5),
    ],
    # 1OP opcodes
    zopdecoder.OPCODE_1OP: [
        op_jz,
        op_get_sibling,
        op_get_child,
        op_get_parent,
        op_get_prop_len,
        op_inc,
        op_dec,
        op_print_addr,
        (op_call_1s, 4),
        op_remove_obj,
        op_print_obj,
        op_ret,
        op_jump,
        op_print_paddr,
        op_load,
        [(op_call_1n, 5), (op_not, 1)],
    ],
    # 0OP opcodes
    zopdecoder.OPCODE_0OP: [
        op_rtrue,
        op_rfalse,
        op_print,
        op_print_ret,
        op_nop,
        [(None, 5), (op_save_v4, 4), (op_save, 1)],
        [(None, 5), (op_restore_v4, 4), (op_restore, 1)],
        op_restart,
        op_ret_popped,
        [(op_catch, 5), (op_pop, 1)],
        op_quit,
        op_new_line,
        [(None, 4), (op_show_status, 3)],
        (op_verify, 3),
        None,  # Padding. Opcode 0OP:E is the extended opcode marker.
        (op_piracy, 5),
    ],
    # VAR opcodes
    zopdecoder.OPCODE_VAR: [
        [(op_call_vs, 4), (op_call, 1)],
        op_storew,
        op_storeb,
        op_put_prop,
        [(op_aread, 5), (op_sread_v4, 4), (op_sread, 1)],
        op_print_char,
        op_print_num,
        op_random,
        op_push,
        op_pull,
        (op_split_window, 3),
        (op_set_window, 3),
        (op_call_vs2, 4),
        (op_erase_window, 4),
        (op_erase_line, 4),
        (op_set_cursor, 4),
        (op_get_cursor, 4),
        (op_set_text_style, 4),
        (op_buffer_mode, 4),
        [(op_output_stream_v5, 5), (op_output_stream, 3)],
        (op_input_stream, 3),
        (op_sound_effect, 5),
        (op_read_char, 4),
        (op_scan_table, 4),
        (op_not, 5),
        (op_call_vn, 5),
        (op_call_vn2, 5),
        (op_tokenize, 5),
        (op_encode_text, 5),
        (op_copy_table, 5),
        (op_print_table, 5),
        (op_check_arg_count, 5),
    ],
    # EXT opcodes
    zopdecoder.OPCODE_EXT: [
        (op_save_v5, 5),
        (op_restore_v5, 5),
        (op_log_shift, 5),
        (op_art_shift, 5),
        (op_set_font, 5),
        None,
        None,
        None,
        None,
        (op_save_undo, 5),
        (op_restore_undo, 5),
        (op_print_unicode, 5),
        (op_check_unicode, 5),
    ],
}
|