Handle state and stats WebSocket messages in server

This commit is contained in:
Jared Miller 2026-01-28 13:50:35 -05:00
parent f31d33f992
commit a93475f3ab
Signed by: shmup
GPG key ID: 22B5C6D66A38B06C
2 changed files with 203 additions and 0 deletions

View file

@ -30,6 +30,39 @@ let respondToPromptStmt: ReturnType<Database["prepare"]>;
let getPendingPromptsStmt: ReturnType<Database["prepare"]>; let getPendingPromptsStmt: ReturnType<Database["prepare"]>;
let appendOutputStmt: ReturnType<Database["prepare"]>; let appendOutputStmt: ReturnType<Database["prepare"]>;
let getSessionOutputStmt: ReturnType<Database["prepare"]>; let getSessionOutputStmt: ReturnType<Database["prepare"]>;
let updateSessionStatsStmt: ReturnType<Database["prepare"]>;
// Migration function to add new columns to existing tables
function runMigrations(): void {
// Add Phase 2.3 session state and stats columns
const migrations = [
// Stats columns
"ALTER TABLE sessions ADD COLUMN state TEXT DEFAULT 'ready'",
"ALTER TABLE sessions ADD COLUMN prompts INTEGER DEFAULT 0",
"ALTER TABLE sessions ADD COLUMN completions INTEGER DEFAULT 0",
"ALTER TABLE sessions ADD COLUMN tools INTEGER DEFAULT 0",
"ALTER TABLE sessions ADD COLUMN compressions INTEGER DEFAULT 0",
"ALTER TABLE sessions ADD COLUMN thinking_seconds INTEGER DEFAULT 0",
"ALTER TABLE sessions ADD COLUMN work_seconds INTEGER DEFAULT 0",
"ALTER TABLE sessions ADD COLUMN mode TEXT DEFAULT 'normal'",
"ALTER TABLE sessions ADD COLUMN model TEXT",
"ALTER TABLE sessions ADD COLUMN idle_since INTEGER",
// Git columns (for later phase)
"ALTER TABLE sessions ADD COLUMN git_branch TEXT",
"ALTER TABLE sessions ADD COLUMN git_files_json TEXT",
];
for (const migration of migrations) {
try {
db.exec(migration);
} catch (error: any) {
// Ignore "duplicate column" errors - column already exists
if (!error.message?.includes("duplicate column")) {
throw error;
}
}
}
}
export function initDb(path = "claude-remote.db"): Database { export function initDb(path = "claude-remote.db"): Database {
db = new Database(path); db = new Database(path);
@ -74,6 +107,9 @@ export function initDb(path = "claude-remote.db"): Database {
); );
`); `);
// Run migrations to add new columns
runMigrations();
// Prepare all statements once // Prepare all statements once
createDeviceStmt = db.prepare( createDeviceStmt = db.prepare(
"INSERT INTO devices (secret, name, created_at, last_seen) VALUES (?, ?, ?, ?) RETURNING *", "INSERT INTO devices (secret, name, created_at, last_seen) VALUES (?, ?, ?, ?) RETURNING *",
@ -106,6 +142,12 @@ export function initDb(path = "claude-remote.db"): Database {
getSessionOutputStmt = db.prepare( getSessionOutputStmt = db.prepare(
"SELECT * FROM output_log WHERE session_id = ? ORDER BY timestamp ASC", "SELECT * FROM output_log WHERE session_id = ? ORDER BY timestamp ASC",
); );
updateSessionStatsStmt = db.prepare(`
UPDATE sessions
SET state = ?, prompts = ?, completions = ?, tools = ?, compressions = ?,
thinking_seconds = ?, work_seconds = ?, mode = ?, model = ?, idle_since = ?
WHERE id = ?
`);
return db; return db;
} }
@ -226,3 +268,35 @@ export function appendOutput(sessionId: number, line: string): void {
export function getSessionOutput(sessionId: number): OutputLog[] { export function getSessionOutput(sessionId: number): OutputLog[] {
return getSessionOutputStmt.all(sessionId) as OutputLog[]; return getSessionOutputStmt.all(sessionId) as OutputLog[];
} }
// Session state functions
export function updateSessionStats(
sessionId: number,
state: {
state: string;
prompts: number;
completions: number;
tools: number;
compressions: number;
thinking_seconds: number;
work_seconds: number;
mode: string;
model: string | null;
idle_since: number | null;
},
): void {
updateSessionStatsStmt.run(
state.state,
state.prompts,
state.completions,
state.tools,
state.compressions,
state.thinking_seconds,
state.work_seconds,
state.mode,
state.model,
state.idle_since,
sessionId,
);
}

View file

@ -13,23 +13,53 @@ import {
initDb, initDb,
respondToPrompt, respondToPrompt,
updateLastSeen, updateLastSeen,
updateSessionStats,
} from "./db"; } from "./db";
import type { import type {
AnswerResponse, AnswerResponse,
ClientMessage, ClientMessage,
ServerMessage, ServerMessage,
SessionState,
SSEEvent, SSEEvent,
} from "./types"; } from "./types";
// Server state // Server state
const sseClients = new Set<ReadableStreamDefaultController<string>>(); const sseClients = new Set<ReadableStreamDefaultController<string>>();
const sessionWebSockets = new Map<number, ServerWebSocket<SessionData>>(); const sessionWebSockets = new Map<number, ServerWebSocket<SessionData>>();
const sessionStates = new Map<number, SessionState>();
interface SessionData { interface SessionData {
deviceId: number; deviceId: number;
sessionId: number | null; sessionId: number | null;
} }
// Helper function to create default SessionState
function createDefaultSessionState(): SessionState {
return {
state: "ready",
prompts: 0,
completions: 0,
tools: 0,
compressions: 0,
thinking_seconds: 0,
work_seconds: 0,
mode: "normal",
model: null,
idle_since: null,
dirty: false,
};
}
// Persist dirty sessions to database (throttled, called every 5s)
function persistDirtySessions(): void {
for (const [sessionId, state] of sessionStates.entries()) {
if (state.dirty) {
updateSessionStats(sessionId, state);
state.dirty = false;
}
}
}
// Broadcast SSE event to all connected dashboards // Broadcast SSE event to all connected dashboards
function broadcastSSE(event: SSEEvent): void { function broadcastSSE(event: SSEEvent): void {
const eventStr = `event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`; const eventStr = `event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`;
@ -337,6 +367,9 @@ const server = Bun.serve<SessionData>({
ws.data.sessionId = session.id; ws.data.sessionId = session.id;
sessionWebSockets.set(session.id, ws); sessionWebSockets.set(session.id, ws);
// Initialize in-memory session state
sessionStates.set(session.id, createDefaultSessionState());
console.debug( console.debug(
`Session ${session.id} started for device ${device.id}`, `Session ${session.id} started for device ${device.id}`,
); );
@ -349,6 +382,30 @@ const server = Bun.serve<SessionData>({
command: session.command, command: session.command,
}); });
// Broadcast initial state and stats
const initialState = sessionStates.get(session.id);
if (initialState) {
broadcastSSE({
type: "state",
session_id: session.id,
state: initialState.state,
timestamp: Date.now(),
});
broadcastSSE({
type: "stats",
session_id: session.id,
prompts: initialState.prompts,
completions: initialState.completions,
tools: initialState.tools,
compressions: initialState.compressions,
thinking_seconds: initialState.thinking_seconds,
work_seconds: initialState.work_seconds,
mode: initialState.mode,
model: initialState.model,
idle_since: initialState.idle_since,
});
}
ws.send( ws.send(
JSON.stringify({ type: "authenticated", session_id: session.id }), JSON.stringify({ type: "authenticated", session_id: session.id }),
); );
@ -387,6 +444,60 @@ const server = Bun.serve<SessionData>({
return; return;
} }
// Handle state message
if (msg.type === "state") {
const sessionState = sessionStates.get(ws.data.sessionId);
if (sessionState && msg.state) {
sessionState.state = msg.state;
sessionState.dirty = true;
// Broadcast SSE state event
broadcastSSE({
type: "state",
session_id: ws.data.sessionId,
state: msg.state,
timestamp: msg.timestamp || Date.now(),
});
}
return;
}
// Handle stats message
if (msg.type === "stats") {
const sessionState = sessionStates.get(ws.data.sessionId);
if (sessionState) {
// Update all stats fields from the message
sessionState.prompts = msg.prompts ?? sessionState.prompts;
sessionState.completions =
msg.completions ?? sessionState.completions;
sessionState.tools = msg.tools ?? sessionState.tools;
sessionState.compressions =
msg.compressions ?? sessionState.compressions;
sessionState.thinking_seconds =
msg.thinking_seconds ?? sessionState.thinking_seconds;
sessionState.work_seconds =
msg.work_seconds ?? sessionState.work_seconds;
sessionState.mode = msg.mode ?? sessionState.mode;
sessionState.model = msg.model ?? sessionState.model;
sessionState.idle_since = msg.idle_since ?? sessionState.idle_since;
sessionState.dirty = true;
// Broadcast SSE stats event
broadcastSSE({
type: "stats",
session_id: ws.data.sessionId,
prompts: sessionState.prompts,
completions: sessionState.completions,
tools: sessionState.tools,
compressions: sessionState.compressions,
thinking_seconds: sessionState.thinking_seconds,
work_seconds: sessionState.work_seconds,
mode: sessionState.mode,
model: sessionState.model,
idle_since: sessionState.idle_since,
});
}
return;
}
// Handle resize message // Handle resize message
if (msg.type === "resize") { if (msg.type === "resize") {
// Store resize info if needed (not yet implemented) // Store resize info if needed (not yet implemented)
@ -401,6 +512,13 @@ const server = Bun.serve<SessionData>({
endSession(ws.data.sessionId); endSession(ws.data.sessionId);
sessionWebSockets.delete(ws.data.sessionId); sessionWebSockets.delete(ws.data.sessionId);
// Persist final state before cleanup
const state = sessionStates.get(ws.data.sessionId);
if (state?.dirty) {
updateSessionStats(ws.data.sessionId, state);
}
sessionStates.delete(ws.data.sessionId);
console.debug( console.debug(
`Session ${ws.data.sessionId} ended with code ${msg.code}`, `Session ${ws.data.sessionId} ended with code ${msg.code}`,
); );
@ -423,6 +541,14 @@ const server = Bun.serve<SessionData>({
close(ws) { close(ws) {
if (ws.data.sessionId) { if (ws.data.sessionId) {
sessionWebSockets.delete(ws.data.sessionId); sessionWebSockets.delete(ws.data.sessionId);
// Persist final state before cleanup
const state = sessionStates.get(ws.data.sessionId);
if (state?.dirty) {
updateSessionStats(ws.data.sessionId, state);
}
sessionStates.delete(ws.data.sessionId);
// Mark session as ended if not already // Mark session as ended if not already
const sessionId = ws.data.sessionId; const sessionId = ws.data.sessionId;
endSession(sessionId); endSession(sessionId);
@ -446,4 +572,7 @@ setInterval(() => {
} }
}, 30_000); }, 30_000);
// Periodic persistence of dirty session states (every 5s)
setInterval(persistDirtySessions, 5000);
export { server }; export { server };