421 lines
12 KiB
Python
421 lines
12 KiB
Python
"""Tests for combat engine and encounter management."""
|
|
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from mudlib.combat.encounter import CombatState
|
|
from mudlib.combat.engine import (
|
|
active_encounters,
|
|
end_encounter,
|
|
get_encounter,
|
|
process_combat,
|
|
start_encounter,
|
|
)
|
|
from mudlib.combat.moves import CombatMove
|
|
from mudlib.entity import Entity
|
|
from mudlib.player import Player
|
|
|
|
|
|
def _mock_writer():
|
|
writer = MagicMock()
|
|
writer.write = MagicMock()
|
|
writer.drain = AsyncMock()
|
|
return writer
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def clear_encounters():
    """Empty the shared ``active_encounters`` registry around every test.

    Runs automatically (``autouse``) so no test sees encounters left behind
    by another one.
    """
    reset = active_encounters.clear
    reset()  # start from an empty registry
    yield
    reset()  # drop whatever the test registered
|
|
|
|
|
|
@pytest.fixture
def attacker():
    """Provide the attacking entity ("Goku") at the origin with full stats."""
    stats = {"pl": 100.0, "stamina": 50.0}
    return Entity(name="Goku", x=0, y=0, **stats)
|
|
|
|
|
|
@pytest.fixture
def defender():
    """Provide the defending entity ("Vegeta") at the origin with full stats."""
    stats = {"pl": 100.0, "stamina": 50.0}
    return Entity(name="Vegeta", x=0, y=0, **stats)
|
|
|
|
|
|
@pytest.fixture
def punch():
    """Provide a basic attack move ("punch right") countered by "dodge left"."""
    move = CombatMove(
        name="punch right",
        move_type="attack",
        stamina_cost=5.0,
        timing_window_ms=800,
        damage_pct=0.15,
        countered_by=["dodge left"],
    )
    return move
|
|
|
|
|
|
def test_start_encounter_creates_encounter(attacker, defender):
    """start_encounter returns an encounter tied to both entities and registers it."""
    created = start_encounter(attacker, defender)

    assert created is not None
    # Identity checks: the encounter must hold the very same entity objects.
    assert created.attacker is attacker
    assert created.defender is defender
    assert created in active_encounters
|
|
|
|
|
|
def test_get_encounter_finds_by_attacker(attacker, defender):
    """Looking up the attacker yields the encounter that was started."""
    expected = start_encounter(attacker, defender)

    assert get_encounter(attacker) is expected
|
|
|
|
|
|
def test_get_encounter_finds_by_defender(attacker, defender):
    """Looking up the defender yields the encounter that was started."""
    expected = start_encounter(attacker, defender)

    assert get_encounter(defender) is expected
|
|
|
|
|
|
def test_get_encounter_returns_none_if_not_in_combat(attacker):
    """An entity that never entered combat has no encounter to look up."""
    assert get_encounter(attacker) is None
|
|
|
|
|
|
def test_end_encounter_removes_from_active_list(attacker, defender):
    """end_encounter unregisters the encounter for both participants."""
    finished = start_encounter(attacker, defender)
    end_encounter(finished)

    assert finished not in active_encounters
    for participant in (attacker, defender):
        assert get_encounter(participant) is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_process_combat_advances_encounters(attacker, defender, punch):
    """A combat tick after the telegraph delay moves the encounter to WINDOW."""
    fight = start_encounter(attacker, defender)
    fight.attack(punch)

    # Wait out the telegraph phase so the next tick can advance the state.
    telegraph_wait = 0.31
    time.sleep(telegraph_wait)
    await process_combat()

    assert fight.state == CombatState.WINDOW
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_process_combat_handles_multiple_encounters(punch):
    """A single combat tick advances every active encounter, not just one."""
    # Two independent pairs, each pair sharing a position.
    pairs = [
        (Entity(name="A1", x=0, y=0), Entity(name="D1", x=0, y=0)),
        (Entity(name="A2", x=10, y=10), Entity(name="D2", x=10, y=10)),
    ]
    fights = [start_encounter(atk, dfn) for atk, dfn in pairs]

    for fight in fights:
        fight.attack(punch)

    # Wait out the telegraph phase before ticking.
    time.sleep(0.31)
    await process_combat()

    for fight in fights:
        assert fight.state == CombatState.WINDOW
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_process_combat_auto_resolves_expired_windows(attacker, defender, punch):
    """An unanswered attack resolves on its own once the window has expired."""
    fight = start_encounter(attacker, defender)
    fight.attack(punch)
    pl_before = defender.pl

    # Phase 1: wait out the telegraph, then tick into the defence window.
    time.sleep(0.31)
    await process_combat()
    assert fight.state == CombatState.WINDOW

    # Phase 2: wait out the window, then tick so the attack resolves.
    time.sleep(0.85)
    await process_combat()

    # The encounter goes back to IDLE and the undefended hit costs power level.
    assert fight.state == CombatState.IDLE
    assert defender.pl < pl_before
