Add SSE cleanup

This commit is contained in:
Jared Miller 2026-01-31 09:06:16 -05:00
parent 721bff81d0
commit a8eea4e694
Signed by: shmup
GPG key ID: 22B5C6D66A38B06C

View file

@ -30,6 +30,8 @@ import type {
const sseClients = new Set<ReadableStreamDefaultController<string>>(); const sseClients = new Set<ReadableStreamDefaultController<string>>();
const sessionWebSockets = new Map<number, ServerWebSocket<SessionData>>(); const sessionWebSockets = new Map<number, ServerWebSocket<SessionData>>();
const sessionStates = new Map<number, SessionState>(); const sessionStates = new Map<number, SessionState>();
// Buffer for incomplete ANSI sequences per session to avoid leaking partial control codes
const ansiCarryovers = new Map<number, string>();
interface SessionData { interface SessionData {
deviceId: number; deviceId: number;
@ -79,6 +81,55 @@ function broadcastSSE(event: SSEEvent): void {
} }
} }
// Detect and split any incomplete ANSI control sequence at the end of a chunk.
// Returns [body, carry] where carry should be saved and prepended to the next chunk.
function splitAnsiCarryover(chunk: string): [string, string] {
if (!chunk) return ["", ""];
const ESC = 0x1b;
const len = chunk.length;
// If last char is ESC, entire ESC starts a sequence we can't complete
if (chunk.charCodeAt(len - 1) === ESC) {
return [chunk.slice(0, -1), "\x1b"];
}
// Search from the last ESC backwards for a potentially incomplete sequence
for (let i = len - 2; i >= 0; i--) {
if (chunk.charCodeAt(i) !== ESC) continue;
const next = chunk[i + 1];
// OSC: ESC ] ... (terminated by BEL 0x07 or ST = ESC \
if (next === "]") {
const tail = chunk.slice(i + 2);
const hasBEL = tail.indexOf("\x07") !== -1;
const hasST = tail.indexOf("\x1b\\") !== -1;
if (!hasBEL && !hasST) {
return [chunk.slice(0, i), chunk.slice(i)];
}
// else complete; continue scanning
continue;
}
// CSI: ESC [ params final — ensure we have a final byte 0x40-0x7E
if (next === "[") {
let j = i + 2;
while (j < len && /[0-9;?]/.test(chunk[j] || "")) j++;
// If we reached end without a final byte, it's incomplete
if (j >= len) {
return [chunk.slice(0, i), chunk.slice(i)];
}
// Final byte exists at chunk[j]; sequence complete
continue;
}
// DCS/PM/APC and other ESC-prefixed two-char intros — if ESC is near end and likely incomplete, carry
// If ESC is the penultimate and we're at end, treat as incomplete unknown sequence
if (i >= len - 2) {
return [chunk.slice(0, i), chunk.slice(i)];
}
// Otherwise, unknown but complete; continue
}
return [chunk, ""];
}
// Initialize database // Initialize database
const port = Number.parseInt(process.env.PORT || "7200", 10); const port = Number.parseInt(process.env.PORT || "7200", 10);
initDb(); initDb();
@ -484,11 +535,25 @@ const server = Bun.serve<SessionData>({
// Handle output message // Handle output message
if (msg.type === "output") { if (msg.type === "output") {
appendOutput(ws.data.sessionId, msg.data); // Store raw ANSI const sessionId = ws.data.sessionId;
// Join with any saved carryover from previous chunk
const prevCarry = ansiCarryovers.get(sessionId) || "";
const combined = prevCarry ? prevCarry + msg.data : msg.data;
// Determine if new tail is an incomplete control sequence and split
const [body, carry] = splitAnsiCarryover(combined);
if (carry) {
ansiCarryovers.set(sessionId, carry);
} else if (prevCarry) {
// Clear carry if previously set and now resolved
ansiCarryovers.delete(sessionId);
}
appendOutput(sessionId, body); // Store raw ANSI without trailing incomplete fragment
broadcastSSE({ broadcastSSE({
type: "output", type: "output",
session_id: ws.data.sessionId, session_id: sessionId,
data: ansiToHtml(msg.data), // Parse for display data: ansiToHtml(body), // Parse for display
}); });
return; return;
} }