Implements load_zone() and load_zones() functions to parse zone definitions from TOML files. Wires zone loading into server startup to register all zones from content/zones/ directory. Updates player zone lookup to use the registry instead of hardcoded overworld check. Includes tavern.toml as first hand-built interior zone (8x6 bounded).
521 lines
18 KiB
Python
521 lines
18 KiB
Python
"""Telnet server for the MUD."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import time
|
|
import tomllib
|
|
from typing import cast
|
|
|
|
import telnetlib3
|
|
from telnetlib3.server_shell import readline2
|
|
|
|
import mudlib.combat.commands
|
|
import mudlib.commands
|
|
import mudlib.commands.containers
|
|
import mudlib.commands.edit
|
|
import mudlib.commands.fly
|
|
import mudlib.commands.help
|
|
import mudlib.commands.look
|
|
import mudlib.commands.movement
|
|
import mudlib.commands.play
|
|
import mudlib.commands.portals
|
|
import mudlib.commands.quit
|
|
import mudlib.commands.reload
|
|
import mudlib.commands.spawn
|
|
import mudlib.commands.things
|
|
from mudlib.caps import parse_mtts
|
|
from mudlib.combat.commands import register_combat_commands
|
|
from mudlib.combat.engine import process_combat
|
|
from mudlib.content import load_commands
|
|
from mudlib.effects import clear_expired
|
|
from mudlib.if_session import broadcast_to_spectators
|
|
from mudlib.mob_ai import process_mobs
|
|
from mudlib.mobs import load_mob_templates, mob_templates
|
|
from mudlib.player import Player, players
|
|
from mudlib.resting import process_resting
|
|
from mudlib.store import (
|
|
PlayerData,
|
|
account_exists,
|
|
authenticate,
|
|
create_account,
|
|
init_db,
|
|
load_player_data,
|
|
save_player,
|
|
update_last_login,
|
|
)
|
|
from mudlib.thing import Thing
|
|
from mudlib.things import load_thing_templates, spawn_thing, thing_templates
|
|
from mudlib.world.terrain import World
|
|
from mudlib.zone import Zone
|
|
from mudlib.zones import get_zone, load_zones, register_zone
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
HOST = os.environ.get("MUD_HOST", "127.0.0.1")
|
|
PORT = int(os.environ.get("MUD_PORT", "6789"))
|
|
TICK_RATE = 10 # ticks per second
|
|
TICK_INTERVAL = 1.0 / TICK_RATE
|
|
AUTOSAVE_INTERVAL = 60.0 # seconds between auto-saves
|
|
|
|
# Module-level overworld zone instance, created once at startup
|
|
_overworld: Zone | None = None
|
|
|
|
|
|
def load_world_config(world_name: str = "earth") -> dict:
|
|
"""Load world configuration from TOML file."""
|
|
worlds_dir = pathlib.Path(__file__).resolve().parents[2] / "worlds"
|
|
config_path = worlds_dir / world_name / "config.toml"
|
|
with open(config_path, "rb") as f:
|
|
return tomllib.load(f)
|
|
|
|
|
|
async def game_loop() -> None:
|
|
"""Run periodic game tasks at TICK_RATE ticks per second."""
|
|
log.info("game loop started (%d ticks/sec)", TICK_RATE)
|
|
last_save_time = time.monotonic()
|
|
|
|
while True:
|
|
t0 = asyncio.get_event_loop().time()
|
|
clear_expired()
|
|
await process_combat()
|
|
await process_mobs(mudlib.combat.commands.combat_moves)
|
|
await process_resting()
|
|
|
|
# Periodic auto-save (every 60 seconds)
|
|
current_time = time.monotonic()
|
|
if current_time - last_save_time >= AUTOSAVE_INTERVAL:
|
|
player_count = len(players)
|
|
if player_count > 0:
|
|
log.debug("auto-saving %d players", player_count)
|
|
for player in list(players.values()):
|
|
save_player(player)
|
|
last_save_time = current_time
|
|
|
|
elapsed = asyncio.get_event_loop().time() - t0
|
|
sleep_time = TICK_INTERVAL - elapsed
|
|
if sleep_time > 0:
|
|
await asyncio.sleep(sleep_time)
|
|
|
|
|
|
def find_passable_start(zone: Zone, start_x: int, start_y: int) -> tuple[int, int]:
|
|
"""Find a passable tile starting from (start_x, start_y) and searching outward.
|
|
|
|
Args:
|
|
zone: The zone to search in
|
|
start_x: Starting X coordinate
|
|
start_y: Starting Y coordinate
|
|
|
|
Returns:
|
|
Tuple of (x, y) for the first passable tile found
|
|
"""
|
|
# Try the starting position first
|
|
if zone.is_passable(start_x, start_y):
|
|
return start_x, start_y
|
|
|
|
# Spiral outward from the starting position (wrapping)
|
|
for radius in range(1, 100):
|
|
for dx in range(-radius, radius + 1):
|
|
for dy in range(-radius, radius + 1):
|
|
# Only check the perimeter of the current radius
|
|
if abs(dx) != radius and abs(dy) != radius:
|
|
continue
|
|
|
|
x, y = zone.wrap(start_x + dx, start_y + dy)
|
|
|
|
if zone.is_passable(x, y):
|
|
return x, y
|
|
|
|
# Fallback to starting position if nothing found
|
|
return start_x, start_y
|
|
|
|
|
|
async def handle_login(
|
|
name: str,
|
|
read_func,
|
|
write_func,
|
|
) -> dict:
|
|
"""Handle login or registration for a player.
|
|
|
|
Args:
|
|
name: Player name
|
|
read_func: Async function to read input
|
|
write_func: Async function to write output
|
|
|
|
Returns:
|
|
Dict with 'success' (bool) and 'player_data' (dict or None)
|
|
"""
|
|
if account_exists(name):
|
|
# Existing account - authenticate
|
|
max_attempts = 3
|
|
for attempt in range(max_attempts):
|
|
await write_func("Password: ")
|
|
password = await read_func()
|
|
|
|
if password is None or not password.strip():
|
|
return {"success": False, "player_data": None}
|
|
|
|
if authenticate(name, password.strip()):
|
|
# Success - load player data
|
|
player_data = load_player_data(name)
|
|
return {"success": True, "player_data": player_data}
|
|
|
|
remaining = max_attempts - attempt - 1
|
|
if remaining > 0:
|
|
msg = f"Incorrect password. {remaining} attempts remaining.\r\n"
|
|
await write_func(msg)
|
|
else:
|
|
await write_func("Too many failed attempts.\r\n")
|
|
|
|
return {"success": False, "player_data": None}
|
|
|
|
# New account - registration
|
|
await write_func(f"Account '{name}' does not exist. Create new account? (y/n) ")
|
|
response = await read_func()
|
|
|
|
if response is None or response.strip().lower() != "y":
|
|
return {"success": False, "player_data": None}
|
|
|
|
# Get password with confirmation
|
|
while True:
|
|
await write_func("Choose a password: ")
|
|
password1 = await read_func()
|
|
|
|
if password1 is None or not password1.strip():
|
|
return {"success": False, "player_data": None}
|
|
|
|
await write_func("Confirm password: ")
|
|
password2 = await read_func()
|
|
|
|
if password2 is None or not password2.strip():
|
|
return {"success": False, "player_data": None}
|
|
|
|
if password1.strip() == password2.strip():
|
|
# Passwords match - create account
|
|
if create_account(name, password1.strip()):
|
|
await write_func("Account created successfully!\r\n")
|
|
# Return default data for new account
|
|
player_data = load_player_data(name)
|
|
return {"success": True, "player_data": player_data}
|
|
await write_func("Failed to create account.\r\n")
|
|
return {"success": False, "player_data": None}
|
|
|
|
# Passwords don't match - retry or cancel
|
|
await write_func("Passwords do not match. Try again? (y/n) ")
|
|
retry = await read_func()
|
|
if retry is None or retry.strip().lower() != "y":
|
|
return {"success": False, "player_data": None}
|
|
|
|
|
|
async def shell(
|
|
reader: telnetlib3.TelnetReader | telnetlib3.TelnetReaderUnicode,
|
|
writer: telnetlib3.TelnetWriter | telnetlib3.TelnetWriterUnicode,
|
|
) -> None:
|
|
"""Shell callback that handles player connection and command loop."""
|
|
_reader = cast(telnetlib3.TelnetReaderUnicode, reader)
|
|
_writer = cast(telnetlib3.TelnetWriterUnicode, writer)
|
|
|
|
assert _overworld is not None, (
|
|
"Overworld zone must be initialized before accepting connections"
|
|
)
|
|
|
|
log.debug("new connection from %s", _writer.get_extra_info("peername"))
|
|
|
|
_writer.write("Welcome to the MUD!\r\n")
|
|
_writer.write("What is your name? ")
|
|
await _writer.drain()
|
|
|
|
name_input = await readline2(_reader, _writer)
|
|
if name_input is None or not name_input.strip():
|
|
_writer.close()
|
|
return
|
|
|
|
player_name = name_input.strip()
|
|
|
|
# Handle login/registration
|
|
async def read_input():
|
|
result = await readline2(_reader, _writer)
|
|
return result
|
|
|
|
async def write_output(msg: str):
|
|
_writer.write(msg)
|
|
await _writer.drain()
|
|
|
|
login_result = await handle_login(player_name, read_input, write_output)
|
|
|
|
if not login_result["success"]:
|
|
_writer.write("Login failed. Disconnecting.\r\n")
|
|
await _writer.drain()
|
|
_writer.close()
|
|
return
|
|
|
|
# Update last login timestamp
|
|
update_last_login(player_name)
|
|
|
|
# Load player data from database or use defaults for new player
|
|
player_data: PlayerData | None = login_result["player_data"]
|
|
if player_data is None:
|
|
# New player - find a passable starting position
|
|
center_x = _overworld.width // 2
|
|
center_y = _overworld.height // 2
|
|
start_x, start_y = find_passable_start(_overworld, center_x, center_y)
|
|
player_data = {
|
|
"x": start_x,
|
|
"y": start_y,
|
|
"pl": 100.0,
|
|
"stamina": 100.0,
|
|
"max_stamina": 100.0,
|
|
"flying": False,
|
|
"zone_name": "overworld",
|
|
"inventory": [],
|
|
}
|
|
|
|
# Resolve zone from zone_name using zone registry
|
|
zone_name = player_data.get("zone_name", "overworld")
|
|
player_zone = get_zone(zone_name)
|
|
if player_zone is None:
|
|
log.warning(
|
|
"unknown zone '%s' for player '%s', defaulting to overworld",
|
|
zone_name,
|
|
player_name,
|
|
)
|
|
player_zone = _overworld
|
|
|
|
# Verify spawn position is still passable
|
|
if not player_zone.is_passable(player_data["x"], player_data["y"]):
|
|
# Saved position is no longer passable, find a new one
|
|
start_x, start_y = find_passable_start(
|
|
player_zone, player_data["x"], player_data["y"]
|
|
)
|
|
player_data["x"] = start_x
|
|
player_data["y"] = start_y
|
|
|
|
# Create player instance
|
|
player = Player(
|
|
name=player_name,
|
|
location=player_zone,
|
|
x=player_data["x"],
|
|
y=player_data["y"],
|
|
pl=player_data["pl"],
|
|
stamina=player_data["stamina"],
|
|
max_stamina=player_data["max_stamina"],
|
|
flying=player_data["flying"],
|
|
writer=_writer,
|
|
reader=_reader,
|
|
)
|
|
|
|
# Reconstruct inventory from saved data
|
|
for item_name in player_data.get("inventory", []):
|
|
template = thing_templates.get(item_name)
|
|
if template:
|
|
spawn_thing(template, player)
|
|
else:
|
|
# Template not found — create a bare Thing so it's not lost
|
|
log.warning(
|
|
"unknown thing template '%s' for player '%s'",
|
|
item_name,
|
|
player_name,
|
|
)
|
|
Thing(name=item_name, location=player)
|
|
|
|
# Parse and store client capabilities from MTTS
|
|
ttype3 = _writer.get_extra_info("ttype3")
|
|
player.caps = parse_mtts(ttype3)
|
|
log.debug(
|
|
"%s capabilities: %s (color_depth=%s)",
|
|
player_name,
|
|
player.caps,
|
|
player.color_depth,
|
|
)
|
|
|
|
# Register player
|
|
players[player_name] = player
|
|
log.info(
|
|
"%s connected at (%d, %d)",
|
|
player_name,
|
|
player_data["x"],
|
|
player_data["y"],
|
|
)
|
|
|
|
_writer.write(f"\r\nWelcome, {player_name}!\r\n")
|
|
_writer.write("Type 'help' for commands or 'quit' to disconnect.\r\n\r\n")
|
|
await _writer.drain()
|
|
|
|
# Show initial map
|
|
await mudlib.commands.look.cmd_look(player, "")
|
|
|
|
# Command loop
|
|
try:
|
|
while not _writer.is_closing():
|
|
# Show appropriate prompt based on mode
|
|
if player.mode == "editor" and player.editor:
|
|
_writer.write(f" {player.editor.cursor + 1}> ")
|
|
elif player.mode == "if" and player.if_session:
|
|
_writer.write("> ")
|
|
else:
|
|
_writer.write("mud> ")
|
|
await _writer.drain()
|
|
|
|
inp = await readline2(_reader, _writer)
|
|
if inp is None:
|
|
break
|
|
|
|
command = inp.strip()
|
|
if not command and player.mode not in ("editor", "if"):
|
|
continue
|
|
|
|
# Handle editor mode
|
|
if player.mode == "editor" and player.editor:
|
|
response = await player.editor.handle_input(inp)
|
|
if response.output:
|
|
await player.send(response.output)
|
|
if response.done:
|
|
player.editor = None
|
|
player.mode_stack.pop()
|
|
# Handle IF mode
|
|
elif player.mode == "if" and player.if_session:
|
|
response = await player.if_session.handle_input(command)
|
|
if response.output:
|
|
# Ensure output ends with newline
|
|
output = response.output
|
|
if not output.endswith("\r\n"):
|
|
output += "\r\n"
|
|
await player.send(output)
|
|
# Broadcast to spectators unless it's an escape command
|
|
if not command.startswith("::"):
|
|
spectator_msg = (
|
|
f"[{player.name}'s terminal]\r\n> {command}\r\n{output}"
|
|
)
|
|
await broadcast_to_spectators(player, spectator_msg)
|
|
if response.done:
|
|
await player.if_session.stop()
|
|
player.if_session = None
|
|
player.mode_stack.pop()
|
|
await player.send("\r\nyou leave the terminal.\r\n")
|
|
# Notify spectators
|
|
leave_msg = f"{player.name} steps away from the terminal.\r\n"
|
|
await broadcast_to_spectators(player, leave_msg)
|
|
else:
|
|
# Dispatch normal command
|
|
await mudlib.commands.dispatch(player, command)
|
|
|
|
# Check if writer was closed by quit command
|
|
if _writer.is_closing():
|
|
break
|
|
finally:
|
|
# Clean up IF session if player was playing
|
|
if player.if_session:
|
|
await player.if_session.stop()
|
|
# Save player state on disconnect (if not already saved by quit command)
|
|
if player_name in players:
|
|
save_player(player)
|
|
del players[player_name]
|
|
log.info("%s disconnected", player_name)
|
|
|
|
_writer.close()
|
|
|
|
|
|
async def run_server() -> None:
|
|
"""Start the MUD telnet server."""
|
|
global _overworld
|
|
|
|
# Initialize database
|
|
data_dir = pathlib.Path(__file__).resolve().parents[2] / "data"
|
|
db_path = data_dir / "mud.db"
|
|
log.info("initializing database at %s", db_path)
|
|
init_db(db_path)
|
|
|
|
# Generate world once at startup (cached to build/ after first run)
|
|
cache_dir = pathlib.Path(__file__).resolve().parents[2] / "build"
|
|
config = load_world_config()
|
|
world_cfg = config["world"]
|
|
log.info(
|
|
"loading world (seed=%d, %dx%d)...",
|
|
world_cfg["seed"],
|
|
world_cfg["width"],
|
|
world_cfg["height"],
|
|
)
|
|
t0 = time.monotonic()
|
|
world = World(
|
|
seed=world_cfg["seed"],
|
|
width=world_cfg["width"],
|
|
height=world_cfg["height"],
|
|
cache_dir=cache_dir,
|
|
)
|
|
elapsed = time.monotonic() - t0
|
|
if world.cached:
|
|
log.info("world loaded from cache in %.2fs", elapsed)
|
|
else:
|
|
log.info("world generated in %.2fs (cached for next run)", elapsed)
|
|
|
|
# Create overworld zone from generated terrain
|
|
_overworld = Zone(
|
|
name="overworld",
|
|
width=world.width,
|
|
height=world.height,
|
|
terrain=world.terrain,
|
|
toroidal=True,
|
|
)
|
|
log.info("created overworld zone (%dx%d, toroidal)", world.width, world.height)
|
|
|
|
# Register overworld zone
|
|
register_zone("overworld", _overworld)
|
|
|
|
# Load and register zones from content/zones/
|
|
zones_dir = pathlib.Path(__file__).resolve().parents[2] / "content" / "zones"
|
|
if zones_dir.exists():
|
|
loaded_zones = load_zones(zones_dir)
|
|
for zone_name, zone in loaded_zones.items():
|
|
register_zone(zone_name, zone)
|
|
log.info("loaded %d zones from %s", len(loaded_zones), zones_dir)
|
|
|
|
# Load content-defined commands from TOML files
|
|
content_dir = pathlib.Path(__file__).resolve().parents[2] / "content" / "commands"
|
|
if content_dir.exists():
|
|
log.info("loading content from %s", content_dir)
|
|
content_commands = load_commands(content_dir)
|
|
for cmd_def in content_commands:
|
|
mudlib.commands.register(cmd_def)
|
|
log.debug("registered content command: %s", cmd_def.name)
|
|
|
|
# Load combat moves and register as commands
|
|
combat_dir = pathlib.Path(__file__).resolve().parents[2] / "content" / "combat"
|
|
if combat_dir.exists():
|
|
log.info("loading combat moves from %s", combat_dir)
|
|
register_combat_commands(combat_dir)
|
|
log.info("registered combat commands")
|
|
|
|
# Load mob templates
|
|
mobs_dir = pathlib.Path(__file__).resolve().parents[2] / "content" / "mobs"
|
|
if mobs_dir.exists():
|
|
loaded = load_mob_templates(mobs_dir)
|
|
mob_templates.update(loaded)
|
|
log.info("loaded %d mob templates from %s", len(loaded), mobs_dir)
|
|
|
|
# Load thing templates
|
|
things_dir = pathlib.Path(__file__).resolve().parents[2] / "content" / "things"
|
|
if things_dir.exists():
|
|
loaded_things = load_thing_templates(things_dir)
|
|
thing_templates.update(loaded_things)
|
|
log.info("loaded %d thing templates from %s", len(loaded_things), things_dir)
|
|
|
|
# connect_maxwait: how long to wait for telnet option negotiation (CHARSET
|
|
# etc) before starting the shell. default is 4.0s which is painful.
|
|
# MUD clients like tintin++ reject CHARSET immediately via MTTS, but
|
|
# telnetlib3 still waits for the full timeout. 0.5s is plenty.
|
|
server = await telnetlib3.create_server(
|
|
host=HOST, port=PORT, shell=shell, connect_maxwait=0.5
|
|
)
|
|
log.info("listening on %s:%d", HOST, PORT)
|
|
|
|
loop_task = asyncio.create_task(game_loop())
|
|
|
|
try:
|
|
while True:
|
|
await asyncio.sleep(3600)
|
|
except KeyboardInterrupt:
|
|
log.info("shutting down...")
|
|
loop_task.cancel()
|
|
server.close()
|
|
await server.wait_closed()
|