|
|
|
|
|
|
def test_start_encounter_when_already_in_combat(attacker, defender):
    """An attacker who is already fighting cannot start a second encounter."""
    start_encounter(attacker, defender)

    bystander = Entity(name="Other", x=5, y=5)
    with pytest.raises(ValueError, match="already in combat"):
        start_encounter(attacker, bystander)
|
|
|
|
|
|
def test_start_encounter_when_defender_in_combat(attacker, defender):
    """A defender who is already fighting cannot be pulled into a second encounter."""
    start_encounter(attacker, defender)

    challenger = Entity(name="Other", x=5, y=5)
    with pytest.raises(ValueError, match="already in combat"):
        start_encounter(challenger, defender)
|
|
|
|
|
|
def test_active_encounters_list():
    """The module-level encounter registry is exposed as a list."""
    registry = active_encounters
    assert isinstance(registry, list)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_process_combat_with_no_encounters():
    """Ticking combat with nothing registered completes without raising."""
    # No assertion needed: the test passes as long as the tick does not raise.
    await process_combat()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_encounter_cleanup_after_resolution(attacker, defender, punch):
    """An encounter can still be ended cleanly once its attack has resolved."""
    fight = start_encounter(attacker, defender)
    fight.attack(punch)

    # Wait out the telegraph, tick, then wait out the window and tick again.
    for phase_wait in (0.31, 0.85):
        time.sleep(phase_wait)
        await process_combat()

    # Resolve explicitly, then tear the encounter down.
    fight.resolve()
    end_encounter(fight)

    assert get_encounter(attacker) is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_process_combat_ends_encounter_on_knockout(punch):
    """A resolved hit on a low-pl defender ends the encounter for both players."""
    attacker = Player(
        name="Goku", x=0, y=0, pl=100.0, stamina=50.0, writer=_mock_writer()
    )
    # Defender starts with only 10.0 pl so the test's single punch finishes the fight.
    defender = Player(
        name="Vegeta", x=0, y=0, pl=10.0, stamina=50.0, writer=_mock_writer()
    )
    fighters = (attacker, defender)

    # Both players enter combat mode before the fight starts.
    for fighter in fighters:
        fighter.mode_stack.append("combat")

    fight = start_encounter(attacker, defender)
    fight.attack(punch)

    # Wait out the telegraph, tick, then wait out the window and tick again.
    for phase_wait in (0.31, 0.85):
        time.sleep(phase_wait)
        await process_combat()

    # The encounter is gone for both sides...
    for fighter in fighters:
        assert get_encounter(fighter) is None
    # ...and combat mode has been popped off each mode stack.
    for fighter in fighters:
        assert fighter.mode_stack == ["normal"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_process_combat_ends_encounter_on_exhaustion(punch):
    """The encounter ends when the attacker has only enough stamina for one punch."""
    # Attacker's stamina exactly equals the cost of the move being thrown.
    attacker = Player(
        name="Goku",
        x=0,
        y=0,
        pl=100.0,
        stamina=punch.stamina_cost,
        writer=_mock_writer(),
    )
    defender = Player(
        name="Vegeta",
        x=0,
        y=0,
        pl=100.0,
        stamina=50.0,
        writer=_mock_writer(),
    )
    fighters = (attacker, defender)

    # Both players enter combat mode before the fight starts.
    for fighter in fighters:
        fighter.mode_stack.append("combat")

    fight = start_encounter(attacker, defender)
    fight.attack(punch)

    # Wait out the telegraph, tick, then wait out the window and tick again.
    for phase_wait in (0.31, 0.85):
        time.sleep(phase_wait)
        await process_combat()

    # The encounter is gone and combat mode has been popped for both players.
    for fighter in fighters:
        assert get_encounter(fighter) is None
    for fighter in fighters:
        assert fighter.mode_stack == ["normal"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_process_combat_continues_with_resources(punch):
    """The encounter stays registered when both players still have pl and stamina."""
    attacker = Player(
        name="Goku", x=0, y=0, pl=100.0, stamina=50.0, writer=_mock_writer()
    )
    defender = Player(
        name="Vegeta", x=0, y=0, pl=100.0, stamina=50.0, writer=_mock_writer()
    )
    fighters = (attacker, defender)

    # Both players enter combat mode before the fight starts.
    for fighter in fighters:
        fighter.mode_stack.append("combat")

    fight = start_encounter(attacker, defender)
    fight.attack(punch)

    # Wait out the telegraph, tick, then wait out the window and tick again.
    for phase_wait in (0.31, 0.85):
        time.sleep(phase_wait)
        await process_combat()

    # Still the same encounter for both sides, and nobody left combat mode.
    for fighter in fighters:
        assert get_encounter(fighter) is fight
    for fighter in fighters:
        assert fighter.mode_stack == ["normal", "combat"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_process_combat_sends_messages_on_resolve(punch):
    """The resolving tick writes at least one message to each player's writer."""
    atk_writer = _mock_writer()
    attacker = Player(
        name="Goku", x=0, y=0, pl=100.0, stamina=50.0, writer=atk_writer
    )
    def_writer = _mock_writer()
    defender = Player(
        name="Vegeta", x=0, y=0, pl=100.0, stamina=50.0, writer=def_writer
    )

    for fighter in (attacker, defender):
        fighter.mode_stack.append("combat")

    fight = start_encounter(attacker, defender)
    fight.attack(punch)

    # Wait out the telegraph and tick, then wait out the window.
    time.sleep(0.31)
    await process_combat()
    time.sleep(0.85)

    # Forget earlier writes so only the resolving tick's output is inspected.
    atk_writer.write.reset_mock()
    def_writer.write.reset_mock()

    await process_combat()

    # First positional argument of every write() call made during that tick.
    attacker_msgs = [sent.args[0] for sent in atk_writer.write.call_args_list]
    defender_msgs = [sent.args[0] for sent in def_writer.write.call_args_list]

    assert len(attacker_msgs) > 0
    assert len(defender_msgs) > 0
|
|
|
|
|
|
# --- Idle timeout tests ---
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_idle_timeout_ends_encounter():
    """An encounter whose last action was 31 seconds ago is ended by the next tick."""
    attacker = Player(name="Goku", x=0, y=0, writer=_mock_writer())
    defender = Player(name="Vegeta", x=0, y=0, writer=_mock_writer())
    fighters = (attacker, defender)
    for fighter in fighters:
        fighter.mode_stack.append("combat")

    fight = start_encounter(attacker, defender)
    # Backdate the last action instead of actually waiting for the timeout.
    stale_timestamp = time.monotonic() - 31
    fight.last_action_at = stale_timestamp

    await process_combat()

    for fighter in fighters:
        assert get_encounter(fighter) is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_idle_timeout_sends_message():
    """On idle timeout both players are sent a message containing "fizzled"."""
    atk_writer = _mock_writer()
    def_writer = _mock_writer()
    attacker = Player(name="Goku", x=0, y=0, writer=atk_writer)
    defender = Player(name="Vegeta", x=0, y=0, writer=def_writer)
    for fighter in (attacker, defender):
        fighter.mode_stack.append("combat")

    fight = start_encounter(attacker, defender)
    # Backdate the last action instead of actually waiting for the timeout.
    fight.last_action_at = time.monotonic() - 31

    await process_combat()

    # First positional argument of every write() call on each writer.
    attacker_texts = [sent.args[0] for sent in atk_writer.write.call_args_list]
    defender_texts = [sent.args[0] for sent in def_writer.write.call_args_list]
    # Case-insensitive match on the timeout wording.
    assert any("fizzled" in text.lower() for text in attacker_texts)
    assert any("fizzled" in text.lower() for text in defender_texts)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_idle_timeout_pops_combat_mode():
    """On idle timeout both players' mode stacks drop back to just "normal"."""
    attacker = Player(name="Goku", x=0, y=0, writer=_mock_writer())
    defender = Player(name="Vegeta", x=0, y=0, writer=_mock_writer())
    fighters = (attacker, defender)
    for fighter in fighters:
        fighter.mode_stack.append("combat")

    fight = start_encounter(attacker, defender)
    # Backdate the last action instead of actually waiting for the timeout.
    fight.last_action_at = time.monotonic() - 31

    await process_combat()

    for fighter in fighters:
        assert fighter.mode_stack == ["normal"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_recent_action_prevents_timeout():
    """A freshly started encounter survives a combat tick (no idle timeout)."""
    attacker = Player(name="Goku", x=0, y=0, writer=_mock_writer())
    defender = Player(name="Vegeta", x=0, y=0, writer=_mock_writer())
    for fighter in (attacker, defender):
        fighter.mode_stack.append("combat")

    # last_action_at is left untouched, i.e. whatever start_encounter set.
    fight = start_encounter(attacker, defender)

    await process_combat()

    # The encounter must still be registered.
    assert get_encounter(attacker) is fight
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_start_encounter_sets_last_action_at():
    """start_encounter stamps last_action_at no earlier than the call time."""
    attacker = Entity(name="Goku", x=0, y=0)
    defender = Entity(name="Vegeta", x=0, y=0)

    # Sample the monotonic clock first so it is a lower bound for the stamp.
    earliest = time.monotonic()
    fight = start_encounter(attacker, defender)

    assert fight.last_action_at >= earliest
|