// Core server: HTTP + WebSocket + SSE
import type { ServerWebSocket } from "bun";
import {
  appendOutput,
  createSession,
  endSession,
  getActiveSessions,
  getDeviceBySecret,
  getPrompt,
  initDb,
  respondToPrompt,
  updateLastSeen,
} from "./db";
import type { ClientMessage, ServerMessage, SSEEvent } from "./types";

/** Per-connection state attached to every CLI WebSocket at upgrade time. */
interface SessionData {
  /** 0 until the socket has sent a successful `auth` message. */
  deviceId: number;
  /** null until authenticated, and again after the session has been ended. */
  sessionId: number | null;
}

/** The two decisions a dashboard can take on a pending prompt. */
type PromptDecision = "approve" | "reject";

/** Keystrokes forwarded to the CLI for each prompt decision. */
const PROMPT_DECISION_INPUT: Record<PromptDecision, string> = {
  approve: "y\n",
  reject: "n\n",
};

// Server state
/** Stream controllers of every dashboard currently subscribed to /events. */
const sseClients = new Set<ReadableStreamDefaultController<string>>();
/** Authenticated CLI sockets, keyed by the session id they own. */
const sessionWebSockets = new Map<number, ServerWebSocket<SessionData>>();

/**
 * Broadcast an SSE event to all connected dashboards.
 *
 * A controller whose `enqueue` throws is removed from the client set so that
 * it is not retried on every subsequent broadcast.
 *
 * @param event - Event to serialize; `event.type` becomes the SSE event name.
 */
function broadcastSSE(event: SSEEvent): void {
  const eventStr = `event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`;
  for (const controller of sseClients) {
    try {
      controller.enqueue(eventStr);
    } catch (error) {
      // Deleting the current entry while iterating a Set is safe.
      sseClients.delete(controller);
      console.debug("SSE client write error:", error);
    }
  }
}

/** Send a `{ type: "error" }` frame with the given message to a CLI socket. */
function sendError(ws: ServerWebSocket<SessionData>, message: string): void {
  ws.send(JSON.stringify({ type: "error", message }));
}

/** Type guard for the action segment of `/api/prompts/:id/:action`. */
function isPromptDecision(value: string | undefined): value is PromptDecision {
  return value === "approve" || value === "reject";
}

/**
 * Record a dashboard decision for a prompt, forward the matching keystroke to
 * the owning CLI socket (if one is registered) and notify all dashboards.
 *
 * @param promptId - Id of the prompt being answered.
 * @param decision - The decision taken by the dashboard user.
 * @returns 404 if the prompt does not exist, 400 if it already has a
 *   response, otherwise a JSON `{ success: true }` response.
 */
function handlePromptDecision(
  promptId: number,
  decision: PromptDecision,
): Response {
  const prompt = getPrompt(promptId);
  if (!prompt) {
    return new Response("Prompt not found", { status: 404 });
  }
  if (prompt.response !== null) {
    return new Response("Prompt already responded", { status: 400 });
  }

  respondToPrompt(promptId, decision);

  // Notify CLI via WebSocket
  const ws = sessionWebSockets.get(prompt.session_id);
  if (ws) {
    const message: ServerMessage = {
      type: "input",
      data: PROMPT_DECISION_INPUT[decision],
    };
    ws.send(JSON.stringify(message));
  }

  // Broadcast to dashboards
  broadcastSSE({
    type: "prompt_response",
    prompt_id: promptId,
    response: decision,
  });

  return Response.json({ success: true });
}

const port = Number.parseInt(process.env.PORT || "3000", 10);

// Initialize database
initDb();

