diff --git a/src/mudlib/commands/power.py b/src/mudlib/commands/power.py new file mode 100644 index 0000000..ac60b57 --- /dev/null +++ b/src/mudlib/commands/power.py @@ -0,0 +1,211 @@ +"""Power commands for raising and lowering PL.""" + +import asyncio + +from mudlib.combat.engine import get_encounter +from mudlib.commands import CommandDefinition, register +from mudlib.commands.movement import send_nearby_message +from mudlib.gmcp import send_char_vitals +from mudlib.player import Player + +# Power-up mechanics +POWER_UP_TICK_INTERVAL = 0.05 # seconds between ticks +POWER_UP_PL_PER_TICK = 5.0 # PL increase per tick +POWER_UP_STAMINA_COST_PER_TICK = 2.0 # stamina cost per tick +POWER_DOWN_MIN_PL = 1.0 # minimum PL when powering down + + +async def power_up_loop(player: Player, target_pl: float | None = None) -> None: + """Background task that powers up the player over time. + + Args: + player: The player powering up + target_pl: Optional target PL to stop at (defaults to max_pl) + """ + target = target_pl if target_pl is not None else player.max_pl + + try: + while player.pl < target and player.stamina > 0: + await asyncio.sleep(POWER_UP_TICK_INTERVAL) + + # Deduct stamina + player.stamina -= POWER_UP_STAMINA_COST_PER_TICK + if player.stamina < 0: + player.stamina = 0.0 + + # Increase PL + old_pl = player.pl + player.pl = min(player.pl + POWER_UP_PL_PER_TICK, target) + + # Send GMCP update + send_char_vitals(player) + + # Send periodic aura message (every ~0.2s) + if int(old_pl / 20) != int(player.pl / 20): + await player.send("Your aura flares!\r\n") + await send_nearby_message( + player, player.x, player.y, f"{player.name}'s aura flares!\r\n" + ) + + # Check if we've run out of stamina + if player.stamina <= 0: + await player.send("You run out of stamina and stop powering up.\r\n") + await send_nearby_message( + player, + player.x, + player.y, + f"{player.name} stops powering up.\r\n", + ) + break + + # Reached target + if player.pl >= target: + await player.send(f"You reach your target power level of {target}.\r\n") + await send_nearby_message( + player, + player.x, + player.y, + f"{player.name} reaches their target power level.\r\n", + ) + finally: + # Clean up the task reference + player._power_task = None + + +async def cmd_power(player: Player, args: str) -> None: + """Manage power level. + + Usage: + power up - raise PL toward max_pl + power down - lower PL to minimum + power - set PL to exact value + power stop - cancel ongoing power-up + """ + if not args: + await player.send("Usage: power up|down|stop|\r\n") + return + + args = args.strip().lower() + + # Handle power stop + if args == "stop": + if player._power_task is None or player._power_task.done(): + await player.send("You're not powering up.\r\n") + return + + player._power_task.cancel() + player._power_task = None + await player.send("You stop powering up.\r\n") + await send_nearby_message( + player, player.x, player.y, f"{player.name} stops powering up.\r\n" + ) + return + + # Handle power down + if args == "down": + # Cancel any ongoing power-up + if player._power_task is not None and not player._power_task.done(): + player._power_task.cancel() + player._power_task = None + + player.pl = POWER_DOWN_MIN_PL + send_char_vitals(player) + await player.send(f"You power down to PL {POWER_DOWN_MIN_PL}.\r\n") + await send_nearby_message( + player, player.x, player.y, f"{player.name} powers down.\r\n" + ) + return + + # Handle power up + if args == "up": + # Check if already powering up + if player._power_task is not None and not player._power_task.done(): + await player.send("You're already powering up!\r\n") + return + + # Check for combat + if get_encounter(player) is not None: + await player.send("You can't power up during combat!\r\n") + return + + # Check stamina + if player.stamina <= 0: + await player.send("You don't have enough stamina to power up!\r\n") + return + + # Check if already at max + if player.pl >= player.max_pl: + await player.send("You're already at maximum power!\r\n") + return + + # Start power-up loop + await player.send("You begin powering up...\r\n") + await send_nearby_message( + player, player.x, player.y, f"{player.name} begins powering up!\r\n" + ) + player._power_task = asyncio.create_task(power_up_loop(player)) + return + + # Handle power + try: + target = float(args) + except ValueError: + await player.send(f"Invalid power level: {args}\r\n") + return + + # Validate target + if target <= 0: + await player.send("Power level must be greater than 0.\r\n") + return + + if target > player.max_pl: + await player.send( + f"Power level cannot exceed your maximum of {player.max_pl}.\r\n" + ) + return + + # If target is lower than current, instant change + if target < player.pl: + # Cancel any ongoing power-up + if player._power_task is not None and not player._power_task.done(): + player._power_task.cancel() + player._power_task = None + + player.pl = target + send_char_vitals(player) + await player.send(f"You lower your power to {target}.\r\n") + await send_nearby_message( + player, player.x, player.y, f"{player.name} lowers their power.\r\n" + ) + return + + # If target is higher, start power-up loop to that target + if target > player.pl: + # Check if already powering up + if player._power_task is not None and not player._power_task.done(): + await player.send("You're already powering up!\r\n") + return + + # Check for combat + if get_encounter(player) is not None: + await player.send("You can't power up during combat!\r\n") + return + + # Check stamina + if player.stamina <= 0: + await player.send("You don't have enough stamina to power up!\r\n") + return + + # Start power-up loop to target + await player.send(f"You begin powering up to {target}...\r\n") + await send_nearby_message( + player, player.x, player.y, f"{player.name} begins powering up!\r\n" + ) + player._power_task = asyncio.create_task(power_up_loop(player, target)) + return + + # Target equals current PL + await player.send(f"You're already at PL {target}.\r\n") + + +register(CommandDefinition("power", cmd_power, aliases=["pow"])) diff --git a/src/mudlib/player.py b/src/mudlib/player.py index 775dbf5..62a21cb 100644 --- a/src/mudlib/player.py +++ b/src/mudlib/player.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any @@ -32,6 +33,7 @@ class Player(Entity): paint_brush: str = "." prompt_template: str | None = None _last_msdp: dict = field(default_factory=dict, repr=False) + _power_task: asyncio.Task | None = None @property def mode(self) -> str: diff --git a/src/mudlib/server.py b/src/mudlib/server.py index bce88dd..3a4dfc0 100644 --- a/src/mudlib/server.py +++ b/src/mudlib/server.py @@ -24,6 +24,7 @@ import mudlib.commands.look import mudlib.commands.movement import mudlib.commands.play import mudlib.commands.portals +import mudlib.commands.power import mudlib.commands.quit import mudlib.commands.reload import mudlib.commands.spawn diff --git a/tests/test_power.py b/tests/test_power.py new file mode 100644 index 0000000..4a521ae --- /dev/null +++ b/tests/test_power.py @@ -0,0 +1,309 @@ +"""Tests for power commands.""" + +import asyncio +import time +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from mudlib.combat.encounter import CombatEncounter +from mudlib.combat.engine import active_encounters +from mudlib.commands.power import cmd_power +from mudlib.player import Player, players +from mudlib.zone import Zone + + +@pytest.fixture(autouse=True) +def clear_state(): + """Clear players and encounters before and after each test.""" + players.clear() + active_encounters.clear() + yield + players.clear() + active_encounters.clear() + + +@pytest.fixture +def test_zone(): + """Create a test zone for players.""" + terrain = [["." for _ in range(256)] for _ in range(256)] + zone = Zone( + name="testzone", + width=256, + height=256, + toroidal=True, + terrain=terrain, + impassable=set(), + ) + return zone + + +@pytest.fixture +def mock_writer(): + writer = MagicMock() + writer.write = MagicMock() + writer.drain = AsyncMock() + return writer + + +@pytest.fixture +def mock_reader(): + return MagicMock() + + +@pytest.fixture +def player(mock_reader, mock_writer, test_zone): + p = Player(name="Goku", x=0, y=0, reader=mock_reader, writer=mock_writer) + p.location = test_zone + test_zone._contents.append(p) + players[p.name] = p + return p + + +@pytest.fixture +def nearby_player(mock_reader, mock_writer, test_zone): + p = Player(name="Vegeta", x=0, y=0, reader=mock_reader, writer=mock_writer) + p.location = test_zone + test_zone._contents.append(p) + players[p.name] = p + return p + + +@pytest.mark.asyncio +async def test_power_down_instantly_lowers_pl(player): + """Test power down instantly lowers PL to minimum.""" + player.pl = 100.0 + player.max_pl = 100.0 + + await cmd_power(player, "down") + + assert player.pl == 1.0 + messages = [call[0][0] for call in player.writer.write.call_args_list] + assert any("power down" in msg.lower() for msg in messages) + + +@pytest.mark.asyncio +async def test_power_to_number_lowers_instantly(player): + """Test power instantly lowers when target < current.""" + player.pl = 100.0 + player.max_pl = 100.0 + + await cmd_power(player, "50") + + assert player.pl == 50.0 + messages = [call[0][0] for call in player.writer.write.call_args_list] + assert any("50" in msg for msg in messages) + + +@pytest.mark.asyncio +async def test_power_to_number_rejects_zero(player): + """Test power rejects 0 or negative values.""" + player.pl = 100.0 + + await cmd_power(player, "0") + + assert player.pl == 100.0 # unchanged + messages = [call[0][0] for call in player.writer.write.call_args_list] + assert any("must be greater than 0" in msg.lower() for msg in messages) + + +@pytest.mark.asyncio +async def test_power_to_number_rejects_above_max(player): + """Test power rejects values above max_pl.""" + player.pl = 100.0 + player.max_pl = 100.0 + + await cmd_power(player, "150") + + assert player.pl == 100.0 # unchanged + messages = [call[0][0] for call in player.writer.write.call_args_list] + assert any("maximum" in msg.lower() for msg in messages) + + +@pytest.mark.asyncio +async def test_power_up_starts_increasing_pl(player): + """Test power up starts a loop that increases PL over time.""" + player.pl = 10.0 + player.max_pl = 100.0 + player.stamina = 100.0 + + await cmd_power(player, "up") + + # Give it a moment to run a few ticks + await asyncio.sleep(0.15) + + assert player.pl > 10.0 + assert player.pl < player.max_pl + + +@pytest.mark.asyncio +async def test_power_up_deducts_stamina(player): + """Test power up deducts stamina per tick.""" + player.pl = 10.0 + player.max_pl = 100.0 + player.stamina = 100.0 + + await cmd_power(player, "up") + + # Give it a moment to run a few ticks + await asyncio.sleep(0.15) + + assert player.stamina < 100.0 + + +@pytest.mark.asyncio +async def test_power_up_stops_at_max_pl(player): + """Test power up stops when PL reaches max_pl.""" + player.pl = 95.0 + player.max_pl = 100.0 + player.stamina = 100.0 + + await cmd_power(player, "up") + + # Wait for it to complete + await asyncio.sleep(0.3) + + assert player.pl == 100.0 + assert player._power_task is None or player._power_task.done() + + +@pytest.mark.asyncio +async def test_power_up_stops_when_stamina_depleted(player): + """Test power up stops when stamina runs out.""" + player.pl = 10.0 + player.max_pl = 100.0 + player.stamina = 5.0 # Very low stamina + + await cmd_power(player, "up") + + # Wait for stamina to deplete + await asyncio.sleep(0.3) + + # PL should have increased a little, but stopped + assert player.pl > 10.0 + assert player.pl < 100.0 + assert player.stamina <= 0.0 + assert player._power_task is None or player._power_task.done() + + +@pytest.mark.asyncio +async def test_power_stop_cancels_power_up(player): + """Test power stop cancels an ongoing power-up.""" + player.pl = 10.0 + player.max_pl = 100.0 + player.stamina = 100.0 + + await cmd_power(player, "up") + await asyncio.sleep(0.05) # Let it start + + await cmd_power(player, "stop") + + pl_at_stop = player.pl + await asyncio.sleep(0.1) # Wait a bit more + + # PL should not have increased after stop + assert player.pl == pl_at_stop + assert player._power_task is None or player._power_task.done() + + +@pytest.mark.asyncio +async def test_power_up_rejects_during_combat(player, nearby_player): + """Test power up is rejected when player is in combat.""" + player.pl = 50.0 + player.stamina = 100.0 + + # Put player in combat + encounter = CombatEncounter( + attacker=player, defender=nearby_player, last_action_at=time.monotonic() + ) + active_encounters.append(encounter) + + await cmd_power(player, "up") + + messages = [call[0][0] for call in player.writer.write.call_args_list] + assert any("combat" in msg.lower() for msg in messages) + assert player.pl == 50.0 # unchanged + + +@pytest.mark.asyncio +async def test_power_up_rejects_with_no_stamina(player): + """Test power up is rejected when stamina is 0.""" + player.pl = 50.0 + player.stamina = 0.0 + + await cmd_power(player, "up") + + messages = [call[0][0] for call in player.writer.write.call_args_list] + assert any("stamina" in msg.lower() for msg in messages) + assert player.pl == 50.0 # unchanged + + +@pytest.mark.asyncio +async def test_power_up_sends_feedback_messages(player): + """Test power up sends feedback to player.""" + player.pl = 10.0 + player.max_pl = 100.0 + player.stamina = 100.0 + + await cmd_power(player, "up") + + messages = [call[0][0] for call in player.writer.write.call_args_list] + assert any("powering up" in msg.lower() for msg in messages) + + +@pytest.mark.asyncio +async def test_power_up_broadcasts_to_nearby(player, nearby_player): + """Test power up broadcasts to nearby players.""" + player.pl = 10.0 + player.max_pl = 100.0 + player.stamina = 100.0 + + await cmd_power(player, "up") + + nearby_messages = [call[0][0] for call in nearby_player.writer.write.call_args_list] + assert any("Goku" in msg and "power" in msg.lower() for msg in nearby_messages) + + +@pytest.mark.asyncio +async def test_power_up_when_already_powering_up(player): + """Test power up when already powering up sends message.""" + player.pl = 10.0 + player.max_pl = 100.0 + player.stamina = 100.0 + + await cmd_power(player, "up") + await asyncio.sleep(0.05) + + # Try to power up again + await cmd_power(player, "up") + + messages = [call[0][0] for call in player.writer.write.call_args_list] + assert any("already" in msg.lower() for msg in messages) + + +@pytest.mark.asyncio +async def test_power_down_broadcasts_to_nearby(player, nearby_player): + """Test power down broadcasts to nearby players.""" + player.pl = 100.0 + + await cmd_power(player, "down") + + nearby_messages = [call[0][0] for call in nearby_player.writer.write.call_args_list] + assert any("Goku" in msg and "power" in msg.lower() for msg in nearby_messages) + + +@pytest.mark.asyncio +async def test_power_to_number_raises_with_loop(player): + """Test power starts power-up loop when target > current.""" + player.pl = 10.0 + player.max_pl = 100.0 + player.stamina = 100.0 + + await cmd_power(player, "80") + + # Give it a moment to run + await asyncio.sleep(0.15) + + # Should be increasing toward 80 + assert player.pl > 10.0 + assert player.stamina < 100.0 # deducting stamina