Increment player.kills and player.mob_kills on mob defeat, player.deaths on player defeat. Session time accumulation via accumulate_play_time helper.
488 lines
13 KiB
Python
488 lines
13 KiB
Python
"""SQLite persistence for player accounts and state."""
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import TypedDict
|
|
|
|
from mudlib.player import Player
|
|
from mudlib.thing import Thing
|
|
|
|
|
|
class PlayerData(TypedDict):
|
|
"""Shape of persisted player data from the database."""
|
|
|
|
x: int
|
|
y: int
|
|
pl: float
|
|
stamina: float
|
|
max_stamina: float
|
|
flying: bool
|
|
zone_name: str
|
|
inventory: list[str]
|
|
|
|
|
|
class StatsData(TypedDict):
|
|
"""Shape of persisted stats data from the database."""
|
|
|
|
kills: int
|
|
deaths: int
|
|
mob_kills: dict[str, int]
|
|
play_time_seconds: float
|
|
unlocked_moves: set[str]
|
|
|
|
|
|
# Module-level database path
|
|
_db_path: str | None = None
|
|
|
|
|
|
def init_db(db_path: str | Path) -> None:
|
|
"""Initialize the database and create tables if needed.
|
|
|
|
Args:
|
|
db_path: Path to the SQLite database file
|
|
"""
|
|
global _db_path
|
|
_db_path = str(db_path)
|
|
|
|
# Ensure parent directory exists
|
|
Path(_db_path).parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
conn = sqlite3.connect(_db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS accounts (
|
|
name TEXT PRIMARY KEY COLLATE NOCASE,
|
|
password_hash TEXT NOT NULL,
|
|
salt TEXT NOT NULL,
|
|
x INTEGER NOT NULL DEFAULT 0,
|
|
y INTEGER NOT NULL DEFAULT 0,
|
|
pl REAL NOT NULL DEFAULT 100.0,
|
|
stamina REAL NOT NULL DEFAULT 100.0,
|
|
max_stamina REAL NOT NULL DEFAULT 100.0,
|
|
flying INTEGER NOT NULL DEFAULT 0,
|
|
zone_name TEXT NOT NULL DEFAULT 'overworld',
|
|
inventory TEXT NOT NULL DEFAULT '[]',
|
|
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
|
last_login TEXT
|
|
)
|
|
""")
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS player_aliases (
|
|
player_name TEXT NOT NULL COLLATE NOCASE,
|
|
alias TEXT NOT NULL COLLATE NOCASE,
|
|
expansion TEXT NOT NULL,
|
|
PRIMARY KEY (player_name, alias)
|
|
)
|
|
""")
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS player_stats (
|
|
player_name TEXT PRIMARY KEY COLLATE NOCASE,
|
|
kills INTEGER NOT NULL DEFAULT 0,
|
|
deaths INTEGER NOT NULL DEFAULT 0,
|
|
mob_kills TEXT NOT NULL DEFAULT '{}',
|
|
play_time_seconds REAL NOT NULL DEFAULT 0.0,
|
|
unlocked_moves TEXT NOT NULL DEFAULT '[]'
|
|
)
|
|
""")
|
|
|
|
# Migrations: add columns if they don't exist (old schemas)
|
|
cursor.execute("PRAGMA table_info(accounts)")
|
|
columns = [row[1] for row in cursor.fetchall()]
|
|
if "zone_name" not in columns:
|
|
cursor.execute(
|
|
"ALTER TABLE accounts "
|
|
"ADD COLUMN zone_name TEXT NOT NULL DEFAULT 'overworld'"
|
|
)
|
|
if "inventory" not in columns:
|
|
cursor.execute(
|
|
"ALTER TABLE accounts ADD COLUMN inventory TEXT NOT NULL DEFAULT '[]'"
|
|
)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def _get_connection() -> sqlite3.Connection:
|
|
"""Get a connection to the database.
|
|
|
|
Returns:
|
|
sqlite3.Connection
|
|
|
|
Raises:
|
|
RuntimeError: If init_db has not been called
|
|
"""
|
|
if _db_path is None:
|
|
raise RuntimeError("Database not initialized. Call init_db() first.")
|
|
return sqlite3.connect(_db_path)
|
|
|
|
|
|
def _hash_password(password: str, salt: bytes) -> str:
|
|
"""Hash a password with the given salt.
|
|
|
|
Args:
|
|
password: The password to hash
|
|
salt: Salt bytes
|
|
|
|
Returns:
|
|
Hex-encoded hash
|
|
"""
|
|
return hashlib.pbkdf2_hmac(
|
|
"sha256", password.encode("utf-8"), salt, iterations=100000
|
|
).hex()
|
|
|
|
|
|
def create_account(name: str, password: str) -> bool:
|
|
"""Create a new account with the given name and password.
|
|
|
|
Args:
|
|
name: Account name (case-insensitive)
|
|
password: Password to hash and store
|
|
|
|
Returns:
|
|
True if account was created, False if name is already taken
|
|
"""
|
|
if account_exists(name):
|
|
return False
|
|
|
|
# Generate random salt
|
|
salt = os.urandom(32)
|
|
password_hash = _hash_password(password, salt)
|
|
|
|
conn = _get_connection()
|
|
cursor = conn.cursor()
|
|
|
|
try:
|
|
cursor.execute(
|
|
"INSERT INTO accounts (name, password_hash, salt) VALUES (?, ?, ?)",
|
|
(name, password_hash, salt.hex()),
|
|
)
|
|
conn.commit()
|
|
return True
|
|
except sqlite3.IntegrityError:
|
|
# Race condition: someone created the account between our check and insert
|
|
return False
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def account_exists(name: str) -> bool:
|
|
"""Check if an account exists.
|
|
|
|
Args:
|
|
name: Account name (case-insensitive)
|
|
|
|
Returns:
|
|
True if account exists, False otherwise
|
|
"""
|
|
conn = _get_connection()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("SELECT 1 FROM accounts WHERE name = ? LIMIT 1", (name,))
|
|
result = cursor.fetchone()
|
|
|
|
conn.close()
|
|
return result is not None
|
|
|
|
|
|
def authenticate(name: str, password: str) -> bool:
|
|
"""Verify a password for the given account.
|
|
|
|
Args:
|
|
name: Account name (case-insensitive)
|
|
password: Password to verify
|
|
|
|
Returns:
|
|
True if password is correct, False otherwise
|
|
"""
|
|
conn = _get_connection()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("SELECT password_hash, salt FROM accounts WHERE name = ?", (name,))
|
|
result = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if result is None:
|
|
return False
|
|
|
|
stored_hash, salt_hex = result
|
|
salt = bytes.fromhex(salt_hex)
|
|
password_hash = _hash_password(password, salt)
|
|
|
|
return hmac.compare_digest(password_hash, stored_hash)
|
|
|
|
|
|
def save_player(player: Player) -> None:
|
|
"""Save player state to the database.
|
|
|
|
The account must already exist. This updates game state only.
|
|
|
|
Args:
|
|
player: Player instance to save
|
|
"""
|
|
# Accumulate play time before saving
|
|
from mudlib.player import accumulate_play_time
|
|
|
|
accumulate_play_time(player)
|
|
|
|
# Serialize inventory as JSON list of thing names
|
|
inventory_names = [obj.name for obj in player.contents if isinstance(obj, Thing)]
|
|
inventory_json = json.dumps(inventory_names)
|
|
|
|
conn = _get_connection()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute(
|
|
"""
|
|
UPDATE accounts
|
|
SET x = ?, y = ?, pl = ?, stamina = ?, max_stamina = ?, flying = ?,
|
|
zone_name = ?, inventory = ?
|
|
WHERE name = ?
|
|
""",
|
|
(
|
|
player.x,
|
|
player.y,
|
|
player.pl,
|
|
player.stamina,
|
|
player.max_stamina,
|
|
1 if player.flying else 0,
|
|
player.location.name if player.location else "overworld",
|
|
inventory_json,
|
|
player.name,
|
|
),
|
|
)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# Save aliases and stats
|
|
save_aliases(player.name, player.aliases)
|
|
save_player_stats(player)
|
|
|
|
|
|
def load_player_data(name: str) -> PlayerData | None:
|
|
"""Load player data from the database.
|
|
|
|
Args:
|
|
name: Account name (case-insensitive)
|
|
|
|
Returns:
|
|
Dictionary of persisted fields, or None if account not found
|
|
"""
|
|
conn = _get_connection()
|
|
cursor = conn.cursor()
|
|
|
|
# Check which columns exist (for migration compatibility)
|
|
cursor.execute("PRAGMA table_info(accounts)")
|
|
columns = [row[1] for row in cursor.fetchall()]
|
|
has_zone_name = "zone_name" in columns
|
|
has_inventory = "inventory" in columns
|
|
|
|
# Build SELECT based on available columns
|
|
select_cols = "x, y, pl, stamina, max_stamina, flying"
|
|
if has_zone_name:
|
|
select_cols += ", zone_name"
|
|
if has_inventory:
|
|
select_cols += ", inventory"
|
|
|
|
cursor.execute(
|
|
f"SELECT {select_cols} FROM accounts WHERE name = ?",
|
|
(name,),
|
|
)
|
|
|
|
result = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if result is None:
|
|
return None
|
|
|
|
# Unpack base fields
|
|
x, y, pl, stamina, max_stamina, flying_int = result[:6]
|
|
idx = 6
|
|
|
|
zone_name = "overworld"
|
|
if has_zone_name:
|
|
zone_name = result[idx]
|
|
idx += 1
|
|
|
|
inventory: list[str] = []
|
|
if has_inventory:
|
|
inventory = json.loads(result[idx])
|
|
|
|
return {
|
|
"x": x,
|
|
"y": y,
|
|
"pl": pl,
|
|
"stamina": stamina,
|
|
"max_stamina": max_stamina,
|
|
"flying": bool(flying_int),
|
|
"zone_name": zone_name,
|
|
"inventory": inventory,
|
|
}
|
|
|
|
|
|
def update_last_login(name: str) -> None:
|
|
"""Update the last_login timestamp for an account.
|
|
|
|
Args:
|
|
name: Account name (case-insensitive)
|
|
"""
|
|
conn = _get_connection()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute(
|
|
"UPDATE accounts SET last_login = datetime('now') WHERE name = ?", (name,)
|
|
)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def save_aliases(
|
|
name: str, aliases: dict[str, str], db_path: str | Path | None = None
|
|
) -> None:
|
|
"""Save player aliases to the database.
|
|
|
|
Replaces all existing aliases for the player.
|
|
|
|
Args:
|
|
name: Player name (case-insensitive)
|
|
aliases: Dictionary mapping alias names to expansions
|
|
db_path: Optional explicit database path (for testing)
|
|
"""
|
|
conn = sqlite3.connect(str(db_path)) if db_path is not None else _get_connection()
|
|
|
|
cursor = conn.cursor()
|
|
|
|
# Delete all existing aliases for this player
|
|
cursor.execute("DELETE FROM player_aliases WHERE player_name = ?", (name,))
|
|
|
|
# Insert new aliases
|
|
for alias, expansion in aliases.items():
|
|
cursor.execute(
|
|
"INSERT INTO player_aliases (player_name, alias, expansion) "
|
|
"VALUES (?, ?, ?)",
|
|
(name, alias, expansion),
|
|
)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def load_aliases(name: str, db_path: str | Path | None = None) -> dict[str, str]:
|
|
"""Load player aliases from the database.
|
|
|
|
Args:
|
|
name: Player name (case-insensitive)
|
|
db_path: Optional explicit database path (for testing)
|
|
|
|
Returns:
|
|
Dictionary mapping alias names to expansions
|
|
"""
|
|
conn = sqlite3.connect(str(db_path)) if db_path is not None else _get_connection()
|
|
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute(
|
|
"SELECT alias, expansion FROM player_aliases WHERE player_name = ?",
|
|
(name,),
|
|
)
|
|
|
|
result = {alias: expansion for alias, expansion in cursor.fetchall()}
|
|
|
|
conn.close()
|
|
return result
|
|
|
|
|
|
def save_player_stats(player: Player, db_path: str | Path | None = None) -> None:
|
|
"""Save player stats to the database.
|
|
|
|
Args:
|
|
player: Player instance with stats to save
|
|
db_path: Optional explicit database path (for testing)
|
|
"""
|
|
conn = sqlite3.connect(str(db_path)) if db_path is not None else _get_connection()
|
|
|
|
cursor = conn.cursor()
|
|
|
|
# Serialize mob_kills as JSON dict and unlocked_moves as sorted JSON list
|
|
mob_kills_json = json.dumps(player.mob_kills)
|
|
unlocked_moves_json = json.dumps(sorted(player.unlocked_moves))
|
|
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO player_stats
|
|
(player_name, kills, deaths, mob_kills, play_time_seconds, unlocked_moves)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(player_name) DO UPDATE SET
|
|
kills = excluded.kills,
|
|
deaths = excluded.deaths,
|
|
mob_kills = excluded.mob_kills,
|
|
play_time_seconds = excluded.play_time_seconds,
|
|
unlocked_moves = excluded.unlocked_moves
|
|
""",
|
|
(
|
|
player.name,
|
|
player.kills,
|
|
player.deaths,
|
|
mob_kills_json,
|
|
player.play_time_seconds,
|
|
unlocked_moves_json,
|
|
),
|
|
)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def load_player_stats(name: str, db_path: str | Path | None = None) -> StatsData:
|
|
"""Load player stats from the database.
|
|
|
|
Args:
|
|
name: Player name (case-insensitive)
|
|
db_path: Optional explicit database path (for testing)
|
|
|
|
Returns:
|
|
Dictionary with stats fields, defaults if no row exists
|
|
"""
|
|
conn = sqlite3.connect(str(db_path)) if db_path is not None else _get_connection()
|
|
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute(
|
|
"""
|
|
SELECT kills, deaths, mob_kills, play_time_seconds, unlocked_moves
|
|
FROM player_stats
|
|
WHERE player_name = ?
|
|
""",
|
|
(name,),
|
|
)
|
|
|
|
result = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if result is None:
|
|
# Return defaults if no stats row exists
|
|
return {
|
|
"kills": 0,
|
|
"deaths": 0,
|
|
"mob_kills": {},
|
|
"play_time_seconds": 0.0,
|
|
"unlocked_moves": set(),
|
|
}
|
|
|
|
kills, deaths, mob_kills_json, play_time_seconds, unlocked_moves_json = result
|
|
|
|
return {
|
|
"kills": kills,
|
|
"deaths": deaths,
|
|
"mob_kills": json.loads(mob_kills_json),
|
|
"play_time_seconds": play_time_seconds,
|
|
"unlocked_moves": set(json.loads(unlocked_moves_json)),
|
|
}
|