clarc/src/server.ts
Jared Miller acd75b0303
Implement key sequence execution for tab instructions
Execute the tab instruction flow:
- Navigate to target option using arrow keys
- Send Tab to enter instruction mode
- Type the instruction text
- Send Enter to submit

The sequence runs asynchronously with proper delays between steps
(50ms between arrows, 100ms before Tab, 100ms before instruction).
2026-01-28 13:13:51 -05:00

472 lines
14 KiB
TypeScript

// Core server: HTTP + WebSocket + SSE
import type { ServerWebSocket } from "bun";
import {
appendOutput,
createPrompt,
createSession,
endSession,
getActiveSessions,
getDeviceBySecret,
getPrompt,
getSession,
initDb,
respondToPrompt,
updateLastSeen,
} from "./db";
import type {
AnswerResponse,
ClientMessage,
ServerMessage,
SSEEvent,
} from "./types";
// Server state

/**
 * Per-connection data carried on every CLI WebSocket (`ws.data`).
 *
 * - `deviceId` starts at 0 on upgrade and is set to the device's id once the
 *   socket authenticates.
 * - `sessionId` stays `null` until authentication creates a session.
 */
interface SessionData {
  deviceId: number;
  sessionId: number | null;
}

/** Live CLI sockets keyed by session id; used to forward input to a session. */
const sessionWebSockets: Map<number, ServerWebSocket<SessionData>> = new Map();

/** Stream controllers of the currently connected SSE dashboards. */
const sseClients: Set<ReadableStreamDefaultController<string>> = new Set();
/**
 * Push one event to every connected dashboard over SSE.
 *
 * The event is serialized once as an `event:` / `data:` frame. A client whose
 * controller throws on enqueue is logged and removed from `sseClients`.
 *
 * @param event - Event to broadcast; `event.type` becomes the SSE event name
 *   and the whole object is sent as the JSON payload.
 */
function broadcastSSE(event: SSEEvent): void {
  const frame = `event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`;
  sseClients.forEach((client) => {
    try {
      client.enqueue(frame);
    } catch (error) {
      // Enqueue failed (client disconnected): log it and drop the client
      console.debug("SSE client write error:", error);
      sseClients.delete(client);
    }
  });
}
// Listening port: taken from the PORT environment variable, "3000" when unset or empty
const portSetting = process.env.PORT || "3000";
const port = Number.parseInt(portSetting, 10);

// Initialize database
initDb();
// Start server

/**
 * Upper bound on arrow-key presses sent for one tab-instruction answer.
 * Guards against a bogus `selected_option` driving a huge key loop.
 * NOTE(review): 100 is a defensive ceiling, not a protocol constant — adjust if
 * prompts can legitimately have more options.
 */
const MAX_ARROW_PRESSES = 100;

/**
 * Read a request body as JSON without throwing.
 *
 * @param req - Incoming HTTP request.
 * @returns The parsed value, or `undefined` when the body is not valid JSON.
 */
async function readJsonBody(req: Request): Promise<unknown> {
  try {
    return await req.json();
  } catch {
    return undefined;
  }
}

/** Send an `input` message (raw terminal input) to a CLI WebSocket. */
function sendInput(ws: ServerWebSocket<SessionData>, data: string): void {
  const message: ServerMessage = { type: "input", data };
  ws.send(JSON.stringify(message));
}

/**
 * Drive the terminal UI for a tab-instruction answer: arrow to the target
 * option, press Tab, type the instruction, press Enter.
 *
 * @param ws - CLI socket of the session that owns the prompt.
 * @param currentOption - Option currently highlighted in the prompt.
 * @param targetOption - Option the user selected.
 * @param instruction - Instruction text to type after Tab.
 */
async function runTabInstructionSequence(
  ws: ServerWebSocket<SessionData>,
  currentOption: number,
  targetOption: number,
  instruction: string,
): Promise<void> {
  const diff = targetOption - currentOption;
  // Positive diff moves down (ESC [ B), otherwise up (ESC [ A)
  const arrow = diff > 0 ? "\x1b[B" : "\x1b[A";
  for (let i = 0; i < Math.abs(diff); i++) {
    sendInput(ws, arrow);
    await Bun.sleep(50); // Small delay between arrows
  }
  await Bun.sleep(100);
  sendInput(ws, "\t"); // Tab key
  await Bun.sleep(100);
  sendInput(ws, instruction); // Instruction text
  sendInput(ws, "\n"); // Enter key
}

