// Core server: HTTP + WebSocket + SSE
import type { ServerWebSocket } from "bun";
import { ansiToHtml } from "./ansi";
import { splitAnsiCarryover } from "./ansi-carryover";
import {
  appendOutput,
  createDevice,
  createPrompt,
  createSession,
  endAllSessions,
  endSession,
  getActiveSessions,
  getDeviceBySecret,
  getPrompt,
  getSession,
  initDb,
  respondToPrompt,
  updateLastSeen,
  updateSessionStats,
} from "./db";
import { createTerminal, disposeTerminal, type TerminalSession } from "./terminal";
import type {
  AnswerResponse,
  ClientMessage,
  ServerMessage,
  SessionState,
  SSEEvent,
} from "./types";

// Server state
// NOTE(review): the type arguments below were reconstructed from how each
// collection is used in this file (the originals were lost); confirm they
// match the intended declarations.

// Stream controllers of every connected SSE dashboard; events are enqueued as strings.
const sseClients = new Set<ReadableStreamDefaultController<string>>();
// Live WebSocket per session id.
const sessionWebSockets = new Map<number, ServerWebSocket<SessionData>>();
// In-memory state/stats per session id; flushed to the DB while `dirty` is set.
const sessionStates = new Map<number, SessionState>();
// Buffer for incomplete ANSI sequences per session to avoid leaking partial control codes
const ansiCarryovers = new Map<number, string>();
// Terminal emulator per session id.
const sessionTerminals = new Map<number, TerminalSession>();

/** Per-connection data attached to each WebSocket (`ws.data`). */
interface SessionData {
  deviceId: number;
  // null until the socket has sent a successful "auth" message
  sessionId: number | null;
}

/**
 * Build the initial in-memory state for a newly created session.
 *
 * @returns A fresh `SessionState` in the "ready" state with zeroed counters,
 *          "normal" mode, no model/git/idle information and `dirty: false`.
 */
function createDefaultSessionState(): SessionState {
  return {
    state: "ready",
    prompts: 0,
    completions: 0,
    tools: 0,
    compressions: 0,
    thinking_seconds: 0,
    work_seconds: 0,
    mode: "normal",
    model: null,
    idle_since: null,
    git_branch: null,
    git_files_json: null,
    dirty: false,
  };
}

/**
 * Persist dirty sessions to database (throttled, called every 5s).
 *
 * Every entry of `sessionStates` whose `dirty` flag is set is written with
 * `updateSessionStats`, after which the flag is cleared.
 */
function persistDirtySessions(): void {
  for (const [sessionId, state] of sessionStates.entries()) {
    if (state.dirty) {
      updateSessionStats(sessionId, state);
      state.dirty = false;
    }
  }
}

/**
 * Broadcast an SSE event to all connected dashboards.
 *
 * The event is framed as `event: <type>` / `data: <json>` followed by a blank
 * line. A controller whose `enqueue` throws is removed from `sseClients`.
 *
 * @param event - Event to send; `event.type` becomes the SSE event name and
 *                the whole object is JSON-serialized as the payload.
 */
function broadcastSSE(event: SSEEvent): void {
  const eventStr = `event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`;
  for (const controller of sseClients) {
    try {
      controller.enqueue(eventStr);
    } catch (error) {
      // Client disconnected, clean up
      console.debug("SSE client write error:", error);
      sseClients.delete(controller);
    }
  }
}

// Listening port: PORT from the environment, falling back to 7200
const port = Number.parseInt(process.env.PORT || "7200", 10);
// Initialize database
initDb();

// Clean up ghost sessions from previous runs
const cleanedSessions = endAllSessions();
if (cleanedSessions > 0) {
  console.log(
    `Cleaned up ${cleanedSessions} ghost session(s) from previous runs`,
  );
}

