diff --git a/src/mudlib/entity.py b/src/mudlib/entity.py index d00e110..7e99bec 100644 --- a/src/mudlib/entity.py +++ b/src/mudlib/entity.py @@ -80,3 +80,8 @@ class Mob(Entity): home_x_max: int | None = None home_y_min: int | None = None home_y_max: int | None = None + # NPC behavior state machine + behavior_state: str = "idle" + behavior_data: dict = field(default_factory=dict) + npc_name: str | None = None # links to dialogue tree + schedule: list | None = None # for future schedule system diff --git a/src/mudlib/npc_behavior.py b/src/mudlib/npc_behavior.py new file mode 100644 index 0000000..0918d82 --- /dev/null +++ b/src/mudlib/npc_behavior.py @@ -0,0 +1,155 @@ +"""NPC behavior state machine.""" + +from mudlib.entity import Mob +from mudlib.zone import Zone + +# Valid behavior states +VALID_STATES = {"idle", "patrol", "converse", "flee", "working"} + + +def transition_state(mob: Mob, new_state: str, data: dict | None = None) -> bool: + """Transition mob to a new behavior state. + + Args: + mob: The mob to transition + new_state: The target state (must be in VALID_STATES) + data: Optional state-specific data + + Returns: + True if transition succeeded, False if state is invalid + """ + if new_state not in VALID_STATES: + return False + + mob.behavior_state = new_state + mob.behavior_data = data or {} + return True + + +async def process_behavior(mob: Mob, world: Zone) -> None: + """Process mob behavior for current tick. + + Called each game loop tick. Dispatches based on behavior_state: + - idle: do nothing (existing wander logic handles movement) + - patrol: move through waypoints + - converse: do nothing (conversation is player-driven) + - flee: do nothing (direction computed by get_flee_direction) + - working: do nothing (stationary NPC) + + Args: + mob: The mob to process + world: The zone the mob is in + """ + if mob.behavior_state == "idle": + # No-op, existing wander logic handles idle movement + pass + elif mob.behavior_state == "patrol": + await _process_patrol(mob, world) + elif mob.behavior_state == "converse": + # No-op, mob stays put during conversation + pass + elif mob.behavior_state == "flee": + # No-op, mob_ai uses get_flee_direction for movement + pass + elif mob.behavior_state == "working": + # No-op, mob stays at their post + pass + + +async def _process_patrol(mob: Mob, world: Zone) -> None: + """Process patrol behavior: advance waypoint if at current target.""" + waypoints = mob.behavior_data.get("waypoints", []) + if not waypoints: + return + + waypoint_index = mob.behavior_data.get("waypoint_index", 0) + current_waypoint = waypoints[waypoint_index] + + # Check if at waypoint (use world wrapping for distance) + if mob.x == current_waypoint["x"] and mob.y == current_waypoint["y"]: + # Advance to next waypoint (loop back to start if at end) + waypoint_index = (waypoint_index + 1) % len(waypoints) + mob.behavior_data["waypoint_index"] = waypoint_index + + +def get_flee_direction(mob: Mob, world: Zone) -> str | None: + """Get the cardinal direction to flee away from threat. + + Args: + mob: The mob fleeing + world: The zone the mob is in + + Returns: + Cardinal direction ("north", "south", "east", "west") or None if no threat + """ + flee_from = mob.behavior_data.get("flee_from") + if not flee_from: + return None + + threat_x = flee_from["x"] + threat_y = flee_from["y"] + + # Calculate direction away from threat + dx = mob.x - threat_x + dy = mob.y - threat_y + + # Handle toroidal wrapping for shortest distance + if world.toroidal: + # Adjust dx/dy if wrapping gives shorter distance + if abs(dx) > world.width // 2: + dx = -(world.width - abs(dx)) * (1 if dx > 0 else -1) + if abs(dy) > world.height // 2: + dy = -(world.height - abs(dy)) * (1 if dy > 0 else -1) + + # Prefer axis with larger distance (prefer moving away faster) + if abs(dx) >= abs(dy): + return "east" if dx > 0 else "west" if dx < 0 else None + else: + return "south" if dy > 0 else "north" if dy < 0 else None + + +def get_patrol_direction(mob: Mob, world: Zone) -> str | None: + """Get the cardinal direction to move toward current patrol waypoint. + + Args: + mob: The mob on patrol + world: The zone the mob is in + + Returns: + Cardinal direction ("north", "south", "east", "west") or None if at waypoint + """ + waypoints = mob.behavior_data.get("waypoints", []) + if not waypoints: + return None + + waypoint_index = mob.behavior_data.get("waypoint_index", 0) + current_waypoint = waypoints[waypoint_index] + + target_x = current_waypoint["x"] + target_y = current_waypoint["y"] + + # Check if already at waypoint + if mob.x == target_x and mob.y == target_y: + return None + + # Calculate direction to waypoint + dx = target_x - mob.x + dy = target_y - mob.y + + # Handle toroidal wrapping (find shortest path) + if world.toroidal: + # Check if wrapping gives a shorter distance on x axis + if abs(dx) > world.width // 2: + # Wrapping is shorter, reverse direction + dx = -(world.width - abs(dx)) * (1 if dx > 0 else -1) + + # Check if wrapping gives a shorter distance on y axis + if abs(dy) > world.height // 2: + # Wrapping is shorter, reverse direction + dy = -(world.height - abs(dy)) * (1 if dy > 0 else -1) + + # Prefer axis with larger distance (ties prefer x) + if abs(dx) >= abs(dy): + return "east" if dx > 0 else "west" + else: + return "south" if dy > 0 else "north" diff --git a/tests/test_npc_behavior.py b/tests/test_npc_behavior.py new file mode 100644 index 0000000..f8a1494 --- /dev/null +++ b/tests/test_npc_behavior.py @@ -0,0 +1,229 @@ +"""Tests for NPC behavior state machine.""" + +import pytest + +from mudlib.entity import Mob +from mudlib.npc_behavior import ( + VALID_STATES, + get_flee_direction, + get_patrol_direction, + process_behavior, + transition_state, +) +from mudlib.zone import Zone + + +@pytest.fixture +def test_zone(): + """Create a simple test zone for behavior tests.""" + terrain = [["." for _ in range(256)] for _ in range(256)] + zone = Zone( + name="testzone", + width=256, + height=256, + toroidal=True, + terrain=terrain, + impassable=set(), + ) + return zone + + +@pytest.fixture +def mob(test_zone): + """Create a basic mob for testing.""" + m = Mob(name="test mob", x=10, y=10) + m.location = test_zone + return m + + +class TestTransitionState: + def test_valid_transition_sets_state(self, mob): + """transition_state with valid state sets behavior_state.""" + result = transition_state(mob, "patrol") + assert result is True + assert mob.behavior_state == "patrol" + + def test_valid_transition_sets_data(self, mob): + """transition_state sets behavior_data when provided.""" + data = {"waypoints": [{"x": 5, "y": 5}]} + result = transition_state(mob, "patrol", data) + assert result is True + assert mob.behavior_data == data + + def test_valid_transition_clears_data(self, mob): + """transition_state clears data if None provided.""" + mob.behavior_data = {"old": "data"} + result = transition_state(mob, "idle", None) + assert result is True + assert mob.behavior_data == {} + + def test_invalid_state_returns_false(self, mob): + """transition_state with invalid state returns False.""" + result = transition_state(mob, "invalid_state") + assert result is False + + def test_invalid_state_no_change(self, mob): + """transition_state with invalid state doesn't change mob.""" + original_state = mob.behavior_state + original_data = mob.behavior_data + transition_state(mob, "invalid_state") + assert mob.behavior_state == original_state + assert mob.behavior_data == original_data + + def test_all_valid_states_accepted(self, mob): + """All states in VALID_STATES can be transitioned to.""" + for state in VALID_STATES: + result = transition_state(mob, state) + assert result is True + assert mob.behavior_state == state + + +class TestDefaultState: + def test_new_mob_starts_idle(self): + """New Mob instance defaults to idle state.""" + mob = Mob(name="new mob", x=0, y=0) + assert mob.behavior_state == "idle" + assert mob.behavior_data == {} + + +class TestPatrolMovement: + @pytest.mark.asyncio + async def test_patrol_moves_toward_waypoint(self, mob, test_zone): + """Mob on patrol moves toward current waypoint.""" + waypoints = [{"x": 15, "y": 10}] + transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0}) + + # process_behavior should determine direction but not move + # (movement is handled by mob_ai integration later) + direction = get_patrol_direction(mob, test_zone) + assert direction == "east" # mob at (10, 10) moves east to (15, 10) + + @pytest.mark.asyncio + async def test_patrol_advances_waypoint(self, mob, test_zone): + """Mob reaching waypoint advances to next.""" + waypoints = [{"x": 10, "y": 10}, {"x": 15, "y": 15}] + transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0}) + + # At first waypoint already + direction = get_patrol_direction(mob, test_zone) + assert direction is None # at waypoint + # Process behavior to advance + await process_behavior(mob, test_zone) + assert mob.behavior_data["waypoint_index"] == 1 + + @pytest.mark.asyncio + async def test_patrol_loops_waypoints(self, mob, test_zone): + """Patrol loops back to first waypoint after last.""" + waypoints = [{"x": 5, "y": 5}, {"x": 10, "y": 10}] + transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 1}) + + # At second waypoint (last one) + direction = get_patrol_direction(mob, test_zone) + assert direction is None + await process_behavior(mob, test_zone) + assert mob.behavior_data["waypoint_index"] == 0 # looped back + + def test_patrol_direction_north(self, mob, test_zone): + """get_patrol_direction returns 'north' when waypoint is north.""" + waypoints = [{"x": 10, "y": 5}] + transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0}) + direction = get_patrol_direction(mob, test_zone) + assert direction == "north" + + def test_patrol_direction_south(self, mob, test_zone): + """get_patrol_direction returns 'south' when waypoint is south.""" + waypoints = [{"x": 10, "y": 15}] + transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0}) + direction = get_patrol_direction(mob, test_zone) + assert direction == "south" + + def test_patrol_direction_west(self, mob, test_zone): + """get_patrol_direction returns 'west' when waypoint is west.""" + waypoints = [{"x": 5, "y": 10}] + transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0}) + direction = get_patrol_direction(mob, test_zone) + assert direction == "west" + + def test_patrol_direction_with_wrapping(self, mob, test_zone): + """get_patrol_direction handles toroidal wrapping.""" + # Mob at (10, 10), waypoint at (250, 10) + # Direct path: 240 tiles east + # Wrapped path: 20 tiles west (256 - 240 + 10) + waypoints = [{"x": 250, "y": 10}] + transition_state(mob, "patrol", {"waypoints": waypoints, "waypoint_index": 0}) + direction = get_patrol_direction(mob, test_zone) + assert direction == "west" # wrapping is shorter + + +class TestConverseState: + @pytest.mark.asyncio + async def test_converse_state_no_movement(self, mob, test_zone): + """Mob in converse state doesn't move.""" + original_x, original_y = mob.x, mob.y + transition_state(mob, "converse") + await process_behavior(mob, test_zone) + assert mob.x == original_x + assert mob.y == original_y + + +class TestFleeState: + def test_flee_direction_west(self, mob, test_zone): + """get_flee_direction returns 'west' when fleeing from threat to the east.""" + # Threat at (15, 10), mob at (10, 10) + # Should flee west (away from threat) + transition_state(mob, "flee", {"flee_from": {"x": 15, "y": 10}}) + direction = get_flee_direction(mob, test_zone) + assert direction == "west" + + def test_flee_direction_east(self, mob, test_zone): + """get_flee_direction returns 'east' when fleeing from threat to the west.""" + # Threat at (5, 10), mob at (10, 10) + # Should flee east (away from threat) + transition_state(mob, "flee", {"flee_from": {"x": 5, "y": 10}}) + direction = get_flee_direction(mob, test_zone) + assert direction == "east" + + def test_flee_direction_north(self, mob, test_zone): + """get_flee_direction returns 'north' when fleeing from threat to the south.""" + # Threat at (10, 15), mob at (10, 10) + # Should flee north (away from threat) + transition_state(mob, "flee", {"flee_from": {"x": 10, "y": 15}}) + direction = get_flee_direction(mob, test_zone) + assert direction == "north" + + def test_flee_direction_south(self, mob, test_zone): + """get_flee_direction returns 'south' when fleeing from threat to the north.""" + # Threat at (10, 5), mob at (10, 10) + # Should flee south (away from threat) + transition_state(mob, "flee", {"flee_from": {"x": 10, "y": 5}}) + direction = get_flee_direction(mob, test_zone) + assert direction == "south" + + def test_flee_no_threat(self, mob, test_zone): + """get_flee_direction returns None when no threat specified.""" + transition_state(mob, "flee", {}) + direction = get_flee_direction(mob, test_zone) + assert direction is None + + +class TestWorkingState: + @pytest.mark.asyncio + async def test_working_state_no_movement(self, mob, test_zone): + """Mob in working state stays put.""" + original_x, original_y = mob.x, mob.y + transition_state(mob, "working") + await process_behavior(mob, test_zone) + assert mob.x == original_x + assert mob.y == original_y + + +class TestIdleState: + @pytest.mark.asyncio + async def test_idle_state_no_action(self, mob, test_zone): + """Idle state does nothing (movement handled by mob_ai).""" + original_x, original_y = mob.x, mob.y + transition_state(mob, "idle") + await process_behavior(mob, test_zone) + # process_behavior is a no-op for idle + assert mob.x == original_x + assert mob.y == original_y