Add seed-based terrain world with movement and viewport

1000x1000 tile world generated deterministically from a seed using
layered Perlin noise. Terrain derived from elevation: mountains,
forests, grasslands, sand, water, with rivers traced downhill from
peaks. ANSI-colored viewport centered on player.

Command system with registry/dispatch, 8-direction movement (n/s/e/w
+ diagonals), look/l, quit/q. Players see arrival/departure messages.

Set connect_maxwait=0.5 on telnetlib3 to avoid the 4s CHARSET
negotiation timeout — MUD clients reject CHARSET immediately via MTTS.
This commit is contained in:
Jared Miller 2026-02-07 13:27:44 -05:00
parent a1d139ea87
commit 0d0c142993
Signed by: shmup
GPG key ID: 22B5C6D66A38B06C
14 changed files with 1368 additions and 19 deletions

42
demo_terrain.py Executable file
View file

@ -0,0 +1,42 @@
#!/usr/bin/env -S uv run --script
"""Demo script to visualize terrain generation."""
from mudlib.render.ansi import colorize_map
from mudlib.world.terrain import World
def main():
print("Generating world (seed=42, 100x100)...")
world = World(seed=42, width=100, height=100)
print("\nTerrain distribution:")
terrain_counts = {".": 0, "^": 0, "~": 0, "T": 0, ":": 0}
for y in range(100):
for x in range(100):
tile = world.get_tile(x, y)
terrain_counts[tile] += 1
for char, name in [
(".", "grass"),
("^", "mountain"),
("~", "water"),
("T", "forest"),
(":", "sand"),
]:
count = terrain_counts[char]
percentage = (count / 10000) * 100
print(f" {char} {name:10} {count:5} tiles ({percentage:5.1f}%)")
print("\nViewport at center (50, 50) - 40x20:")
viewport = world.get_viewport(50, 50, width=40, height=20)
colored = colorize_map(viewport)
print(colored)
print("\nViewport at (25, 25) - 40x20:")
viewport = world.get_viewport(25, 25, width=40, height=20)
colored = colorize_map(viewport)
print(colored)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,57 @@
"""Command registry and dispatcher."""
from collections.abc import Awaitable, Callable
from mudlib.player import Player
# Type alias for command handlers
CommandHandler = Callable[[Player, str], Awaitable[None]]
# Registry maps command names to handler functions
_registry: dict[str, CommandHandler] = {}
def register(
name: str, handler: CommandHandler, aliases: list[str] | None = None
) -> None:
"""Register a command handler with optional aliases.
Args:
name: The primary command name
handler: Async function that handles the command
aliases: Optional list of alternative names for the command
"""
_registry[name] = handler
if aliases:
for alias in aliases:
_registry[alias] = handler
async def dispatch(player: Player, raw_input: str) -> None:
"""Parse input, find command, call handler.
Args:
player: The player executing the command
raw_input: The raw input string from the player
"""
raw_input = raw_input.strip()
if not raw_input:
return
# Split into command and arguments
parts = raw_input.split(maxsplit=1)
command = parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
# Look up the handler
handler = _registry.get(command)
if handler is None:
player.writer.write(f"Unknown command: {command}\r\n")
await player.writer.drain()
return
# Execute the handler
await handler(player, args)

View file

@ -0,0 +1,66 @@
"""Look command for viewing the world."""
from typing import Any
from mudlib.commands import register
from mudlib.player import Player, players
from mudlib.render.ansi import colorize_terrain
# World instance will be injected by the server
world: Any = None
# Viewport dimensions
VIEWPORT_WIDTH = 21
VIEWPORT_HEIGHT = 11
async def cmd_look(player: Player, args: str) -> None:
"""Render the current viewport to the player.
Args:
player: The player executing the command
args: Command arguments (unused for now)
"""
# Get the viewport from the world
viewport = world.get_viewport(player.x, player.y, VIEWPORT_WIDTH, VIEWPORT_HEIGHT)
# Calculate center position
center_x = VIEWPORT_WIDTH // 2
center_y = VIEWPORT_HEIGHT // 2
# Build a list of (relative_x, relative_y) for other players
other_player_positions = []
for other in players.values():
if other.name == player.name:
continue
# Calculate relative position
rel_x = other.x - player.x + center_x
rel_y = other.y - player.y + center_y
# Check if within viewport bounds
if 0 <= rel_x < VIEWPORT_WIDTH and 0 <= rel_y < VIEWPORT_HEIGHT:
other_player_positions.append((rel_x, rel_y))
# Build the output with ANSI coloring
output_lines = []
for y, row in enumerate(viewport):
line = []
for x, tile in enumerate(row):
# Check if this is the player's position
if x == center_x and y == center_y:
line.append(colorize_terrain("@"))
# Check if this is another player's position
elif (x, y) in other_player_positions:
line.append(colorize_terrain("*"))
else:
line.append(colorize_terrain(tile))
output_lines.append("".join(line))
# Send to player
player.writer.write("\r\n".join(output_lines) + "\r\n")
await player.writer.drain()
# Register the look command with its alias
register("look", cmd_look, aliases=["l"])

View file