// Auto-seed device from env if configured
const envSecret = process.env.CLAUDE_REMOTE_SECRET;
if (envSecret) {
  const existing = getDeviceBySecret(envSecret);
  if (!existing) {
    const name = process.env.CLAUDE_REMOTE_DEVICE_NAME || "default";
    createDevice(envSecret, name);
    console.log(`Auto-seeded device: ${name}`);
  }
}

/**
 * Read a request body as a JSON object without throwing.
 *
 * @param req - Incoming request whose body should be parsed.
 * @returns The parsed value when it is a non-null object, otherwise `null`
 *          (malformed JSON, or a JSON primitive / `null`), so each route can
 *          answer with its own 400 response instead of an unhandled error.
 */
async function readJsonObject(
  req: Request,
): Promise<Record<string, unknown> | null> {
  try {
    const parsed: unknown = await req.json();
    if (typeof parsed === "object" && parsed !== null) {
      return parsed as Record<string, unknown>;
    }
    return null;
  } catch {
    return null;
  }
}

// Start server
//
// HTTP routes handled by `fetch`:
//   GET  /ws                          -> WebSocket upgrade (CLI connection)
//   GET  /events                      -> SSE stream for dashboards
//   GET  /api/sessions                -> active sessions
//   POST /api/sessions/:id/resize     -> forward a resize command to the CLI
//   POST /api/sessions/:id/prompts    -> create a prompt for a session
//   POST /api/prompts/:id/answer      -> unified answer endpoint
//   POST /api/prompts/:id/approve     -> legacy, sends "1\n"
//   POST /api/prompts/:id/reject      -> legacy, sends "3\n"
//   anything else                     -> static file from public/
const server = Bun.serve({
  port,
  idleTimeout: 0, // Disable timeout for long-lived SSE connections

  async fetch(req, server) {
    const url = new URL(req.url);

    // Upgrade WebSocket connections
    if (url.pathname === "/ws") {
      const upgraded = server.upgrade(req, {
        // Placeholder data; filled in once the socket sends "auth"
        data: { deviceId: 0, sessionId: null },
      });
      if (!upgraded) {
        return new Response("WebSocket upgrade failed", { status: 400 });
      }
      return undefined;
    }

    // SSE endpoint for dashboard
    if (url.pathname === "/events") {
      let ctrl: ReadableStreamDefaultController;
      const stream = new ReadableStream({
        start(controller) {
          ctrl = controller;
          sseClients.add(controller);
          // Send initial headers
          controller.enqueue(": connected\n\n");
        },
        cancel() {
          sseClients.delete(ctrl);
        },
      });

      return new Response(stream, {
        headers: {
          "Content-Type": "text/event-stream",
          "Cache-Control": "no-cache",
          Connection: "keep-alive",
        },
      });
    }

    // REST API endpoints
    if (url.pathname === "/api/sessions" && req.method === "GET") {
      const sessions = getActiveSessions();
      return Response.json(sessions);
    }

    // Resize endpoint for dashboard to resize CLI PTY
    if (
      /^\/api\/sessions\/\d+\/resize$/.test(url.pathname) &&
      req.method === "POST"
    ) {
      const parts = url.pathname.split("/");
      const sessionId = Number.parseInt(parts[3] || "", 10);
      if (Number.isNaN(sessionId)) {
        return new Response("Invalid session ID", { status: 400 });
      }

      // A malformed or non-object body yields null and falls into the 400 below
      const body = await readJsonObject(req);
      const cols = body?.cols;
      const rows = body?.rows;
      if (
        typeof cols !== "number" ||
        typeof rows !== "number" ||
        cols <= 0 ||
        rows <= 0
      ) {
        return new Response("Missing or invalid cols/rows", { status: 400 });
      }

      // Get WebSocket connection for this session
      const ws = sessionWebSockets.get(sessionId);
      if (!ws) {
        return new Response("Session WebSocket not found", { status: 404 });
      }

      // Send resize command to CLI
      const message: ServerMessage = {
        type: "resize",
        cols,
        rows,
      };
      ws.send(JSON.stringify(message));

      return Response.json({ success: true });
    }

    // Create prompt for a session
    if (
      /^\/api\/sessions\/\d+\/prompts$/.test(url.pathname) &&
      req.method === "POST"
    ) {
      const parts = url.pathname.split("/");
      const sessionId = Number.parseInt(parts[3] || "", 10);
      if (Number.isNaN(sessionId)) {
        return new Response("Invalid session ID", { status: 400 });
      }

      const session = getSession(sessionId);
      if (!session) {
        return new Response("Session not found", { status: 404 });
      }

      const body = await readJsonObject(req);
      const promptText = body?.prompt_text;
      if (!promptText || typeof promptText !== "string") {
        return new Response("Missing or invalid prompt_text", { status: 400 });
      }

      const prompt = createPrompt(sessionId, promptText);

      // Broadcast to dashboards
      broadcastSSE({
        type: "prompt",
        prompt_id: prompt.id,
        session_id: sessionId,
        prompt_text: prompt.prompt_text,
        prompt_json: prompt.prompt_json
          ? JSON.stringify(prompt.prompt_json)
          : null,
      });

      return Response.json(prompt);
    }

    if (url.pathname.startsWith("/api/prompts/")) {
      const parts = url.pathname.split("/");
      const promptId = Number.parseInt(parts[3] || "", 10);
      if (Number.isNaN(promptId)) {
        return new Response("Invalid prompt ID", { status: 400 });
      }

      // New unified answer endpoint
      if (url.pathname.endsWith("/answer") && req.method === "POST") {
        const prompt = getPrompt(promptId);
        if (!prompt) {
          return new Response("Prompt not found", { status: 404 });
        }
        if (prompt.response !== null) {
          return new Response("Prompt already responded", { status: 400 });
        }

        const body = await readJsonObject(req);
        const rawResponse = body?.response;
        if (!rawResponse || typeof rawResponse !== "object") {
          return new Response("Missing or invalid response", { status: 400 });
        }
        const answer = rawResponse as AnswerResponse;

        // Get WebSocket connection
        const ws = sessionWebSockets.get(prompt.session_id);
        if (!ws) {
          return new Response("Session WebSocket not found", { status: 404 });
        }

        const promptData = prompt.prompt_json;

        // Determine response value for DB and broadcast.
        // All validation happens here, BEFORE the prompt is marked as
        // responded, so a rejected request never consumes the prompt.
        let responseValue: string;
        if (answer.type === "option") {
          responseValue = answer.value;
        } else if (answer.type === "text") {
          responseValue = "custom";
        } else if (answer.type === "tab_instructions") {
          if (!promptData || promptData.prompt_type !== "permission") {
            return new Response(
              "Tab instructions only valid for permission prompts",
              { status: 400 },
            );
          }
          responseValue = `tab:${answer.selected_option}`;
        } else {
          return new Response("Invalid response type", { status: 400 });
        }

        // Mark as responded in DB (once)
        respondToPrompt(promptId, responseValue);

        // Broadcast to dashboards (once)
        broadcastSSE({
          type: "prompt_response",
          prompt_id: promptId,
          response: responseValue,
        });

        // Handle WebSocket input based on type
        if (answer.type === "option" || answer.type === "text") {
          // Synchronous send for simple responses
          const inputData = `${answer.value}\n`;
          const message: ServerMessage = { type: "input", data: inputData };
          ws.send(JSON.stringify(message));
        } else if (
          answer.type === "tab_instructions" &&
          promptData?.prompt_type === "permission" // already validated above; re-checked for narrowing
        ) {
          const currentOption = promptData.selected_option;
          const targetOption = answer.selected_option;
          const instruction = answer.instruction;

          // Execute key sequence asynchronously with delays; the HTTP
          // response below is returned without waiting for it to finish.
          (async () => {
            const sendInput = (data: string) => {
              const message: ServerMessage = { type: "input", data };
              ws.send(JSON.stringify(message));
            };

            // Navigate to target option: ESC[B (down) / ESC[A (up)
            const diff = targetOption - currentOption;
            const arrow = diff > 0 ? "\x1b[B" : "\x1b[A";
            for (let i = 0; i < Math.abs(diff); i++) {
              sendInput(arrow);
              await Bun.sleep(50); // Small delay between arrows
            }

            await Bun.sleep(100);
            sendInput("\t"); // Tab key
            await Bun.sleep(100);
            sendInput(instruction); // Instruction text
            sendInput("\n"); // Enter key
          })().catch((err) => {
            console.error("Error executing tab instruction sequence:", err);
          });
        }

        return Response.json({ success: true });
      }

      // Legacy approve endpoint (backward compatibility)
      if (url.pathname.endsWith("/approve") && req.method === "POST") {
        const prompt = getPrompt(promptId);
        if (!prompt) {
          return new Response("Prompt not found", { status: 404 });
        }
        if (prompt.response !== null) {
          return new Response("Prompt already responded", { status: 400 });
        }

        respondToPrompt(promptId, "approve");

        // Notify CLI via WebSocket (using "1\n" for option 1)
        const ws = sessionWebSockets.get(prompt.session_id);
        if (ws) {
          const message: ServerMessage = { type: "input", data: "1\n" };
          ws.send(JSON.stringify(message));
        }

        // Broadcast to dashboards
        broadcastSSE({
          type: "prompt_response",
          prompt_id: promptId,
          response: "approve",
        });

        return Response.json({ success: true });
      }

      // Legacy reject endpoint (backward compatibility)
      if (url.pathname.endsWith("/reject") && req.method === "POST") {
        const prompt = getPrompt(promptId);
        if (!prompt) {
          return new Response("Prompt not found", { status: 404 });
        }
        if (prompt.response !== null) {
          return new Response("Prompt already responded", { status: 400 });
        }

        respondToPrompt(promptId, "reject");

        // Notify CLI via WebSocket (using "3\n" for option 3)
        const ws = sessionWebSockets.get(prompt.session_id);
        if (ws) {
          const message: ServerMessage = { type: "input", data: "3\n" };
          ws.send(JSON.stringify(message));
        }

        // Broadcast to dashboards
        broadcastSSE({
          type: "prompt_response",
          prompt_id: promptId,
          response: "reject",
        });

        return Response.json({ success: true });
      }
    }

    // Serve static files from public/
    if (url.pathname === "/") {
      return new Response(Bun.file("public/index.html"));
    }

    // Handle other static files
    const filePath = `public${url.pathname}`;
    const file = Bun.file(filePath);
    if (await file.exists()) {
      return new Response(file);
    }

    return new Response("Not found", { status: 404 });
  },

  websocket: {
    open(_ws) {
      console.debug("WebSocket connected, awaiting auth");
    },

    async message(ws, message) {
      try {
        const msg: ClientMessage = JSON.parse(message.toString());

        // Handle auth message
        if (msg.type === "auth") {
          const device = getDeviceBySecret(msg.secret);
          if (!device) {
            ws.close(1008, "Invalid secret");
            return;
          }

          updateLastSeen(device.id);
          ws.data.deviceId = device.id;

          // Create new session
          const session = createSession(
            device.id,
            msg.cwd || null,
            msg.command || null,
          );
          ws.data.sessionId = session.id;
          sessionWebSockets.set(session.id, ws);

          // Initialize in-memory session state
          sessionStates.set(session.id, createDefaultSessionState());

          // Create terminal emulator with default size (will be resized later)
          const termSession = createTerminal(80, 24);
          sessionTerminals.set(session.id, termSession);

          console.debug(
            `Session ${session.id} started for device ${device.id}`,
          );

          // Broadcast session start
          broadcastSSE({
            type: "session_start",
            session_id: session.id,
            cwd: session.cwd,
            command: session.command,
          });

          // Broadcast initial state and stats
          const initialState = sessionStates.get(session.id);
          if (initialState) {
            broadcastSSE({
              type: "state",
              session_id: session.id,
              state: initialState.state,
              timestamp: Date.now(),
            });
            broadcastSSE({
              type: "stats",
              session_id: session.id,
              prompts: initialState.prompts,
              completions: initialState.completions,
              tools: initialState.tools,
              compressions: initialState.compressions,
              thinking_seconds: initialState.thinking_seconds,
              work_seconds: initialState.work_seconds,
              mode: initialState.mode,
              model: initialState.model,
              idle_since: initialState.idle_since,
            });
          }

          ws.send(
            JSON.stringify({ type: "authenticated", session_id: session.id }),
          );
          return;
        }

        // All other messages require authentication
        if (!ws.data.sessionId) {
          ws.close(1008, "Not authenticated");
          return;
        }

        // Handle output message
        if (msg.type === "output") {
          const sessionId = ws.data.sessionId;

          // Join with any saved carryover from previous chunk
          const prevCarry = ansiCarryovers.get(sessionId) || "";
          const combined = prevCarry ? prevCarry + msg.data : msg.data;

          // Determine if new tail is an incomplete control sequence and split
          const [body, carry] = splitAnsiCarryover(combined);
          if (carry) {
            console.debug(
              `Session ${sessionId}: ANSI carryover detected (${carry.length} bytes)`,
            );
            ansiCarryovers.set(sessionId, carry);
          } else if (prevCarry) {
            console.debug(`Session ${sessionId}: ANSI carryover resolved`);
            // Clear carry if previously set and now resolved
            ansiCarryovers.delete(sessionId);
          }

          appendOutput(sessionId, body); // Store raw ANSI without trailing incomplete fragment

          // Write to terminal emulator (receives the raw chunk, not the split body)
          const termSession = sessionTerminals.get(sessionId);
          if (termSession) {
            termSession.terminal.write(msg.data);
          }

          broadcastSSE({
            type: "output",
            session_id: sessionId,
            data: ansiToHtml(body), // Parse for display
          });
          return;
        }

        // Handle prompt message
        if (msg.type === "prompt") {
          // Use the session this socket authenticated as rather than the
          // client-supplied msg.session_id, so a connection cannot create
          // prompts on behalf of another session.
          const sessionId = ws.data.sessionId;
          const prompt = createPrompt(sessionId, msg.prompt_text);
          broadcastSSE({
            type: "prompt",
            prompt_id: prompt.id,
            session_id: sessionId,
            prompt_text: msg.prompt_text,
            prompt_json: prompt.prompt_json
              ? JSON.stringify(prompt.prompt_json)
              : null,
          });
          return;
        }

        // Handle state message
        if (msg.type === "state") {
          const sessionState = sessionStates.get(ws.data.sessionId);
          const validStates = [
            "ready",
            "thinking",
            "permission",
            "question",
            "complete",
            "interrupted",
          ];
          // Unknown state names are ignored silently
          if (sessionState && msg.state && validStates.includes(msg.state)) {
            sessionState.state = msg.state as SessionState["state"];
            sessionState.dirty = true;

            // Broadcast SSE state event
            broadcastSSE({
              type: "state",
              session_id: ws.data.sessionId,
              state: msg.state,
              timestamp: msg.timestamp || Date.now(),
            });
          }
          return;
        }

        // Handle stats message
        if (msg.type === "stats") {
          const sessionState = sessionStates.get(ws.data.sessionId);
          if (sessionState) {
            // Update all stats fields from the message; fields absent from
            // the message keep their current value
            sessionState.prompts = msg.prompts ?? sessionState.prompts;
            sessionState.completions =
              msg.completions ?? sessionState.completions;
            sessionState.tools = msg.tools ?? sessionState.tools;
            sessionState.compressions =
              msg.compressions ?? sessionState.compressions;
            sessionState.thinking_seconds =
              msg.thinking_seconds ?? sessionState.thinking_seconds;
            sessionState.work_seconds =
              msg.work_seconds ?? sessionState.work_seconds;
            sessionState.mode = msg.mode ?? sessionState.mode;
            sessionState.model = msg.model ?? sessionState.model;
            sessionState.idle_since =
              msg.idle_since ?? sessionState.idle_since;
            sessionState.dirty = true;

            // Broadcast SSE stats event
            broadcastSSE({
              type: "stats",
              session_id: ws.data.sessionId,
              prompts: sessionState.prompts,
              completions: sessionState.completions,
              tools: sessionState.tools,
              compressions: sessionState.compressions,
              thinking_seconds: sessionState.thinking_seconds,
              work_seconds: sessionState.work_seconds,
              mode: sessionState.mode,
              model: sessionState.model,
              idle_since: sessionState.idle_since,
            });
          }
          return;
        }

        // Handle git message
        if (msg.type === "git") {
          const sessionState = sessionStates.get(ws.data.sessionId);
          if (sessionState) {
            sessionState.git_branch = msg.branch;
            sessionState.git_files_json = JSON.stringify(msg.files);
            sessionState.dirty = true;

            // Broadcast SSE git event
            broadcastSSE({
              type: "git",
              session_id: ws.data.sessionId,
              branch: msg.branch,
              files_json: JSON.stringify(msg.files),
            });
          }
          return;
        }

        // Handle resize message
        if (msg.type === "resize") {
          // Store resize info if needed (not yet implemented)
          console.debug(
            `Session ${ws.data.sessionId} resized to ${msg.cols}x${msg.rows}`,
          );

          // Resize terminal emulator
          const termSession = sessionTerminals.get(ws.data.sessionId);
          if (termSession) {
            termSession.terminal.resize(msg.cols, msg.rows);
          }
          return;
        }

        // Handle exit message
        if (msg.type === "exit") {
          endSession(ws.data.sessionId);
          sessionWebSockets.delete(ws.data.sessionId);

          // Persist final state before cleanup
          const state = sessionStates.get(ws.data.sessionId);
          if (state?.dirty) {
            updateSessionStats(ws.data.sessionId, state);
          }
          sessionStates.delete(ws.data.sessionId);

          console.debug(
            `Session ${ws.data.sessionId} ended with code ${msg.code}`,
          );

          broadcastSSE({
            type: "session_end",
            session_id: ws.data.sessionId,
            exit_code: msg.code,
          });

          // The carryover buffer and terminal emulator are released in close()
          ws.close(1000, "Session ended");
          return;
        }
      } catch (error) {
        // Any failure while parsing or handling the message closes the socket
        console.error("WebSocket message error:", error);
        ws.close(1008, "Invalid message format");
      }
    },

    close(ws) {
      if (ws.data.sessionId) {
        sessionWebSockets.delete(ws.data.sessionId);
        ansiCarryovers.delete(ws.data.sessionId);

        // Dispose terminal emulator
        const termSession = sessionTerminals.get(ws.data.sessionId);
        if (termSession) {
          disposeTerminal(termSession);
          sessionTerminals.delete(ws.data.sessionId);
        }

        // Persist final state before cleanup
        const state = sessionStates.get(ws.data.sessionId);
        if (state?.dirty) {
          updateSessionStats(ws.data.sessionId, state);
        }
        sessionStates.delete(ws.data.sessionId);

        // Mark session as ended if not already
        const sessionId = ws.data.sessionId;
        endSession(sessionId);
        console.debug(`WebSocket closed for session ${sessionId}`);
      }
    },
  },
});

console.log(`Server listening on http://localhost:${port}`);

// Periodic ping to keep WebSocket connections alive (every 30s)
setInterval(() => {
  for (const ws of sessionWebSockets.values()) {
    try {
      const ping: ServerMessage = { type: "ping" };
      ws.send(JSON.stringify(ping));
    } catch (error) {
      console.debug("Ping error:", error);
    }
  }
}, 30_000);

// Periodic persistence of dirty session states (every 5s)
setInterval(persistDirtySessions, 5000);

export { server };