// Start server
const server = Bun.serve<SessionData>({
  port,

  /**
   * HTTP entry point: WebSocket upgrade (/ws), SSE stream (/events), REST API
   * (/api/...), then static files from public/.
   */
  async fetch(req, server) {
    const url = new URL(req.url);

    // Upgrade WebSocket connections
    if (url.pathname === "/ws") {
      const upgraded = server.upgrade(req, {
        data: { deviceId: 0, sessionId: null },
      });
      if (!upgraded) {
        return new Response("WebSocket upgrade failed", { status: 400 });
      }
      return undefined;
    }

    // SSE endpoint for dashboard
    if (url.pathname === "/events") {
      // `cancel()` is invoked with the underlying-source object as `this`, not
      // the controller, so the controller is captured here for later removal.
      let client: ReadableStreamDefaultController<string> | undefined;
      const stream = new ReadableStream<string>({
        start(controller) {
          client = controller;
          sseClients.add(controller);
          // Initial SSE comment line, sent as soon as the stream opens
          controller.enqueue(": connected\n\n");
        },
        cancel() {
          if (client) {
            sseClients.delete(client);
          }
        },
      });

      return new Response(stream, {
        headers: {
          "Content-Type": "text/event-stream",
          "Cache-Control": "no-cache",
          Connection: "keep-alive",
        },
      });
    }

    // REST API endpoints
    if (url.pathname === "/api/sessions" && req.method === "GET") {
      const sessions = getActiveSessions();
      return Response.json(sessions);
    }

    if (url.pathname.startsWith("/api/prompts/")) {
      // Expected shape: /api/prompts/<id>/<approve|reject>
      const [, , , rawId = "", action, ...extra] = url.pathname.split("/");
      const promptId = Number.parseInt(rawId, 10);
      // Require an all-digit id: parseInt alone would accept e.g. "12abc".
      if (!/^\d+$/.test(rawId) || !Number.isSafeInteger(promptId)) {
        return new Response("Invalid prompt ID", { status: 400 });
      }

      if (
        extra.length === 0 &&
        isPromptDecision(action) &&
        req.method === "POST"
      ) {
        return handlePromptDecision(promptId, action);
      }
      // Anything else under /api/prompts/ falls through to the static handler.
    }

    // Serve static files from public/
    if (url.pathname === "/") {
      return new Response(Bun.file("public/index.html"));
    }

    // Handle other static files
    const filePath = `public${url.pathname}`;
    const file = Bun.file(filePath);
    if (await file.exists()) {
      return new Response(file);
    }

    return new Response("Not found", { status: 404 });
  },

  websocket: {
    /** A socket has connected; it owns no session until it sends `auth`. */
    open(_ws) {
      console.debug("WebSocket connected, awaiting auth");
    },

    /**
     * Handle one JSON frame from a CLI client. `auth` creates a session; the
     * remaining message types (`output`, `resize`, `exit`) require one.
     * Any exception raised while handling the frame is logged and answered
     * with an "Invalid message format" error frame.
     */
    async message(ws, message) {
      try {
        const msg: ClientMessage = JSON.parse(message.toString());

        // Handle auth message
        if (msg.type === "auth") {
          // A second auth would overwrite sessionId and orphan the session
          // that is already registered for this socket.
          if (ws.data.sessionId !== null) {
            sendError(ws, "Already authenticated");
            return;
          }

          // The payload is untrusted JSON: only look up string secrets.
          const device =
            typeof msg.secret === "string"
              ? getDeviceBySecret(msg.secret)
              : null;
          if (!device) {
            sendError(ws, "Invalid secret");
            ws.close(1008, "Invalid secret");
            return;
          }

          updateLastSeen(device.id);
          ws.data.deviceId = device.id;

          // Create new session
          const session = createSession(
            device.id,
            msg.cwd || null,
            msg.command || null,
          );
          ws.data.sessionId = session.id;
          sessionWebSockets.set(session.id, ws);

          console.debug(
            `Session ${session.id} started for device ${device.id}`,
          );

          // Broadcast session start
          broadcastSSE({
            type: "session_start",
            session_id: session.id,
            cwd: session.cwd,
            command: session.command,
          });

          ws.send(
            JSON.stringify({ type: "authenticated", session_id: session.id }),
          );
          return;
        }

        // All other messages require authentication
        const sessionId = ws.data.sessionId;
        if (sessionId === null) {
          sendError(ws, "Not authenticated");
          return;
        }

        // Handle output message
        if (msg.type === "output") {
          appendOutput(sessionId, msg.data);
          broadcastSSE({
            type: "output",
            session_id: sessionId,
            data: msg.data,
          });
          return;
        }

        // Handle resize message
        if (msg.type === "resize") {
          // Store resize info if needed (not yet implemented)
          console.debug(
            `Session ${sessionId} resized to ${msg.cols}x${msg.rows}`,
          );
          return;
        }

        // Handle exit message
        if (msg.type === "exit") {
          // Clear the id first so the close() handler triggered by ws.close()
          // below does not end the same session a second time.
          ws.data.sessionId = null;
          endSession(sessionId);
          sessionWebSockets.delete(sessionId);

          console.debug(`Session ${sessionId} ended with code ${msg.code}`);

          broadcastSSE({
            type: "session_end",
            session_id: sessionId,
            exit_code: msg.code,
          });

          ws.close(1000, "Session ended");
          return;
        }

        // Any other message type is ignored.
      } catch (error) {
        console.error("WebSocket message error:", error);
        sendError(ws, "Invalid message format");
      }
    },

    /**
     * Socket closed. If it still owns a session (i.e. it did not go through
     * the `exit` path), unregister the socket and mark the session as ended.
     *
     * NOTE(review): no `session_end` SSE event is broadcast on this path, so
     * dashboards are not told about sessions that end without an `exit`
     * message — confirm whether SSEEvent allows a missing exit code.
     */
    close(ws) {
      const sessionId = ws.data.sessionId;
      if (sessionId === null) {
        return;
      }
      ws.data.sessionId = null;
      sessionWebSockets.delete(sessionId);
      endSession(sessionId);
      console.debug(`WebSocket closed for session ${sessionId}`);
    },
  },
});

console.log(`Server listening on http://localhost:${port}`);

// Periodic application-level ping to every registered session socket (every 30s)
setInterval(() => {
  for (const ws of sessionWebSockets.values()) {
    try {
      const ping: ServerMessage = { type: "ping" };
      ws.send(JSON.stringify(ping));
    } catch (error) {
      console.debug("Ping error:", error);
    }
  }
}, 30_000);

export { server };