@ -0,0 +1,161 @@
"""Movement commands for navigating the world."""
from typing import Any
from mudlib.commands import register
from mudlib.player import Player, players
# World instance will be injected by the server
world: Any = None
# Direction mappings: command -> (dx, dy)
DIRECTIONS: dict[str, tuple[int, int]] = {
"n": (0, -1),
"north": (0, -1),
"s": (0, 1),
"south": (0, 1),
"e": (1, 0),
"east": (1, 0),
"w": (-1, 0),
"west": (-1, 0),
"ne": (1, -1),
"northeast": (1, -1),
"nw": (-1, -1),
"northwest": (-1, -1),
"se": (1, 1),
"southeast": (1, 1),
"sw": (-1, 1),
"southwest": (-1, 1),
}
# Opposite directions for arrival messages
OPPOSITE_DIRECTIONS: dict[str, str] = {
"north": "south",
"south": "north",
"east": "west",
"west": "east",
"northeast": "southwest",
"southwest": "northeast",
"northwest": "southeast",
"southeast": "northwest",
}
def get_direction_name(dx: int, dy: int) -> str:
"""Get the full direction name from deltas."""
direction_map = {
(0, -1): "north",
(0, 1): "south",
(1, 0): "east",
(-1, 0): "west",
(1, -1): "northeast",
(-1, -1): "northwest",
(1, 1): "southeast",
(-1, 1): "southwest",
}
return direction_map.get((dx, dy), "")
async def move_player(player: Player, dx: int, dy: int, direction_name: str) -> None:
"""Move a player in a given direction.
Args:
player: The player to move
dx: X delta
dy: Y delta
direction_name: Full name of the direction for messages
"""
target_x = player.x + dx
target_y = player.y + dy
# Check if the target is passable
if not world.is_passable(target_x, target_y):
player.writer.write("You can't go that way.\r\n")
await player.writer.drain()
return
# Send departure message to players in the old area
opposite = OPPOSITE_DIRECTIONS[direction_name]
await send_nearby_message(
player, player.x, player.y, f"{player.name} leaves {direction_name}.\r\n"
)
# Update position
player.x = target_x
player.y = target_y
# Send arrival message to players in the new area
await send_nearby_message(
player, player.x, player.y, f"{player.name} arrives from the {opposite}.\r\n"
)
# Render new viewport to the moving player
from mudlib.commands.look import cmd_look
await cmd_look(player, "")
async def send_nearby_message(player: Player, x: int, y: int, message: str) -> None:
"""Send a message to all players near a location, excluding the player.
Args:
player: The player who triggered the message (excluded from receiving it)
x: X coordinate of the location
y: Y coordinate of the location
message: The message to send
"""
# For now, use a simple viewport range (could be configurable)
viewport_range = 10
for other in players.values():
if other.name == player.name:
continue
# Check if other player is within viewport range
if abs(other.x - x) <= viewport_range and abs(other.y - y) <= viewport_range:
other.writer.write(message)
await other.writer.drain()
# Define individual movement command handlers
async def move_north(player: Player, args: str) -> None:
await move_player(player, 0, -1, "north")
async def move_south(player: Player, args: str) -> None:
await move_player(player, 0, 1, "south")
async def move_east(player: Player, args: str) -> None:
await move_player(player, 1, 0, "east")
async def move_west(player: Player, args: str) -> None:
await move_player(player, -1, 0, "west")
async def move_northeast(player: Player, args: str) -> None:
await move_player(player, 1, -1, "northeast")
async def move_northwest(player: Player, args: str) -> None:
await move_player(player, -1, -1, "northwest")
async def move_southeast(player: Player, args: str) -> None:
await move_player(player, 1, 1, "southeast")
async def move_southwest(player: Player, args: str) -> None:
await move_player(player, -1, 1, "southwest")
# Register all movement commands with their aliases
register("north", move_north, aliases=["n"])
register("south", move_south, aliases=["s"])
register("east", move_east, aliases=["e"])
register("west", move_west, aliases=["w"])
register("northeast", move_northeast, aliases=["ne"])
register("northwest", move_northwest, aliases=["nw"])
register("southeast", move_southeast, aliases=["se"])
register("southwest", move_southwest, aliases=["sw"])

View file

@ -0,0 +1,24 @@
"""Quit command for disconnecting from the server."""
from mudlib.commands import register
from mudlib.player import Player, players
async def cmd_quit(player: Player, args: str) -> None:
"""Disconnect the player from the server.
Args:
player: The player executing the command
args: Command arguments (unused)
"""
player.writer.write("Goodbye!\r\n")
await player.writer.drain()
player.writer.close()
# Remove from player registry
if player.name in players:
del players[player.name]
# Register the quit command with its aliases
register("quit", cmd_quit, aliases=["q"])

19
src/mudlib/player.py Normal file
View file

@ -0,0 +1,19 @@
"""Player state and registry."""
from dataclasses import dataclass
from typing import Any
@dataclass
class Player:
"""Represents a connected player."""
name: str
x: int # position in world
y: int
writer: Any # telnetlib3 TelnetWriter for sending output
reader: Any # telnetlib3 TelnetReader for reading input
# Global registry of connected players
players: dict[str, Player] = {}

53
src/mudlib/render/ansi.py Normal file
View file