/**
 * Shared implementation of the legacy approve/reject endpoints.
 *
 * @param promptId - Prompt being answered.
 * @param decision - Value stored in the DB and broadcast to dashboards.
 * @param input - Raw input forwarded to the CLI when its socket is connected.
 * @returns 404 if the prompt does not exist, 400 if it was already answered,
 *   otherwise a JSON `{ success: true }` response.
 */
function handleLegacyDecision(
  promptId: number,
  decision: "approve" | "reject",
  input: string,
): Response {
  const prompt = getPrompt(promptId);
  if (!prompt) {
    return new Response("Prompt not found", { status: 404 });
  }
  if (prompt.response !== null) {
    return new Response("Prompt already responded", { status: 400 });
  }
  respondToPrompt(promptId, decision);
  // Notify CLI via WebSocket
  const ws = sessionWebSockets.get(prompt.session_id);
  if (ws) {
    sendInput(ws, input);
  }
  // Broadcast to dashboards
  broadcastSSE({
    type: "prompt_response",
    prompt_id: promptId,
    response: decision,
  });
  return Response.json({ success: true });
}

/**
 * The HTTP + WebSocket server.
 *
 * HTTP routes:
 * - `/ws`                              WebSocket upgrade for CLI sessions
 * - `/events`                          SSE stream for dashboards
 * - `GET  /api/sessions`               active sessions
 * - `POST /api/sessions/:id/prompts`   create a prompt for a session
 * - `POST /api/prompts/:id/answer`     unified answer endpoint
 * - `POST /api/prompts/:id/approve`    legacy approve (sends "1\n")
 * - `POST /api/prompts/:id/reject`     legacy reject (sends "3\n")
 * - anything else                      static files from `public/`
 */
