The server never proactively offered GMCP or MSDP to clients, so telnetlib3 logged "cannot send MSDP without negotiation" every second. Now the server sends WILL GMCP and WILL MSDP on connection, and send_msdp_vitals checks negotiation state before attempting to send.
551 lines
16 KiB
Python
551 lines
16 KiB
Python
"""Tests for GMCP and MSDP support."""
|
|
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from mudlib.gmcp import (
|
|
send_char_status,
|
|
send_char_vitals,
|
|
send_map_data,
|
|
send_msdp_vitals,
|
|
send_room_info,
|
|
)
|
|
from mudlib.player import Player, players
|
|
from mudlib.zone import Zone
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def clear_state():
|
|
"""Clear players before and after each test."""
|
|
players.clear()
|
|
yield
|
|
players.clear()
|
|
|
|
|
|
@pytest.fixture
|
|
def test_zone():
|
|
"""Create a small test zone with known terrain."""
|
|
terrain = [["." for _ in range(25)] for _ in range(25)] # 25x25 simple terrain
|
|
# Add some impassable tiles for exit testing
|
|
terrain[5][10] = "^" # mountain north of player
|
|
zone = Zone(
|
|
name="testzone",
|
|
width=25,
|
|
height=25,
|
|
toroidal=True,
|
|
terrain=terrain,
|
|
impassable={"^", "~"},
|
|
)
|
|
return zone
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_writer():
|
|
writer = MagicMock()
|
|
writer.write = MagicMock()
|
|
writer.drain = AsyncMock()
|
|
writer.send_gmcp = MagicMock()
|
|
writer.send_msdp = MagicMock()
|
|
# Option negotiation state (for gmcp_enabled/msdp_enabled checks)
|
|
writer.local_option = MagicMock()
|
|
writer.local_option.enabled = MagicMock(return_value=True)
|
|
writer.remote_option = MagicMock()
|
|
writer.remote_option.enabled = MagicMock(return_value=True)
|
|
return writer
|
|
|
|
|
|
@pytest.fixture
|
|
def player(mock_writer, test_zone):
|
|
p = Player(name="Goku", x=10, y=10, writer=mock_writer)
|
|
p.location = test_zone
|
|
p.pl = 85.5
|
|
p.max_pl = 100.0
|
|
p.stamina = 42.3
|
|
p.max_stamina = 100.0
|
|
players[p.name] = p
|
|
return p
|
|
|
|
|
|
def test_send_char_vitals(player):
|
|
"""Test Char.Vitals sends correct payload."""
|
|
send_char_vitals(player)
|
|
|
|
player.writer.send_gmcp.assert_called_once_with(
|
|
"Char.Vitals",
|
|
{
|
|
"pl": 85.5,
|
|
"max_pl": 100.0,
|
|
"stamina": 42.3,
|
|
"max_stamina": 100.0,
|
|
},
|
|
)
|
|
|
|
|
|
def test_send_char_status_normal_mode(player):
|
|
"""Test Char.Status sends correct payload in normal mode."""
|
|
player.flying = False
|
|
player.resting = False
|
|
player.mode_stack = ["normal"]
|
|
|
|
send_char_status(player)
|
|
|
|
player.writer.send_gmcp.assert_called_once_with(
|
|
"Char.Status",
|
|
{
|
|
"flying": False,
|
|
"resting": False,
|
|
"mode": "normal",
|
|
"in_combat": False,
|
|
},
|
|
)
|
|
|
|
|
|
def test_send_char_status_in_combat(player):
|
|
"""Test Char.Status detects combat mode."""
|
|
player.mode_stack = ["normal", "combat"]
|
|
|
|
send_char_status(player)
|
|
|
|
args = player.writer.send_gmcp.call_args[0]
|
|
assert args[0] == "Char.Status"
|
|
assert args[1]["mode"] == "combat"
|
|
assert args[1]["in_combat"] is True
|
|
|
|
|
|
def test_send_char_status_flying_and_resting(player):
|
|
"""Test Char.Status reflects flying and resting states."""
|
|
player.flying = True
|
|
player.resting = True
|
|
|
|
send_char_status(player)
|
|
|
|
args = player.writer.send_gmcp.call_args[0]
|
|
assert args[1]["flying"] is True
|
|
assert args[1]["resting"] is True
|
|
|
|
|
|
def test_send_room_info(player):
|
|
"""Test Room.Info sends zone, coordinates, terrain, and exits."""
|
|
send_room_info(player)
|
|
|
|
player.writer.send_gmcp.assert_called_once()
|
|
args = player.writer.send_gmcp.call_args[0]
|
|
assert args[0] == "Room.Info"
|
|
|
|
data = args[1]
|
|
assert data["zone"] == "testzone"
|
|
assert data["x"] == 10
|
|
assert data["y"] == 10
|
|
assert data["terrain"] == "."
|
|
# All adjacent tiles should be passable (player at 10,10, all tiles are ".")
|
|
assert "north" in data["exits"]
|
|
assert "south" in data["exits"]
|
|
assert "east" in data["exits"]
|
|
assert "west" in data["exits"]
|
|
assert "northeast" in data["exits"]
|
|
assert "northwest" in data["exits"]
|
|
assert "southeast" in data["exits"]
|
|
assert "southwest" in data["exits"]
|
|
|
|
|
|
def test_send_room_info_blocked_exit(player, test_zone):
|
|
"""Test Room.Info excludes blocked exits."""
|
|
# Put player next to mountain at 10,5
|
|
player.y = 6
|
|
test_zone.terrain[5][10] = "^" # mountain to the north
|
|
|
|
send_room_info(player)
|
|
|
|
args = player.writer.send_gmcp.call_args[0]
|
|
data = args[1]
|
|
# North (y-1=5) should be impassable
|
|
assert "north" not in data["exits"]
|
|
assert "south" in data["exits"]
|
|
|
|
|
|
def test_send_room_info_no_zone(player):
|
|
"""Test Room.Info with no zone does not crash."""
|
|
player.location = None
|
|
|
|
send_room_info(player)
|
|
|
|
# Should not have called send_gmcp
|
|
player.writer.send_gmcp.assert_not_called()
|
|
|
|
|
|
def test_send_map_data(player):
|
|
"""Test Room.Map sends terrain grid."""
|
|
send_map_data(player)
|
|
|
|
player.writer.send_gmcp.assert_called_once()
|
|
args = player.writer.send_gmcp.call_args[0]
|
|
assert args[0] == "Room.Map"
|
|
|
|
data = args[1]
|
|
assert data["x"] == 10
|
|
assert data["y"] == 10
|
|
assert data["radius"] == 10
|
|
# Grid should be 21x21 (radius 10 = -10 to +10 inclusive)
|
|
assert len(data["terrain"]) == 21
|
|
assert len(data["terrain"][0]) == 21
|
|
# Center tile should be "."
|
|
assert data["terrain"][10][10] == "."
|
|
|
|
|
|
def test_send_map_data_toroidal_wrapping(player, test_zone):
|
|
"""Test Room.Map wraps correctly in toroidal zones."""
|
|
# Put player near edge to test wrapping
|
|
player.x = 2
|
|
player.y = 2
|
|
|
|
send_map_data(player)
|
|
|
|
args = player.writer.send_gmcp.call_args[0]
|
|
data = args[1]
|
|
# Should still be 21x21 even near edge (wraps around)
|
|
assert len(data["terrain"]) == 21
|
|
assert len(data["terrain"][0]) == 21
|
|
|
|
|
|
def test_send_map_data_no_zone(player):
|
|
"""Test Room.Map with no zone does not crash."""
|
|
player.location = None
|
|
|
|
send_map_data(player)
|
|
|
|
player.writer.send_gmcp.assert_not_called()
|
|
|
|
|
|
def test_send_msdp_vitals(player):
|
|
"""Test MSDP sends vitals with correct variable names."""
|
|
send_msdp_vitals(player)
|
|
|
|
player.writer.send_msdp.assert_called_once_with(
|
|
{
|
|
"HEALTH": "85.5",
|
|
"HEALTH_MAX": "100.0",
|
|
"STAMINA": "42.3",
|
|
"STAMINA_MAX": "100.0",
|
|
}
|
|
)
|
|
|
|
|
|
def test_player_send_gmcp(player):
|
|
"""Test Player.send_gmcp convenience method."""
|
|
player.send_gmcp("Test.Package", {"key": "value"})
|
|
|
|
player.writer.send_gmcp.assert_called_once_with("Test.Package", {"key": "value"})
|
|
|
|
|
|
def test_player_send_gmcp_no_writer():
|
|
"""Test Player.send_gmcp with no writer does not crash."""
|
|
p = Player(name="NoWriter", writer=None)
|
|
p.send_gmcp("Test.Package", {}) # Should not raise
|
|
|
|
|
|
def test_player_send_msdp(player):
|
|
"""Test Player.send_msdp convenience method."""
|
|
player.send_msdp({"VAR": "value"})
|
|
|
|
player.writer.send_msdp.assert_called_once_with({"VAR": "value"})
|
|
|
|
|
|
def test_player_send_msdp_no_writer():
|
|
"""Test Player.send_msdp with no writer does not crash."""
|
|
p = Player(name="NoWriter", writer=None)
|
|
p.send_msdp({}) # Should not raise
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_room_info_sent_on_movement(player, test_zone):
|
|
"""Test Room.Info is sent when player moves."""
|
|
from mudlib.commands.movement import move_player
|
|
|
|
await move_player(player, 1, 0, "east")
|
|
|
|
# Verify send_gmcp was called with Room.Info
|
|
gmcp_calls = player.writer.send_gmcp.call_args_list
|
|
room_info_calls = [call for call in gmcp_calls if call[0][0] == "Room.Info"]
|
|
assert len(room_info_calls) > 0, "Room.Info should be sent on movement"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_map_data_sent_on_movement(player, test_zone):
|
|
"""Test Room.Map is sent when player moves."""
|
|
from mudlib.commands.movement import move_player
|
|
|
|
await move_player(player, 1, 0, "east")
|
|
|
|
# Verify send_gmcp was called with Room.Map
|
|
gmcp_calls = player.writer.send_gmcp.call_args_list
|
|
room_map_calls = [call for call in gmcp_calls if call[0][0] == "Room.Map"]
|
|
assert len(room_map_calls) > 0, "Room.Map should be sent on movement"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_char_vitals_sent_on_rest_complete(player):
|
|
"""Test Char.Vitals is sent when resting completes."""
|
|
from mudlib.resting import process_resting
|
|
|
|
player.resting = True
|
|
player.stamina = player.max_stamina - 0.1 # Almost full
|
|
|
|
await process_resting()
|
|
|
|
# Should have called send_gmcp with both Char.Status and Char.Vitals
|
|
assert player.writer.send_gmcp.call_count == 2
|
|
|
|
# First call should be Char.Status
|
|
first_call = player.writer.send_gmcp.call_args_list[0]
|
|
assert first_call[0][0] == "Char.Status"
|
|
|
|
# Second call should be Char.Vitals
|
|
second_call = player.writer.send_gmcp.call_args_list[1]
|
|
assert second_call[0][0] == "Char.Vitals"
|
|
assert second_call[0][1] == {
|
|
"pl": round(player.pl, 1),
|
|
"max_pl": round(player.max_pl, 1),
|
|
"stamina": round(player.max_stamina, 1),
|
|
"max_stamina": round(player.max_stamina, 1),
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_char_vitals_sent_on_combat_resolve():
|
|
"""Test Char.Vitals is sent to both players on combat resolution."""
|
|
import time
|
|
|
|
from mudlib.combat.engine import active_encounters, process_combat, start_encounter
|
|
from mudlib.combat.moves import CombatMove
|
|
|
|
# Clear encounters
|
|
active_encounters.clear()
|
|
|
|
# Create two players with mock writers
|
|
mock_writer_1 = MagicMock()
|
|
mock_writer_1.write = MagicMock()
|
|
mock_writer_1.drain = AsyncMock()
|
|
mock_writer_1.send_gmcp = MagicMock()
|
|
|
|
mock_writer_2 = MagicMock()
|
|
mock_writer_2.write = MagicMock()
|
|
mock_writer_2.drain = AsyncMock()
|
|
mock_writer_2.send_gmcp = MagicMock()
|
|
|
|
attacker = Player(
|
|
name="Goku", x=0, y=0, pl=100.0, stamina=50.0, writer=mock_writer_1
|
|
)
|
|
defender = Player(
|
|
name="Vegeta", x=0, y=0, pl=100.0, stamina=50.0, writer=mock_writer_2
|
|
)
|
|
|
|
attacker.mode_stack.append("combat")
|
|
defender.mode_stack.append("combat")
|
|
|
|
# Create encounter and attack
|
|
encounter = start_encounter(attacker, defender)
|
|
punch = CombatMove(
|
|
name="punch right",
|
|
move_type="attack",
|
|
stamina_cost=5.0,
|
|
timing_window_ms=800,
|
|
damage_pct=0.15,
|
|
countered_by=["dodge left"],
|
|
)
|
|
encounter.attack(punch)
|
|
|
|
# Advance past telegraph and window to trigger resolution
|
|
time.sleep(0.31)
|
|
await process_combat()
|
|
time.sleep(0.85)
|
|
|
|
# Reset mocks before the resolution call
|
|
mock_writer_1.send_gmcp.reset_mock()
|
|
mock_writer_2.send_gmcp.reset_mock()
|
|
|
|
await process_combat()
|
|
|
|
# Both players should have received Char.Vitals
|
|
assert mock_writer_1.send_gmcp.call_count == 1
|
|
assert mock_writer_2.send_gmcp.call_count == 1
|
|
|
|
# Check attacker's call
|
|
attacker_call = mock_writer_1.send_gmcp.call_args[0]
|
|
assert attacker_call[0] == "Char.Vitals"
|
|
assert "pl" in attacker_call[1]
|
|
assert "max_pl" in attacker_call[1]
|
|
assert "stamina" in attacker_call[1]
|
|
assert "max_stamina" in attacker_call[1]
|
|
|
|
# Check defender's call
|
|
defender_call = mock_writer_2.send_gmcp.call_args[0]
|
|
assert defender_call[0] == "Char.Vitals"
|
|
assert "pl" in defender_call[1]
|
|
assert "max_pl" in defender_call[1]
|
|
assert "stamina" in defender_call[1]
|
|
assert "max_stamina" in defender_call[1]
|
|
|
|
# Cleanup
|
|
active_encounters.clear()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_char_status_sent_on_combat_end():
|
|
"""Test Char.Status is sent when combat ends (victory/defeat)."""
|
|
import time
|
|
|
|
from mudlib.combat.engine import active_encounters, process_combat, start_encounter
|
|
from mudlib.combat.moves import CombatMove
|
|
|
|
# Clear encounters
|
|
active_encounters.clear()
|
|
|
|
# Create two players with mock writers
|
|
mock_writer_1 = MagicMock()
|
|
mock_writer_1.write = MagicMock()
|
|
mock_writer_1.drain = AsyncMock()
|
|
mock_writer_1.send_gmcp = MagicMock()
|
|
|
|
mock_writer_2 = MagicMock()
|
|
mock_writer_2.write = MagicMock()
|
|
mock_writer_2.drain = AsyncMock()
|
|
mock_writer_2.send_gmcp = MagicMock()
|
|
|
|
attacker = Player(
|
|
name="Goku", x=0, y=0, pl=100.0, stamina=50.0, writer=mock_writer_1
|
|
)
|
|
defender = Player(
|
|
name="Vegeta", x=0, y=0, pl=1.0, stamina=50.0, writer=mock_writer_2
|
|
)
|
|
|
|
attacker.mode_stack.append("combat")
|
|
defender.mode_stack.append("combat")
|
|
|
|
# Create encounter and attack (will kill defender)
|
|
encounter = start_encounter(attacker, defender)
|
|
punch = CombatMove(
|
|
name="punch right",
|
|
move_type="attack",
|
|
stamina_cost=5.0,
|
|
timing_window_ms=800,
|
|
damage_pct=0.15,
|
|
countered_by=["dodge left"],
|
|
)
|
|
encounter.attack(punch)
|
|
|
|
# Advance past telegraph and window to trigger resolution
|
|
time.sleep(0.31)
|
|
await process_combat()
|
|
time.sleep(0.85)
|
|
|
|
# Reset mocks before the resolution call
|
|
mock_writer_1.send_gmcp.reset_mock()
|
|
mock_writer_2.send_gmcp.reset_mock()
|
|
|
|
await process_combat()
|
|
|
|
# Both players should have received Char.Status (after Char.Vitals)
|
|
# Check that Char.Status was called at least once
|
|
attacker_calls = [
|
|
call
|
|
for call in mock_writer_1.send_gmcp.call_args_list
|
|
if call[0][0] == "Char.Status"
|
|
]
|
|
defender_calls = [
|
|
call
|
|
for call in mock_writer_2.send_gmcp.call_args_list
|
|
if call[0][0] == "Char.Status"
|
|
]
|
|
|
|
assert len(attacker_calls) >= 1, "Attacker should receive Char.Status on combat end"
|
|
assert len(defender_calls) >= 1, "Defender should receive Char.Status on combat end"
|
|
|
|
# Verify mode is back to normal for attacker
|
|
assert attacker_calls[0][0][1]["mode"] == "normal"
|
|
assert attacker_calls[0][0][1]["in_combat"] is False
|
|
|
|
# Cleanup
|
|
active_encounters.clear()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_char_status_sent_on_rest_start(player):
|
|
"""Test Char.Status is sent when resting begins."""
|
|
from mudlib.commands.rest import cmd_rest
|
|
|
|
player.stamina = 50.0
|
|
player.resting = False
|
|
|
|
await cmd_rest(player, "")
|
|
|
|
# Check that Char.Status was sent
|
|
status_calls = [
|
|
call
|
|
for call in player.writer.send_gmcp.call_args_list
|
|
if call[0][0] == "Char.Status"
|
|
]
|
|
assert len(status_calls) == 1
|
|
assert status_calls[0][0][1]["resting"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_char_status_sent_on_rest_complete(player):
|
|
"""Test Char.Status is sent when resting completes."""
|
|
from mudlib.resting import process_resting
|
|
|
|
player.resting = True
|
|
player.stamina = player.max_stamina - 0.1 # Almost full
|
|
|
|
await process_resting()
|
|
|
|
# Check that Char.Status was sent
|
|
status_calls = [
|
|
call
|
|
for call in player.writer.send_gmcp.call_args_list
|
|
if call[0][0] == "Char.Status"
|
|
]
|
|
assert len(status_calls) == 1
|
|
assert status_calls[0][0][1]["resting"] is False
|
|
|
|
|
|
def test_msdp_vitals_skipped_when_not_negotiated(player):
|
|
"""Test send_msdp_vitals skips when MSDP is not negotiated."""
|
|
player.writer.local_option.enabled.return_value = False
|
|
player.writer.remote_option.enabled.return_value = False
|
|
|
|
send_msdp_vitals(player)
|
|
|
|
player.writer.send_msdp.assert_not_called()
|
|
|
|
|
|
def test_msdp_enabled_property(player):
|
|
"""Test msdp_enabled reflects negotiation state."""
|
|
player.writer.local_option.enabled.return_value = False
|
|
player.writer.remote_option.enabled.return_value = False
|
|
assert not player.msdp_enabled
|
|
|
|
player.writer.local_option.enabled.return_value = True
|
|
assert player.msdp_enabled
|
|
|
|
|
|
def test_gmcp_enabled_property(player):
|
|
"""Test gmcp_enabled reflects negotiation state."""
|
|
player.writer.local_option.enabled.return_value = False
|
|
player.writer.remote_option.enabled.return_value = False
|
|
assert not player.gmcp_enabled
|
|
|
|
player.writer.remote_option.enabled.return_value = True
|
|
assert player.gmcp_enabled
|
|
|
|
|
|
def test_msdp_enabled_no_writer():
|
|
"""Test msdp_enabled with no writer returns False."""
|
|
p = Player(name="NoWriter", writer=None)
|
|
assert not p.msdp_enabled
|
|
|
|
|
|
def test_gmcp_enabled_no_writer():
|
|
"""Test gmcp_enabled with no writer returns False."""
|
|
p = Player(name="NoWriter", writer=None)
|
|
assert not p.gmcp_enabled
|