@ -0,0 +1,53 @@
"""ANSI color codes for terminal rendering."""
# ANSI color codes
RESET = "\033[0m"
BOLD = "\033[1m"
# foreground colors
BLACK = "\033[30m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m"
# bright foreground colors
BRIGHT_BLACK = "\033[90m"
BRIGHT_RED = "\033[91m"
BRIGHT_GREEN = "\033[92m"
BRIGHT_YELLOW = "\033[93m"
BRIGHT_BLUE = "\033[94m"
BRIGHT_MAGENTA = "\033[95m"
BRIGHT_CYAN = "\033[96m"
BRIGHT_WHITE = "\033[97m"
# terrain color mapping
TERRAIN_COLORS = {
".": GREEN, # grass
"^": BRIGHT_BLACK, # mountain
"~": BLUE, # water
"T": GREEN, # forest (darker would be "\033[32m")
":": YELLOW, # sand
"@": BOLD + BRIGHT_WHITE, # player
"*": BOLD + BRIGHT_RED, # other entity
}
def colorize_terrain(char: str) -> str:
"""Return ANSI-colored version of terrain character."""
color = TERRAIN_COLORS.get(char, "")
if color:
return f"{color}{char}{RESET}"
return char
def colorize_map(grid: list[list[str]]) -> str:
"""Colorize a 2D grid of terrain and return as string."""
lines = []
for row in grid:
colored_row = "".join(colorize_terrain(char) for char in row)
lines.append(colored_row)
return "\n".join(lines)

View file

@ -1,53 +1,169 @@
"""Telnet server for the MUD.""" """Telnet server for the MUD."""
import asyncio import asyncio
import logging
import time
from typing import cast from typing import cast
import telnetlib3 import telnetlib3
from telnetlib3.server_shell import readline2 from telnetlib3.server_shell import readline2
import mudlib.commands
import mudlib.commands.look
import mudlib.commands.movement
import mudlib.commands.quit
from mudlib.player import Player, players
from mudlib.world.terrain import World
log = logging.getLogger(__name__)
PORT = 6789 PORT = 6789
# Module-level world instance, generated once at startup
_world: World | None = None
def find_passable_start(world: World, start_x: int, start_y: int) -> tuple[int, int]:
"""Find a passable tile starting from (start_x, start_y) and searching outward.
Args:
world: The world to search in
start_x: Starting X coordinate
start_y: Starting Y coordinate
Returns:
Tuple of (x, y) for the first passable tile found
"""
# Try the starting position first
if world.is_passable(start_x, start_y):
return start_x, start_y
# Spiral outward from the starting position
for radius in range(1, 100):
for dx in range(-radius, radius + 1):
for dy in range(-radius, radius + 1):
# Only check the perimeter of the current radius
if abs(dx) != radius and abs(dy) != radius:
continue
x = start_x + dx
y = start_y + dy
# Check bounds
if not (0 <= x < world.width and 0 <= y < world.height):
continue
if world.is_passable(x, y):
return x, y
# Fallback to starting position if nothing found
return start_x, start_y
async def shell( async def shell(
reader: telnetlib3.TelnetReader | telnetlib3.TelnetReaderUnicode, reader: telnetlib3.TelnetReader | telnetlib3.TelnetReaderUnicode,
writer: telnetlib3.TelnetWriter | telnetlib3.TelnetWriterUnicode, writer: telnetlib3.TelnetWriter | telnetlib3.TelnetWriterUnicode,
) -> None: ) -> None:
"""Shell callback that greets the player and echoes their input.""" """Shell callback that handles player connection and command loop."""
_reader = cast(telnetlib3.TelnetReaderUnicode, reader)
_writer = cast(telnetlib3.TelnetWriterUnicode, writer) _writer = cast(telnetlib3.TelnetWriterUnicode, writer)
assert _world is not None, "World must be initialized before accepting connections"
log.debug("new connection from %s", _writer.get_extra_info("peername"))
_writer.write("Welcome to the MUD!\r\n") _writer.write("Welcome to the MUD!\r\n")
_writer.write("Type 'quit' to disconnect.\r\n\r\n") _writer.write("What is your name? ")
await _writer.drain() await _writer.drain()
name_input = await readline2(_reader, _writer)
if name_input is None or not name_input.strip():
_writer.close()
return
player_name = name_input.strip()
# Find a passable starting position (start at world center)
center_x = _world.width // 2
center_y = _world.height // 2
start_x, start_y = find_passable_start(_world, center_x, center_y)
# Create player
player = Player(
name=player_name,
x=start_x,
y=start_y,
writer=_writer,
reader=_reader,
)
# Register player
players[player_name] = player
log.info("%s connected at (%d, %d)", player_name, start_x, start_y)
_writer.write(f"\r\nWelcome, {player_name}!\r\n")
_writer.write("Type 'help' for commands or 'quit' to disconnect.\r\n\r\n")
await _writer.drain()
# Show initial map
await mudlib.commands.look.cmd_look(player, "")
# Command loop
while not _writer.is_closing(): while not _writer.is_closing():
_writer.write("mud> ") _writer.write("mud> ")
await _writer.drain() await _writer.drain()
inp = await readline2(reader, writer) inp = await readline2(_reader, _writer)
if inp is None: if inp is None:
break break
command = inp.strip() command = inp.strip()
if command == "quit": if not command:
_writer.write("Goodbye!\r\n") continue
# Dispatch command
await mudlib.commands.dispatch(player, command)
# Check if writer was closed by quit command
if _writer.is_closing():
break break
_writer.write(f"You typed: {command}\r\n") # Clean up: remove from registry if still present
if player_name in players:
del players[player_name]
log.info("%s disconnected", player_name)
_writer.close() _writer.close()
async def run_server() -> None: async def run_server() -> None:
"""Start the MUD telnet server.""" """Start the MUD telnet server."""
server = await telnetlib3.create_server(host="127.0.0.1", port=PORT, shell=shell) global _world
print(f"MUD server running on 127.0.0.1:{PORT}")
print("Connect with: telnet 127.0.0.1 6789") # Generate world once at startup
log.info("generating world (seed=42, 1000x1000)...")
t0 = time.monotonic()
_world = World(seed=42, width=1000, height=1000)
elapsed = time.monotonic() - t0
log.info("world generated in %.2fs", elapsed)
# Inject world into command modules
mudlib.commands.look.world = _world
mudlib.commands.movement.world = _world
# connect_maxwait: how long to wait for telnet option negotiation (CHARSET
# etc) before starting the shell. default is 4.0s which is painful.
# MUD clients like tintin++ reject CHARSET immediately via MTTS, but
# telnetlib3 still waits for the full timeout. 0.5s is plenty.
server = await telnetlib3.create_server(
host="127.0.0.1", port=PORT, shell=shell, connect_maxwait=0.5
)
log.info("listening on 127.0.0.1:%d", PORT)
try: try:
while True: while True:
await asyncio.sleep(3600) await asyncio.sleep(3600)
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nShutting down...") log.info("shutting down...")
server.close() server.close()
await server.wait_closed() await server.wait_closed()

