Add NPC behavior state machine with tests
Adds behavior state tracking to Mob entity with five states: idle, patrol, converse, flee, and working. Each state has specific processing logic: - idle: no-op (existing wander logic handles movement) - patrol: cycles through waypoints with toroidal wrapping support - converse: stationary during player-driven dialogue - flee: moves away from threat coordinates - working: stationary NPC at their post The behavior module is self-contained and testable, ready for integration with mob_ai.py in a later step.
This commit is contained in:
parent
355795a991
commit
369bc5efcb
3 changed files with 389 additions and 0 deletions
|
|
@ -80,3 +80,8 @@ class Mob(Entity):
|
|||
home_x_max: int | None = None
|
||||
home_y_min: int | None = None
|
||||
home_y_max: int | None = None
|
||||
# NPC behavior state machine
|
||||
behavior_state: str = "idle"
|
||||
behavior_data: dict = field(default_factory=dict)
|
||||
npc_name: str | None = None # links to dialogue tree
|
||||
schedule: list | None = None # for future schedule system
|
||||
|
|
|
|||
155
src/mudlib/npc_behavior.py
Normal file
155
src/mudlib/npc_behavior.py
Normal file
|
|
@ -0,0 +1,155 @@
|
|||
"""NPC behavior state machine."""
|
||||
|
||||
from mudlib.entity import Mob
|
||||
from mudlib.zone import Zone
|
||||
|
||||
# Valid behavior states
|
||||
VALID_STATES = {"idle", "patrol", "converse", "flee", "working"}
|
||||
|
||||
|
||||
def transition_state(mob: Mob, new_state: str, data: dict | None = None) -> bool:
|
||||
"""Transition mob to a new behavior state.
|
||||
|
||||
Args:
|
||||
mob: The mob to transition
|
||||
new_state: The target state (must be in VALID_STATES)
|
||||
data: Optional state-specific data
|
||||
|
||||
Returns:
|
||||
True if transition succeeded, False if state is invalid
|
||||
"""
|
||||
if new_state not in VALID_STATES:
|
||||
return False
|
||||
|
||||
mob.behavior_state = new_state
|
||||
mob.behavior_data = data or {}
|
||||
return True
|
||||
|
||||
|
||||
async def process_behavior(mob: Mob, world: Zone) -> None:
|
||||
"""Process mob behavior for current tick.
|
||||
|
||||
Called each game loop tick. Dispatches based on behavior_state:
|
||||
- idle: do nothing (existing wander logic handles movement)
|
||||
- patrol: move through waypoints
|
||||
- converse: do nothing (conversation is player-driven)
|
||||
- flee: do nothing (direction computed by get_flee_direction)
|
||||
- working: do nothing (stationary NPC)
|
||||
|
||||
Args:
|
||||
mob: The mob to process
|
||||
world: The zone the mob is in
|
||||
"""
|
||||
if mob.behavior_state == "idle":
|
||||
# No-op, existing wander logic handles idle movement
|
||||
pass
|
||||
elif mob.behavior_state == "patrol":
|
||||
await _process_patrol(mob, world)
|
||||
elif mob.behavior_state == "converse":
|
||||
# No-op, mob stays put during conversation
|
||||
pass
|
||||
elif mob.behavior_state == "flee":
|
||||
# No-op, mob_ai uses get_flee_direction for movement
|
||||
pass
|
||||
elif mob.behavior_state == "working":
|
||||
# No-op, mob stays at their post
|
||||
pass
|
||||
|
||||
|
||||
async def _process_patrol(mob: Mob, world: Zone) -> None:
|
||||
"""Process patrol behavior: advance waypoint if at current target."""
|
||||
waypoints = mob.behavior_data.get("waypoints", [])
|
||||
if not waypoints:
|
||||
return
|
||||
|
||||
waypoint_index = mob.behavior_data.get("waypoint_index", 0)
|
||||
current_waypoint = waypoints[waypoint_index]
|
||||
|
||||
# Check if at waypoint (use world wrapping for distance)
|
||||
if mob.x == current_waypoint["x"] and mob.y == current_waypoint["y"]:
|
||||
# Advance to next waypoint (loop back to start if at end)
|
||||
waypoint_index = (waypoint_index + 1) % len(waypoints)
|
||||
mob.behavior_data["waypoint_index"] = waypoint_index
|
||||
|
||||
|
||||
def get_flee_direction(mob: Mob, world: Zone) -> str | None:
|
||||
"""Get the cardinal direction to flee away from threat.
|
||||
|
||||
Args:
|
||||
mob: The mob fleeing
|
||||
world: The zone the mob is in
|
||||
|
||||
Returns:
|
||||
Cardinal direction ("north", "south", "east", "west") or None if no threat
|
||||
"""
|
||||
flee_from = mob.behavior_data.get("flee_from")
|
||||
if not flee_from:
|
||||
return None
|
||||
|
||||
threat_x = flee_from["x"]
|
||||
threat_y = flee_from["y"]
|
||||
|
||||
# Calculate direction away from threat
|
||||
dx = mob.x - threat_x
|
||||
dy = mob.y - threat_y
|
||||
|
||||
# Handle toroidal wrapping for shortest distance
|
||||
if world.toroidal:
|
||||
# Adjust dx/dy if wrapping gives shorter distance
|
||||
if abs(dx) > world.width // 2:
|
||||
dx = -(world.width - abs(dx)) * (1 if dx > 0 else -1)
|
||||
if abs(dy) > world.height // 2:
|
||||
dy = -(world.height - abs(dy)) * (1 if dy > 0 else -1)
|
||||
|
||||
# Prefer axis with larger distance (prefer moving away faster)
|
||||
if abs(dx) >= abs(dy):
|
||||
return "east" if dx > 0 else "west" if dx < 0 else None
|
||||
else:
|
||||
return "south" if dy > 0 else "north" if dy < 0 else None
|
||||
|
||||
|
||||
def get_patrol_direction(mob: Mob, world: Zone) -> str | None:
|
||||
"""Get the cardinal direction to move toward current patrol waypoint.
|
||||
|
||||
Args:
|
||||
mob: The mob on patrol
|
||||
world: The zone the mob is in
|
||||
|
||||
Returns:
|
||||
Cardinal direction ("north", "south", "east", "west") or None if at waypoint
|
||||
"""
|
||||
waypoints = mob.behavior_data.get("waypoints", [])
|
||||
if not waypoints:
|
||||
return None
|
||||
|
||||
waypoint_index = mob.behavior_data.get("waypoint_index", 0)
|
||||
current_waypoint = waypoints[waypoint_index]
|
||||
|
||||
target_x = current_waypoint["x"]
|
||||
target_y = current_waypoint["y"]
|
||||
|
||||
# Check if already at waypoint
|
||||
if mob.x == target_x and mob.y == target_y:
|
||||
return None
|
||||
|
||||
# Calculate direction to waypoint
|
||||
dx = target_x - mob.x
|
||||
dy = target_y - mob.y
|
||||
|
||||
# Handle toroidal wrapping (find shortest path)
|
||||
if world.toroidal:
|
||||
# Check if wrapping gives a shorter distance on x axis
|
||||
if abs(dx) > world.width // 2:
|
||||
# Wrapping is shorter, reverse direction
|
||||
dx = -(world.width - abs(dx)) * (1 if dx > 0 else -1)
|
||||
|
||||
# Check if wrapping gives a shorter distance on y axis
|
||||
if abs(dy) > world.height // 2:
|
||||
# Wrapping is shorter, reverse direction
|
||||
dy = -(world.height - abs(dy)) * (1 if dy > 0 else -1)
|
||||
|
||||
# Prefer axis with larger distance (ties prefer x)
|
||||
if abs(dx) >= abs(dy):
|
||||
return "east" if dx > 0 else "west"
|
||||
else:
|
||||
return "south" if dy > 0 else "north"
|
||||
229
tests/test_npc_behavior.py
Normal file
229
tests/test_npc_behavior.py
Normal file
|
|
@ -0,0 +1,229 @@
|
|||
"""Tests for NPC behavior state machine."""
|
||||
|
||||
import pytest
|
||||
|
||||
from mudlib.entity import Mob
|
||||
from mudlib.npc_behavior import (
|
||||
VALID_STATES,
|
||||
get_flee_direction,
|
||||
get_patrol_direction,
|
||||
process_behavior,
|
||||
transition_state,
|
||||
)
|
||||
from mudlib.zone import Zone
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_zone():
|
||||
"""Create a simple test zone for behavior tests."""
|
||||
terrain = [["." for _ in range(256)] for _ in range(256)]
|
||||
zone = Zone(
|
||||
name="testzone",
|
||||
width=256,
|
||||
height=256,
|
||||
toroidal=True,
|
||||
terrain=terrain,
|
||||
impassable=set(),
|
||||
)
|
||||
return zone
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mob(test_zone):
|
||||
"""Create a basic mob for testing."""
|
||||
m = Mob(name="test mob", x=10, y=10)
|
||||
m.location = test_zone
|
||||
return m
|
||||
|
||||
|
||||
class TestTransitionState:
|
||||
def test_valid_transition_sets_state(self, mob):
|
||||
"""transition_state with valid state sets behavior_state."""
|
||||
result = transition_state(mob, "patrol")
|
||||
assert result is True
|
||||
assert mob.behavior_state == "patrol"
|
||||
|
||||
def test_valid_transition_sets_data(self, mob):
|
||||
"""transition_state sets behavior_data when provided."""
|
||||
data = {"waypoints": [{"x": 5, "y": 5}]}
|
||||
result = transition_state(mob, "patrol", data)
|
||||
assert result is True
|
||||
assert mob.behavior_data == data
|
||||
|
||||
def test_valid_transition_clears_data(self, mob):
|
||||
"""transition_state clears data if None provided."""
|
||||
mob.behavior_data = {"old": "data"}
|
||||
result = transition_state(mob, "idle", None)
|
||||
assert result is True
|
||||
assert mob.behavior_data == {}
|
||||
|
||||
def test_invalid_state_returns_false(self, mob):
|
||||
"""transition_state with invalid state returns False."""
|
||||
result = transition_state(mob, "invalid_state")
|
||||
assert result is False
|
||||
|
||||
def test_invalid_state_no_change(self, mob):
|
||||
"""transition_state with invalid state doesn't change mob."""
|
||||
original_state = mob.behavior_state
|
||||
original_data = mob.behavior_data
|
||||
transition_state(mob, "invalid_state")
|
||||
assert mob.behavior_state == original_state
|
||||
assert mob.behavior_data == original_data
|
||||
|
||||
def test_all_valid_states_accepted(self, mob):
|
||||
"""All states in VALID_STATES can be transitioned to."""
|
||||
for state in VALID_STATES:
|
||||
result = transition_state(mob, state)
|
||||
assert result is True
|
||||
assert mob.behavior_state == state
|
||||
|
||||
|
||||
class TestDefaultState:
|
||||
def test_new_mob_starts_idle(self):
|
||||
"""New Mob instance defaults to idle state."""
|
||||
mob = Mob(name="new mob", x=0, y=0)
|
||||
assert mob.behavior_state == "idle"
|
||||
assert mob.behavior_data == {}
|
||||
|
||||
|
||||
class TestPatrolMovement:
|
||||
@pytest.mark.asyncio
|
||||
async def test_patrol_moves_toward_waypoint(self, mob, test_zone):
|
||||
"""Mob on patrol moves toward current waypoint."""
|
||||
waypoints = [{"x": 15, "y": 10}]
|
||||
transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0})
|
||||
|
||||
# process_behavior should determine direction but not move
|
||||
# (movement is handled by mob_ai integration later)
|
||||
direction = get_patrol_direction(mob, test_zone)
|
||||
assert direction == "east" # mob at (10, 10) moves east to (15, 10)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_patrol_advances_waypoint(self, mob, test_zone):
|
||||
"""Mob reaching waypoint advances to next."""
|
||||
waypoints = [{"x": 10, "y": 10}, {"x": 15, "y": 15}]
|
||||
transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0})
|
||||
|
||||
# At first waypoint already
|
||||
direction = get_patrol_direction(mob, test_zone)
|
||||
assert direction is None # at waypoint
|
||||
# Process behavior to advance
|
||||
await process_behavior(mob, test_zone)
|
||||
assert mob.behavior_data["waypoint_index"] == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_patrol_loops_waypoints(self, mob, test_zone):
|
||||
"""Patrol loops back to first waypoint after last."""
|
||||
waypoints = [{"x": 5, "y": 5}, {"x": 10, "y": 10}]
|
||||
transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 1})
|
||||
|
||||
# At second waypoint (last one)
|
||||
direction = get_patrol_direction(mob, test_zone)
|
||||
assert direction is None
|
||||
await process_behavior(mob, test_zone)
|
||||
assert mob.behavior_data["waypoint_index"] == 0 # looped back
|
||||
|
||||
def test_patrol_direction_north(self, mob, test_zone):
|
||||
"""get_patrol_direction returns 'north' when waypoint is north."""
|
||||
waypoints = [{"x": 10, "y": 5}]
|
||||
transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0})
|
||||
direction = get_patrol_direction(mob, test_zone)
|
||||
assert direction == "north"
|
||||
|
||||
def test_patrol_direction_south(self, mob, test_zone):
|
||||
"""get_patrol_direction returns 'south' when waypoint is south."""
|
||||
waypoints = [{"x": 10, "y": 15}]
|
||||
transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0})
|
||||
direction = get_patrol_direction(mob, test_zone)
|
||||
assert direction == "south"
|
||||
|
||||
def test_patrol_direction_west(self, mob, test_zone):
|
||||
"""get_patrol_direction returns 'west' when waypoint is west."""
|
||||
waypoints = [{"x": 5, "y": 10}]
|
||||
transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0})
|
||||
direction = get_patrol_direction(mob, test_zone)
|
||||
assert direction == "west"
|
||||
|
||||
def test_patrol_direction_with_wrapping(self, mob, test_zone):
|
||||
"""get_patrol_direction handles toroidal wrapping."""
|
||||
# Mob at (10, 10), waypoint at (250, 10)
|
||||
# Direct path: 240 tiles east
|
||||
# Wrapped path: 20 tiles west (256 - 240 + 10)
|
||||
waypoints = [{"x": 250, "y": 10}]
|
||||
transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0})
|
||||
direction = get_patrol_direction(mob, test_zone)
|
||||
assert direction == "west" # wrapping is shorter
|
||||
|
||||
|
||||
class TestConverseState:
|
||||
@pytest.mark.asyncio
|
||||
async def test_converse_state_no_movement(self, mob, test_zone):
|
||||
"""Mob in converse state doesn't move."""
|
||||
original_x, original_y = mob.x, mob.y
|
||||
transition_state(mob, "converse")
|
||||
await process_behavior(mob, test_zone)
|
||||
assert mob.x == original_x
|
||||
assert mob.y == original_y
|
||||
|
||||
|
||||
class TestFleeState:
|
||||
def test_flee_direction_west(self, mob, test_zone):
|
||||
"""get_flee_direction returns 'west' when fleeing from threat to the east."""
|
||||
# Threat at (15, 10), mob at (10, 10)
|
||||
# Should flee west (away from threat)
|
||||
transition_state(mob, "flee", {"flee_from": {"x": 15, "y": 10}})
|
||||
direction = get_flee_direction(mob, test_zone)
|
||||
assert direction == "west"
|
||||
|
||||
def test_flee_direction_east(self, mob, test_zone):
|
||||
"""get_flee_direction returns 'east' when fleeing from threat to the west."""
|
||||
# Threat at (5, 10), mob at (10, 10)
|
||||
# Should flee east (away from threat)
|
||||
transition_state(mob, "flee", {"flee_from": {"x": 5, "y": 10}})
|
||||
direction = get_flee_direction(mob, test_zone)
|
||||
assert direction == "east"
|
||||
|
||||
def test_flee_direction_north(self, mob, test_zone):
|
||||
"""get_flee_direction returns 'north' when fleeing from threat to the south."""
|
||||
# Threat at (10, 15), mob at (10, 10)
|
||||
# Should flee north (away from threat)
|
||||
transition_state(mob, "flee", {"flee_from": {"x": 10, "y": 15}})
|
||||
direction = get_flee_direction(mob, test_zone)
|
||||
assert direction == "north"
|
||||
|
||||
def test_flee_direction_south(self, mob, test_zone):
|
||||
"""get_flee_direction returns 'south' when fleeing from threat to the north."""
|
||||
# Threat at (10, 5), mob at (10, 10)
|
||||
# Should flee south (away from threat)
|
||||
transition_state(mob, "flee", {"flee_from": {"x": 10, "y": 5}})
|
||||
direction = get_flee_direction(mob, test_zone)
|
||||
assert direction == "south"
|
||||
|
||||
def test_flee_no_threat(self, mob, test_zone):
|
||||
"""get_flee_direction returns None when no threat specified."""
|
||||
transition_state(mob, "flee", {})
|
||||
direction = get_flee_direction(mob, test_zone)
|
||||
assert direction is None
|
||||
|
||||
|
||||
class TestWorkingState:
|
||||
@pytest.mark.asyncio
|
||||
async def test_working_state_no_movement(self, mob, test_zone):
|
||||
"""Mob in working state stays put."""
|
||||
original_x, original_y = mob.x, mob.y
|
||||
transition_state(mob, "working")
|
||||
await process_behavior(mob, test_zone)
|
||||
assert mob.x == original_x
|
||||
assert mob.y == original_y
|
||||
|
||||
|
||||
class TestIdleState:
|
||||
@pytest.mark.asyncio
|
||||
async def test_idle_state_no_action(self, mob, test_zone):
|
||||
"""Idle state does nothing (movement handled by mob_ai)."""
|
||||
original_x, original_y = mob.x, mob.y
|
||||
transition_state(mob, "idle")
|
||||
await process_behavior(mob, test_zone)
|
||||
# process_behavior is a no-op for idle
|
||||
assert mob.x == original_x
|
||||
assert mob.y == original_y
|
||||
Loading…
Reference in a new issue