diff --git a/src/server.ts b/src/server.ts index 2a5e4b8..f5d087f 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1 +1,300 @@ -console.log("Hello via Bun!"); +// Core server: HTTP + WebSocket + SSE + +import type { ServerWebSocket } from "bun"; +import { + appendOutput, + createSession, + endSession, + getActiveSessions, + getDeviceBySecret, + getPrompt, + initDb, + respondToPrompt, + updateLastSeen, +} from "./db"; +import type { ClientMessage, ServerMessage, SSEEvent } from "./types"; + +// Server state +const sseClients = new Set>(); +const sessionWebSockets = new Map>(); + +interface SessionData { + deviceId: number; + sessionId: number | null; +} + +// Broadcast SSE event to all connected dashboards +function broadcastSSE(event: SSEEvent): void { + const eventStr = `event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`; + for (const controller of sseClients) { + try { + controller.enqueue(eventStr); + } catch (error) { + // Client disconnected, will be cleaned up + console.debug("SSE client write error:", error); + } + } +} + +// Initialize database +const port = Number.parseInt(process.env.PORT || "3000", 10); +initDb(); + +// Start server +const server = Bun.serve({ + port, + async fetch(req, server) { + const url = new URL(req.url); + + // Upgrade WebSocket connections + if (url.pathname === "/ws") { + const upgraded = server.upgrade(req, { + data: { deviceId: 0, sessionId: null }, + }); + if (!upgraded) { + return new Response("WebSocket upgrade failed", { status: 400 }); + } + return undefined; + } + + // SSE endpoint for dashboard + if (url.pathname === "/events") { + const stream = new ReadableStream({ + start(controller) { + sseClients.add(controller); + // Send initial headers + controller.enqueue(": connected\n\n"); + }, + cancel() { + sseClients.delete( + this as unknown as ReadableStreamDefaultController, + ); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }, + }); + } + + // REST API endpoints + if (url.pathname === "/api/sessions" && req.method === "GET") { + const sessions = getActiveSessions(); + return Response.json(sessions); + } + + if (url.pathname.startsWith("/api/prompts/")) { + const parts = url.pathname.split("/"); + const promptId = Number.parseInt(parts[3] || "", 10); + + if (Number.isNaN(promptId)) { + return new Response("Invalid prompt ID", { status: 400 }); + } + + if (url.pathname.endsWith("/approve") && req.method === "POST") { + const prompt = getPrompt(promptId); + if (!prompt) { + return new Response("Prompt not found", { status: 404 }); + } + if (prompt.response !== null) { + return new Response("Prompt already responded", { status: 400 }); + } + + respondToPrompt(promptId, "approve"); + + // Notify CLI via WebSocket + const ws = sessionWebSockets.get(prompt.session_id); + if (ws) { + const message: ServerMessage = { type: "input", data: "y\n" }; + ws.send(JSON.stringify(message)); + } + + // Broadcast to dashboards + broadcastSSE({ + type: "prompt_response", + prompt_id: promptId, + response: "approve", + }); + + return Response.json({ success: true }); + } + + if (url.pathname.endsWith("/reject") && req.method === "POST") { + const prompt = getPrompt(promptId); + if (!prompt) { + return new Response("Prompt not found", { status: 404 }); + } + if (prompt.response !== null) { + return new Response("Prompt already responded", { status: 400 }); + } + + respondToPrompt(promptId, "reject"); + + // Notify CLI via WebSocket + const ws = sessionWebSockets.get(prompt.session_id); + if (ws) { + const message: ServerMessage = { type: "input", data: "n\n" }; + ws.send(JSON.stringify(message)); + } + + // Broadcast to dashboards + broadcastSSE({ + type: "prompt_response", + prompt_id: promptId, + response: "reject", + }); + + return Response.json({ success: true }); + } + } + + // Serve static files from public/ + if (url.pathname === "/") { + return new Response(Bun.file("public/index.html")); + } + + // Handle other static files + const filePath = `public${url.pathname}`; + const file = Bun.file(filePath); + if (await file.exists()) { + return new Response(file); + } + + return new Response("Not found", { status: 404 }); + }, + + websocket: { + open(_ws) { + console.debug("WebSocket connected, awaiting auth"); + }, + + async message(ws, message) { + try { + const msg: ClientMessage = JSON.parse(message.toString()); + + // Handle auth message + if (msg.type === "auth") { + const device = getDeviceBySecret(msg.secret); + if (!device) { + ws.send( + JSON.stringify({ type: "error", message: "Invalid secret" }), + ); + ws.close(1008, "Invalid secret"); + return; + } + + updateLastSeen(device.id); + ws.data.deviceId = device.id; + + // Create new session + const session = createSession( + device.id, + msg.cwd || null, + msg.command || null, + ); + ws.data.sessionId = session.id; + sessionWebSockets.set(session.id, ws); + + console.debug( + `Session ${session.id} started for device ${device.id}`, + ); + + // Broadcast session start + broadcastSSE({ + type: "session_start", + session_id: session.id, + cwd: session.cwd, + command: session.command, + }); + + ws.send( + JSON.stringify({ type: "authenticated", session_id: session.id }), + ); + return; + } + + // All other messages require authentication + if (!ws.data.sessionId) { + ws.send( + JSON.stringify({ type: "error", message: "Not authenticated" }), + ); + return; + } + + // Handle output message + if (msg.type === "output") { + appendOutput(ws.data.sessionId, msg.data); + broadcastSSE({ + type: "output", + session_id: ws.data.sessionId, + data: msg.data, + }); + return; + } + + // Handle resize message + if (msg.type === "resize") { + // Store resize info if needed (not yet implemented) + console.debug( + `Session ${ws.data.sessionId} resized to ${msg.cols}x${msg.rows}`, + ); + return; + } + + // Handle exit message + if (msg.type === "exit") { + endSession(ws.data.sessionId); + sessionWebSockets.delete(ws.data.sessionId); + + console.debug( + `Session ${ws.data.sessionId} ended with code ${msg.code}`, + ); + + broadcastSSE({ + type: "session_end", + session_id: ws.data.sessionId, + exit_code: msg.code, + }); + + ws.close(1000, "Session ended"); + return; + } + } catch (error) { + console.error("WebSocket message error:", error); + ws.send( + JSON.stringify({ type: "error", message: "Invalid message format" }), + ); + } + }, + + close(ws) { + if (ws.data.sessionId) { + sessionWebSockets.delete(ws.data.sessionId); + // Mark session as ended if not already + const sessionId = ws.data.sessionId; + endSession(sessionId); + console.debug(`WebSocket closed for session ${sessionId}`); + } + }, + }, +}); + +console.log(`Server listening on http://localhost:${port}`); + +// Periodic ping to keep WebSocket connections alive (every 30s) +setInterval(() => { + for (const ws of sessionWebSockets.values()) { + try { + const ping: ServerMessage = { type: "ping" }; + ws.send(JSON.stringify(ping)); + } catch (error) { + console.debug("Ping error:", error); + } + } +}, 30_000); + +export { server };