282
src/mudlib/world/terrain.py Normal file
View file

@ -0,0 +1,282 @@
"""Terrain generation for MUD worlds using deterministic noise."""
import math
import random
def _make_perm(seed: int) -> list[int]:
"""Generate permutation table from seed."""
rng = random.Random(seed)
p = list(range(256))
rng.shuffle(p)
return p + p
def _noise(perm: list[int], x: float, y: float) -> float:
"""2D Perlin noise, fully inlined for speed."""
# unit grid cell
fx = math.floor(x)
fy = math.floor(y)
xi = int(fx) & 255
yi = int(fy) & 255
# relative coordinates within cell
xf = x - fx
yf = y - fy
# fade curves (inlined)
u = xf * xf * xf * (xf * (xf * 6 - 15) + 10)
v = yf * yf * yf * (yf * (yf * 6 - 15) + 10)
# hash coordinates of 4 corners
aa = perm[perm[xi] + yi]
ab = perm[perm[xi] + yi + 1]
ba = perm[perm[xi + 1] + yi]
bb = perm[perm[xi + 1] + yi + 1]
# grad + lerp inlined for all 4 corners
xf1 = xf - 1
yf1 = yf - 1
def _g(h: int, gx: float, gy: float) -> float:
h &= 7
a = gx if h < 4 else gy
b = gy if h < 4 else gx
return (a if (h & 1) == 0 else -a) + (b if (h & 2) == 0 else -b)
g_aa = _g(aa, xf, yf)
g_ba = _g(ba, xf1, yf)
g_ab = _g(ab, xf, yf1)
g_bb = _g(bb, xf1, yf1)
x1 = g_aa + u * (g_ba - g_aa)
x2 = g_ab + u * (g_bb - g_ab)
return x1 + v * (x2 - x1)
class World:
"""Procedurally generated terrain world."""
TERRAIN_CHARS = {
"mountain": "^",
"forest": "T",
"grass": ".",
"sand": ":",
"water": "~",
}
def __init__(self, seed: int = 42, width: int = 1000, height: int = 1000):
self.seed = seed
self.width = width
self.height = height
# generate terrain grid
self.terrain = self._generate_terrain()
def _generate_elevation(self) -> list[list[float]]:
"""Generate elevation map using fractal noise.
Computes at 1/4 resolution and interpolates up for speed.
"""
w = self.width
h = self.height
perm = _make_perm(self.seed)
noise = _noise
# generate at 1/4 resolution
step = 4
cw = w // step + 2 # +2 for interpolation boundary
ch = h // step + 2
coarse = [[0.0] * cw for _ in range(ch)]
min_val = float("inf")
max_val = float("-inf")
for cy in range(ch):
row = coarse[cy]
for cx in range(cw):
# map coarse coords back to world space
wx = cx * step / w
wy = cy * step / h
value = (
noise(perm, wx * 4, wy * 4)
+ noise(perm, wx * 8, wy * 8) * 0.5
+ noise(perm, wx * 16, wy * 16) * 0.25
)
row[cx] = value
if value < min_val:
min_val = value
if value > max_val:
max_val = value
# normalize coarse grid to [0, 1]
val_range = max_val - min_val
if val_range > 0:
inv_range = 1.0 / val_range
for cy in range(ch):
row = coarse[cy]
for cx in range(cw):
row[cx] = (row[cx] - min_val) * inv_range
# bilinear interpolation to full resolution
inv_step = 1.0 / step
elevation = [[0.0] * w for _ in range(h)]
for y in range(h):
cy_f = y * inv_step
cy0 = int(cy_f)
cy1 = min(cy0 + 1, ch - 1)
fy = cy_f - cy0
fy_inv = 1.0 - fy
row_0 = coarse[cy0]
row_1 = coarse[cy1]
out_row = elevation[y]
for x in range(w):
cx_f = x * inv_step
cx0 = int(cx_f)
cx1 = min(cx0 + 1, cw - 1)
fx = cx_f - cx0
# bilinear interpolation
out_row[x] = (
row_0[cx0] * (1.0 - fx) * fy_inv
+ row_0[cx1] * fx * fy_inv
+ row_1[cx0] * (1.0 - fx) * fy
+ row_1[cx1] * fx * fy
)
return elevation
def _derive_terrain_from_elevation(
self, elevation: list[list[float]]
) -> list[list[str]]:
"""Convert elevation map to terrain types."""
w = self.width
h = self.height
# local lookups
mountain = self.TERRAIN_CHARS["mountain"]
forest = self.TERRAIN_CHARS["forest"]
grass = self.TERRAIN_CHARS["grass"]
sand = self.TERRAIN_CHARS["sand"]
water = self.TERRAIN_CHARS["water"]
terrain = [[""] * w for _ in range(h)]
for y in range(h):
elev_row = elevation[y]
terr_row = terrain[y]
for x in range(w):
e = elev_row[x]
if e > 0.75:
terr_row[x] = mountain
elif e > 0.55:
terr_row[x] = forest
elif e > 0.25:
terr_row[x] = grass
elif e > 0.15:
terr_row[x] = sand
else:
terr_row[x] = water
return terrain
def _generate_rivers(
self, terrain: list[list[str]], elevation: list[list[float]]
) -> None:
"""Generate rivers from mountains to water bodies."""
rng = random.Random(self.seed)
# find some high-elevation starting points
num_rivers = max(5, (self.width * self.height) // 50000)
for _ in range(num_rivers):
# pick random high elevation point
x = rng.randint(0, self.width - 1)
y = rng.randint(0, self.height - 1)
if elevation[y][x] < 0.6:
continue
# trace downhill
visited = set()
while True:
if (x, y) in visited or len(visited) > 200:
break
visited.add((x, y))
# if we hit water or edge, stop
if terrain[y][x] == self.TERRAIN_CHARS["water"]:
break
if x <= 0 or x >= self.width - 1 or y <= 0 or y >= self.height - 1:
break
# carve river
if elevation[y][x] < 0.75: # don't carve through mountains
terrain[y][x] = self.TERRAIN_CHARS["water"]
# find steepest descent to neighbor
current_elevation = elevation[y][x]
best_x, best_y = x, y
best_elevation = current_elevation
for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nx, ny = x + dx, y + dy
if (
0 <= nx < self.width
and 0 <= ny < self.height
and elevation[ny][nx] < best_elevation
):
best_elevation = elevation[ny][nx]
best_x, best_y = nx, ny
if (best_x, best_y) == (x, y):
break
x, y = best_x, best_y
def _generate_terrain(self) -> list[list[str]]:
"""Generate complete terrain map."""
elevation = self._generate_elevation()
terrain = self._derive_terrain_from_elevation(elevation)
self._generate_rivers(terrain, elevation)
return terrain
def get_tile(self, x: int, y: int) -> str:
"""Return terrain character at position."""
if not (0 <= x < self.width and 0 <= y < self.height):
raise ValueError(f"Coordinates ({x}, {y}) out of bounds")
return self.terrain[y][x]
def is_passable(self, x: int, y: int) -> bool:
"""Check if position is passable."""
tile = self.get_tile(x, y)
return tile not in {self.TERRAIN_CHARS["mountain"], self.TERRAIN_CHARS["water"]}
def get_viewport(
self, cx: int, cy: int, width: int, height: int
) -> list[list[str]]:
"""Return 2D slice of terrain centered on (cx, cy)."""
viewport = []
# calculate bounds
half_width = width // 2
half_height = height // 2
start_y = cy - half_height
end_y = start_y + height
start_x = cx - half_width
end_x = start_x + width
for y in range(start_y, end_y):
row = []
for x in range(start_x, end_x):
# clamp to world bounds
wx = max(0, min(x, self.width - 1))
wy = max(0, min(y, self.height - 1))
row.append(self.terrain[wy][wx])
viewport.append(row)
return viewport

47
tests/test_ansi.py Normal file
View file

@ -0,0 +1,47 @@
from mudlib.render.ansi import RESET, colorize_map, colorize_terrain
def test_colorize_terrain_grass():
"""Grass is colored green."""
result = colorize_terrain(".")
assert "\033[" in result # has ANSI code
assert "." in result
assert RESET in result
def test_colorize_terrain_mountain():
"""Mountain is colored."""
result = colorize_terrain("^")
assert "\033[" in result
assert "^" in result
assert RESET in result
def test_colorize_terrain_water():
"""Water is colored blue."""
result = colorize_terrain("~")
assert "\033[" in result
assert "~" in result
assert RESET in result
def test_colorize_terrain_unknown():
"""Unknown characters pass through unchanged."""
result = colorize_terrain("?")
assert result == "?"
def test_colorize_map():
"""colorize_map returns newline-separated colored rows."""
grid = [
[".", ".", "T"],
["~", "^", "."],
]
result = colorize_map(grid)
assert "\n" in result
lines = result.split("\n")
assert len(lines) == 2
# should have ANSI codes
assert "\033[" in result

266
tests/test_commands.py Normal file
View file

@ -0,0 +1,266 @@
"""Tests for the command system."""
from unittest.mock import AsyncMock, MagicMock
import pytest
from mudlib import commands
from mudlib.commands import look, movement
from mudlib.player import Player
@pytest.fixture
def mock_writer():
writer = MagicMock()
writer.write = MagicMock()
writer.drain = AsyncMock()
return writer
@pytest.fixture
def mock_reader():
return MagicMock()
@pytest.fixture
def mock_world():
world = MagicMock()
world.is_passable = MagicMock(return_value=True)
# Create a 21x11 viewport filled with "."
viewport = [["." for _ in range(21)] for _ in range(11)]
world.get_viewport = MagicMock(return_value=viewport)
return world
@pytest.fixture
def player(mock_reader, mock_writer):
return Player(name="TestPlayer", x=5, y=5, reader=mock_reader, writer=mock_writer)
# Test command registration
def test_register_command():
"""Test that commands can be registered."""
async def test_handler(player, args):
pass
commands.register("test", test_handler)
assert "test" in commands._registry
def test_register_command_with_aliases():
"""Test that command aliases work."""
async def test_handler(player, args):
pass
commands.register("testcmd", test_handler, aliases=["tc", "t"])
assert "testcmd" in commands._registry
assert "tc" in commands._registry
assert "t" in commands._registry
assert commands._registry["testcmd"] == commands._registry["tc"]
assert commands._registry["testcmd"] == commands._registry["t"]
@pytest.mark.asyncio
async def test_dispatch_routes_to_handler(player):
"""Test that dispatch routes input to the correct handler."""
called = False
received_args = None
async def test_handler(p, args):
nonlocal called, received_args
called = True
received_args = args
commands.register("testcmd", test_handler)
await commands.dispatch(player, "testcmd arg1 arg2")
assert called
assert received_args == "arg1 arg2"
@pytest.mark.asyncio
async def test_dispatch_handles_unknown_command(player, mock_writer):
"""Test that unknown commands give feedback."""
await commands.dispatch(player, "unknowncommand")
# Should have written some kind of error message
assert mock_writer.write.called
error_msg = mock_writer.write.call_args[0][0]
assert "unknown" in error_msg.lower() or "not found" in error_msg.lower()
@pytest.mark.asyncio
async def test_dispatch_handles_empty_input(player):
"""Test that empty input doesn't crash."""
await commands.dispatch(player, "")
await commands.dispatch(player, " ")
# Test movement direction parsing
@pytest.mark.parametrize(
"direction,expected_delta",
[
("n", (0, -1)),
("north", (0, -1)),
("s", (0, 1)),
("south", (0, 1)),
("e", (1, 0)),
("east", (1, 0)),
("w", (-1, 0)),
("west", (-1, 0)),
("ne", (1, -1)),
("northeast", (1, -1)),
("nw", (-1, -1)),
("northwest", (-1, -1)),
("se", (1, 1)),
("southeast", (1, 1)),
("sw", (-1, 1)),
("southwest", (-1, 1)),
],
)
def test_direction_deltas(direction, expected_delta):
"""Test that all direction commands map to correct deltas."""
assert movement.DIRECTIONS[direction] == expected_delta
@pytest.mark.asyncio
async def test_movement_updates_position(player, mock_world):
"""Test that movement updates player position when passable."""
# Inject mock world into both movement and look modules
movement.world = mock_world
look.world = mock_world
original_x, original_y = player.x, player.y
await movement.move_north(player, "")
assert player.x == original_x
assert player.y == original_y - 1
assert mock_world.is_passable.called
@pytest.mark.asyncio
async def test_movement_blocked_by_impassable_terrain(player, mock_world, mock_writer):
"""Test that movement is blocked by impassable terrain."""
mock_world.is_passable.return_value = False
movement.world = mock_world
original_x, original_y = player.x, player.y
await movement.move_north(player, "")
# Position should not change
assert player.x == original_x
assert player.y == original_y
# Should send a message to the player
assert mock_writer.write.called
error_msg = mock_writer.write.call_args[0][0]
assert "can't" in error_msg.lower() or "cannot" in error_msg.lower()
@pytest.mark.asyncio
async def test_movement_sends_departure_message(player, mock_world):
"""Test that movement sends departure message to nearby players."""
movement.world = mock_world
look.world = mock_world
# Create another player in the area
other_writer = MagicMock()
other_writer.write = MagicMock()
other_writer.drain = AsyncMock()
other_player = Player(
name="OtherPlayer", x=5, y=4, reader=MagicMock(), writer=other_writer
)
# Register both players
from mudlib.player import players
players.clear()
players[player.name] = player
players[other_player.name] = other_player
await movement.move_north(player, "")
# Other player should have received a departure message
# (We'll check this is called, exact message format is implementation detail)
assert other_player.writer.write.called
@pytest.mark.asyncio
async def test_arrival_message_uses_opposite_direction(player, mock_world):
"""Test that arrival messages use the opposite direction."""
movement.world = mock_world
look.world = mock_world
# Create another player at the destination
other_writer = MagicMock()
other_writer.write = MagicMock()
other_writer.drain = AsyncMock()
other_player = Player(
name="OtherPlayer", x=5, y=3, reader=MagicMock(), writer=other_writer
)
from mudlib.player import players
players.clear()
players[player.name] = player
players[other_player.name] = other_player
# Player at (5, 5) moves north to (5, 4)
await movement.move_north(player, "")
# Other player at (5, 3) should see arrival "from the south"
# (Implementation will determine exact message format)
assert other_player.writer.write.called
# Test look command
@pytest.mark.asyncio
async def test_look_command_sends_viewport(player, mock_world):
"""Test that look command sends the viewport to the player."""
look.world = mock_world
await look.cmd_look(player, "")
assert mock_world.get_viewport.called
assert player.writer.write.called
@pytest.mark.asyncio
async def test_look_command_shows_player_at_center(player, mock_world):
"""Test that look command shows player @ at center."""
look.world = mock_world
await look.cmd_look(player, "")
# Check that the output contains the @ symbol for the player
output = "".join([call[0][0] for call in player.writer.write.call_args_list])
assert "@" in output
@pytest.mark.asyncio
async def test_look_command_shows_other_players(player, mock_world):
"""Test that look command shows other players as *."""
look.world = mock_world
# Create another player in the viewport
other_player = Player(
name="OtherPlayer",
x=6,
y=5,
reader=MagicMock(),
writer=MagicMock(),
)
from mudlib.player import players
players.clear()
players[player.name] = player
players[other_player.name] = other_player
await look.cmd_look(player, "")
# Check that the output contains * for other players
output = "".join([call[0][0] for call in player.writer.write.call_args_list])
assert "*" in output

View file

@ -6,6 +6,7 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
from mudlib import server from mudlib import server
from mudlib.world.terrain import World
def test_port_constant(): def test_port_constant():
@ -23,28 +24,39 @@ def test_run_server_exists():
assert asyncio.iscoroutinefunction(server.run_server) assert asyncio.iscoroutinefunction(server.run_server)
def test_find_passable_start():
world = World(seed=42, width=100, height=100)
x, y = server.find_passable_start(world, 50, 50)
assert isinstance(x, int)
assert isinstance(y, int)
assert world.is_passable(x, y)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_shell_greets_and_echoes(): async def test_shell_greets_and_accepts_commands():
server._world = World(seed=42, width=100, height=100)
reader = AsyncMock() reader = AsyncMock()
writer = MagicMock() writer = MagicMock()
writer.is_closing.return_value = False writer.is_closing.side_effect = [False, False, False, True]
writer.drain = AsyncMock() writer.drain = AsyncMock()
writer.close = MagicMock() writer.close = MagicMock()
readline = "mudlib.server.readline2" readline = "mudlib.server.readline2"
with patch(readline, new_callable=AsyncMock) as mock_readline: with patch(readline, new_callable=AsyncMock) as mock_readline:
mock_readline.side_effect = ["hello", "quit"] mock_readline.side_effect = ["TestPlayer", "look", "quit"]
await server.shell(reader, writer) await server.shell(reader, writer)
calls = [str(call) for call in writer.write.call_args_list] calls = [str(call) for call in writer.write.call_args_list]
assert any("Welcome" in call for call in calls) assert any("Welcome" in call for call in calls)
assert any("hello" in call for call in calls) assert any("TestPlayer" in call for call in calls)
assert any("Goodbye" in call for call in calls) writer.close.assert_called()
writer.close.assert_called_once()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_shell_handles_eof(): async def test_shell_handles_eof():
server._world = World(seed=42, width=100, height=100)
reader = AsyncMock() reader = AsyncMock()
writer = MagicMock() writer = MagicMock()
writer.is_closing.return_value = False writer.is_closing.return_value = False
@ -61,17 +73,19 @@ async def test_shell_handles_eof():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_shell_handles_quit(): async def test_shell_handles_quit():
server._world = World(seed=42, width=100, height=100)
reader = AsyncMock() reader = AsyncMock()
writer = MagicMock() writer = MagicMock()
writer.is_closing.return_value = False writer.is_closing.side_effect = [False, False, True]
writer.drain = AsyncMock() writer.drain = AsyncMock()
writer.close = MagicMock() writer.close = MagicMock()
readline = "mudlib.server.readline2" readline = "mudlib.server.readline2"
with patch(readline, new_callable=AsyncMock) as mock_readline: with patch(readline, new_callable=AsyncMock) as mock_readline:
mock_readline.return_value = "quit" mock_readline.side_effect = ["TestPlayer", "quit"]
await server.shell(reader, writer) await server.shell(reader, writer)
calls = [str(call) for call in writer.write.call_args_list] calls = [str(call) for call in writer.write.call_args_list]
assert any("Goodbye" in call for call in calls) assert any("Goodbye" in call for call in calls)
writer.close.assert_called_once() writer.close.assert_called()

188
tests/test_terrain.py Normal file
View file

@ -0,0 +1,188 @@
import pytest
from mudlib.world.terrain import World
def test_world_deterministic_from_seed():
"""Same seed produces identical terrain."""
world1 = World(seed=42, width=100, height=100)
world2 = World(seed=42, width=100, height=100)
for y in range(100):
for x in range(100):
assert world1.get_tile(x, y) == world2.get_tile(x, y)
def test_world_different_seeds_produce_different_terrain():
"""Different seeds produce different terrain."""
world1 = World(seed=42, width=100, height=100)
world2 = World(seed=99, width=100, height=100)
different_tiles = 0
for y in range(100):
for x in range(100):
if world1.get_tile(x, y) != world2.get_tile(x, y):
different_tiles += 1
# at least 10% should be different
assert different_tiles > 1000
def test_world_dimensions():
"""World has correct dimensions."""
world = World(seed=42, width=100, height=50)
# should not raise for valid coordinates
world.get_tile(0, 0)
world.get_tile(99, 49)
# should raise for out of bounds
with pytest.raises((IndexError, ValueError)):
world.get_tile(100, 0)
with pytest.raises((IndexError, ValueError)):
world.get_tile(0, 50)
def test_get_tile_returns_valid_terrain():
"""get_tile returns valid terrain characters."""
world = World(seed=42, width=100, height=100)
valid_terrain = {".", "^", "~", "T", ":"}
for y in range(100):
for x in range(100):
tile = world.get_tile(x, y)
assert tile in valid_terrain
def test_is_passable_mountains_impassable():
"""Mountains are impassable."""
world = World(seed=42, width=100, height=100)
for y in range(100):
for x in range(100):
tile = world.get_tile(x, y)
if tile == "^":
assert not world.is_passable(x, y)
def test_is_passable_water_impassable():
"""Water is impassable."""
world = World(seed=42, width=100, height=100)
for y in range(100):
for x in range(100):
tile = world.get_tile(x, y)
if tile == "~":
assert not world.is_passable(x, y)
def test_is_passable_grass_passable():
"""Grass is passable."""
world = World(seed=42, width=100, height=100)
for y in range(100):
for x in range(100):
tile = world.get_tile(x, y)
if tile == ".":
assert world.is_passable(x, y)
def test_is_passable_forest_passable():
"""Forest is passable."""
world = World(seed=42, width=100, height=100)
for y in range(100):
for x in range(100):
tile = world.get_tile(x, y)
if tile == "T":
assert world.is_passable(x, y)
def test_is_passable_sand_passable():
"""Sand is passable."""
world = World(seed=42, width=100, height=100)
for y in range(100):
for x in range(100):
tile = world.get_tile(x, y)
if tile == ":":
assert world.is_passable(x, y)
def test_get_viewport_dimensions():
"""get_viewport returns correct dimensions."""
world = World(seed=42, width=100, height=100)
viewport = world.get_viewport(50, 50, width=10, height=8)
assert len(viewport) == 8
assert len(viewport[0]) == 10
def test_get_viewport_centers_correctly():
"""get_viewport centers on given coordinates."""
world = World(seed=42, width=100, height=100)
viewport = world.get_viewport(50, 50, width=5, height=5)
# center tile should be at position (2, 2) in viewport
center_tile = viewport[2][2]
assert center_tile == world.get_tile(50, 50)
def test_get_viewport_handles_edges():
"""Viewport handles world boundaries correctly."""
world = World(seed=42, width=100, height=100)
# near top-left corner
viewport = world.get_viewport(2, 2, width=5, height=5)
assert len(viewport) == 5
assert len(viewport[0]) == 5
# near bottom-right corner
viewport = world.get_viewport(97, 97, width=5, height=5)
assert len(viewport) == 5
assert len(viewport[0]) == 5
def test_terrain_distribution_reasonable():
"""Terrain has reasonable distribution (not all one type)."""
world = World(seed=42, width=100, height=100)
terrain_counts = {".": 0, "^": 0, "~": 0, "T": 0, ":": 0}
for y in range(100):
for x in range(100):
tile = world.get_tile(x, y)
terrain_counts[tile] += 1
# should have at least 3 different terrain types
types_present = sum(1 for count in terrain_counts.values() if count > 0)
assert types_present >= 3
# no single type should dominate completely (> 95%)
total = sum(terrain_counts.values())
for terrain_type, count in terrain_counts.items():
assert count < 0.95 * total, f"{terrain_type} dominates with {count}/{total}"
def test_rivers_exist():
"""Terrain has some water tiles (rivers/lakes)."""
world = World(seed=42, width=100, height=100)
water_count = 0
for y in range(100):
for x in range(100):
if world.get_tile(x, y) == "~":
water_count += 1
# should have at least some water
assert water_count > 0
def test_large_world_generation():
"""Can generate a 1000x1000 world without errors."""
world = World(seed=42, width=1000, height=1000)
# spot check some tiles
assert world.get_tile(0, 0) in {".", "^", "~", "T", ":"}
assert world.get_tile(500, 500) in {".", "^", "~", "T", ":"}
assert world.get_tile(999, 999) in {".", "^", "~", "T", ":"}

14
worlds/earth/config.toml Normal file
View file

@ -0,0 +1,14 @@
[world]
name = "Earth"
seed = 42
width = 1000
height = 1000
[terrain.colors]
grass = "green"
mountain = "gray"
water = "blue"
forest = "dark_green"
sand = "yellow"
player = "bright_white"
entity = "bright_red"