const server = Bun.serve<SessionData>({
  port,
  async fetch(req, server) {
    const url = new URL(req.url);

    // Upgrade WebSocket connections
    if (url.pathname === "/ws") {
      const upgraded = server.upgrade(req, {
        data: { deviceId: 0, sessionId: null },
      });
      if (!upgraded) {
        return new Response("WebSocket upgrade failed", { status: 400 });
      }
      return undefined;
    }

    // SSE endpoint for dashboard
    if (url.pathname === "/events") {
      let ctrl: ReadableStreamDefaultController<string>;
      const stream = new ReadableStream<string>({
        start(controller) {
          ctrl = controller;
          sseClients.add(controller);
          // Send an initial comment frame
          controller.enqueue(": connected\n\n");
        },
        cancel() {
          sseClients.delete(ctrl);
        },
      });
      return new Response(stream, {
        headers: {
          "Content-Type": "text/event-stream",
          "Cache-Control": "no-cache",
          Connection: "keep-alive",
        },
      });
    }

    // REST API endpoints
    if (url.pathname === "/api/sessions" && req.method === "GET") {
      const sessions = getActiveSessions();
      return Response.json(sessions);
    }

    // Create prompt for a session
    if (
      url.pathname.match(/^\/api\/sessions\/\d+\/prompts$/) &&
      req.method === "POST"
    ) {
      const parts = url.pathname.split("/");
      const sessionId = Number.parseInt(parts[3] || "", 10);
      if (Number.isNaN(sessionId)) {
        return new Response("Invalid session ID", { status: 400 });
      }
      const session = getSession(sessionId);
      if (!session) {
        return new Response("Session not found", { status: 404 });
      }
      // Malformed or null JSON is a client error (400), not a server crash
      const body = (await readJsonBody(req)) as
        | { prompt_text?: unknown }
        | null
        | undefined;
      if (
        body == null ||
        !body.prompt_text ||
        typeof body.prompt_text !== "string"
      ) {
        return new Response("Missing or invalid prompt_text", { status: 400 });
      }
      const prompt = createPrompt(sessionId, body.prompt_text);
      // Broadcast to dashboards
      broadcastSSE({
        type: "prompt",
        prompt_id: prompt.id,
        session_id: sessionId,
        prompt_text: prompt.prompt_text,
        prompt_json: prompt.prompt_json
          ? JSON.stringify(prompt.prompt_json)
          : null,
      });
      return Response.json(prompt);
    }

    if (url.pathname.startsWith("/api/prompts/")) {
      const parts = url.pathname.split("/");
      const promptId = Number.parseInt(parts[3] || "", 10);
      if (Number.isNaN(promptId)) {
        return new Response("Invalid prompt ID", { status: 400 });
      }

      // New unified answer endpoint
      if (url.pathname.endsWith("/answer") && req.method === "POST") {
        const prompt = getPrompt(promptId);
        if (!prompt) {
          return new Response("Prompt not found", { status: 404 });
        }
        if (prompt.response !== null) {
          return new Response("Prompt already responded", { status: 400 });
        }
        const body = (await readJsonBody(req)) as
          | { response?: unknown }
          | null
          | undefined;
        if (
          body == null ||
          !body.response ||
          typeof body.response !== "object"
        ) {
          return new Response("Missing or invalid response", { status: 400 });
        }
        const answer = body.response as AnswerResponse;

        if (answer.type === "tab_instructions") {
          const promptData = prompt.prompt_json;
          if (!promptData || promptData.prompt_type !== "permission") {
            return new Response(
              "Tab instructions only valid for permission prompts",
              { status: 400 },
            );
          }
          // The payload is an unchecked cast, so validate the fields we use
          const targetOption = answer.selected_option;
          const instruction = answer.instruction;
          if (
            !Number.isInteger(targetOption) ||
            typeof instruction !== "string"
          ) {
            return new Response("Missing or invalid response", { status: 400 });
          }
          const currentOption = promptData.selected_option;
          if (Math.abs(targetOption - currentOption) > MAX_ARROW_PRESSES) {
            return new Response("Invalid selected_option", { status: 400 });
          }
          // Resolve the socket BEFORE recording the answer: otherwise a 404
          // leaves the prompt marked as responded with no input delivered.
          const ws = sessionWebSockets.get(prompt.session_id);
          if (!ws) {
            return new Response("Session WebSocket not found", { status: 404 });
          }
          // Mark as responded in DB
          respondToPrompt(promptId, `tab:${targetOption}`);
          // Broadcast to dashboards
          broadcastSSE({
            type: "prompt_response",
            prompt_id: promptId,
            response: targetOption === 1 ? "approve" : "reject",
          });
          // Fire-and-forget: the key sequence runs in the background
          void runTabInstructionSequence(
            ws,
            currentOption,
            targetOption,
            instruction,
          ).catch((err) => {
            console.error("Error executing tab instruction sequence:", err);
          });
          // Return immediately (async execution continues in background)
          return Response.json({ success: true });
        }

        if (answer.type !== "option" && answer.type !== "text") {
          return new Response("Invalid response type", { status: 400 });
        }
        // Without this check a missing value is sent to the CLI as "undefined\n"
        if (typeof answer.value !== "string") {
          return new Response("Missing or invalid response", { status: 400 });
        }

        // Mark as responded in DB
        respondToPrompt(
          promptId,
          answer.type === "option" ? answer.value : "custom",
        );
        // Notify CLI via WebSocket
        const ws = sessionWebSockets.get(prompt.session_id);
        if (ws) {
          sendInput(ws, `${answer.value}\n`);
        }
        // Broadcast to dashboards
        broadcastSSE({
          type: "prompt_response",
          prompt_id: promptId,
          response:
            answer.type === "option" && answer.value === "1"
              ? "approve"
              : "reject",
        });
        return Response.json({ success: true });
      }

      // Legacy approve endpoint (backward compatibility): "1\n" selects option 1
      if (url.pathname.endsWith("/approve") && req.method === "POST") {
        return handleLegacyDecision(promptId, "approve", "1\n");
      }

      // Legacy reject endpoint (backward compatibility): "3\n" selects option 3
      if (url.pathname.endsWith("/reject") && req.method === "POST") {
        return handleLegacyDecision(promptId, "reject", "3\n");
      }
    }

    // Serve static files from public/
    if (url.pathname === "/") {
      return new Response(Bun.file("public/index.html"));
    }

    // Handle other static files
    const filePath = `public${url.pathname}`;
    const file = Bun.file(filePath);
    if (await file.exists()) {
      return new Response(file);
    }
    return new Response("Not found", { status: 404 });
  },
  websocket: {
    open(_ws) {
      console.debug("WebSocket connected, awaiting auth");
    },
    async message(ws, message) {
      try {
        const msg: ClientMessage = JSON.parse(message.toString());

        // Handle auth message
        if (msg.type === "auth") {
          // A second auth would create a new session and orphan the first one
          // in sessionWebSockets and the DB, so reject it.
          if (ws.data.sessionId !== null) {
            ws.close(1008, "Already authenticated");
            return;
          }
          const device = getDeviceBySecret(msg.secret);
          if (!device) {
            ws.close(1008, "Invalid secret");
            return;
          }
          updateLastSeen(device.id);
          ws.data.deviceId = device.id;
          // Create new session
          const session = createSession(
            device.id,
            msg.cwd || null,
            msg.command || null,
          );
          ws.data.sessionId = session.id;
          sessionWebSockets.set(session.id, ws);
          console.debug(
            `Session ${session.id} started for device ${device.id}`,
          );
          // Broadcast session start
          broadcastSSE({
            type: "session_start",
            session_id: session.id,
            cwd: session.cwd,
            command: session.command,
          });
          ws.send(
            JSON.stringify({ type: "authenticated", session_id: session.id }),
          );
          return;
        }

        // All other messages require authentication
        const sessionId = ws.data.sessionId;
        if (sessionId === null) {
          ws.close(1008, "Not authenticated");
          return;
        }

        // Handle output message
        if (msg.type === "output") {
          appendOutput(sessionId, msg.data);
          broadcastSSE({
            type: "output",
            session_id: sessionId,
            data: msg.data,
          });
          return;
        }

        // Handle prompt message
        if (msg.type === "prompt") {
          // Use the socket's authenticated session, not the client-supplied
          // msg.session_id, so a client cannot create prompts on other sessions.
          const prompt = createPrompt(sessionId, msg.prompt_text);
          broadcastSSE({
            type: "prompt",
            prompt_id: prompt.id,
            session_id: sessionId,
            prompt_text: msg.prompt_text,
            prompt_json: prompt.prompt_json
              ? JSON.stringify(prompt.prompt_json)
              : null,
          });
          return;
        }

        // Handle resize message
        if (msg.type === "resize") {
          // Store resize info if needed (not yet implemented)
          console.debug(
            `Session ${sessionId} resized to ${msg.cols}x${msg.rows}`,
          );
          return;
        }

        // Handle exit message
        if (msg.type === "exit") {
          endSession(sessionId);
          sessionWebSockets.delete(sessionId);
          console.debug(`Session ${sessionId} ended with code ${msg.code}`);
          broadcastSSE({
            type: "session_end",
            session_id: sessionId,
            exit_code: msg.code,
          });
          ws.close(1000, "Session ended");
          return;
        }
      } catch (error) {
        // Covers JSON.parse failures and any handler error above
        console.error("WebSocket message error:", error);
        ws.close(1008, "Invalid message format");
      }
    },
    close(ws) {
      const sessionId = ws.data.sessionId;
      if (sessionId !== null) {
        sessionWebSockets.delete(sessionId);
        // Mark session as ended if not already
        endSession(sessionId);
        console.debug(`WebSocket closed for session ${sessionId}`);
      }
    },
  },
});
console.log(`Server listening on http://localhost:${port}`);

// Heartbeat: every 30s send a "ping" message to each session WebSocket to keep
// the connection alive. A failed send is logged and does not stop the loop.
setInterval(() => {
  sessionWebSockets.forEach((socket) => {
    try {
      const ping: ServerMessage = { type: "ping" };
      socket.send(JSON.stringify(ping));
    } catch (error) {
      console.debug("Ping error:", error);
    }
  });
}, 30_000);
export { server };