colabbd/adapters/vim/bridge.ts

185 lines
4.7 KiB
TypeScript

#!/usr/bin/env bun
// bridge between vim (stdin/stdout json lines) and collabd (websocket)
// vim spawns this process and communicates via channels
import * as Y from "yjs";
const DAEMON_URL = process.env.COLLABD_URL || "ws://localhost:4040/ws";
let ws: WebSocket | null = null;
let doc: Y.Doc | null = null;
let text: Y.Text | null = null;
let suppressLocal = 0;
function send(msg: object) {
console.log(JSON.stringify(msg));
}
// signal to vim that bridge is ready
send({ type: "ready" });
function connect(roomName: string) {
doc = new Y.Doc();
text = doc.getText("content");
// when remote changes come in, notify vim
text.observe(() => {
if (suppressLocal === 0) {
send({ type: "content", text: text?.toString() || "" });
}
});
ws = new WebSocket(DAEMON_URL);
ws.onopen = () => {
ws?.send(JSON.stringify({ type: "join", room: roomName }));
send({ type: "connected", room: roomName });
};
ws.onmessage = (ev) => {
const msg = JSON.parse(ev.data.toString());
switch (msg.type) {
case "sync":
case "update": {
if (!doc) break;
try {
suppressLocal++;
Y.applyUpdate(doc, new Uint8Array(msg.data));
suppressLocal--;
send({ type: "content", text: text?.toString() || "" });
} catch (err) {
suppressLocal--;
console.error("failed to apply yjs update:", err);
send({ type: "error", message: "failed to apply remote update" });
}
break;
}
case "peers": {
send({ type: "peers", count: msg.count });
break;
}
case "awareness": {
// Forward cursor info to vim
if (msg.data?.cursor) {
send({
type: "cursor",
data: {
clientId: msg.data.clientId,
line: msg.data.cursor.line,
col: msg.data.cursor.col,
name: msg.data.name || `peer-${msg.data.clientId}`,
},
});
}
break;
}
}
};
ws.onclose = () => {
send({ type: "disconnected" });
};
ws.onerror = () => {
send({ type: "error", message: "websocket error" });
};
}
// compute minimal edit operations using LCS-based diff
function computeDiff(oldText: string, newText: string) {
// find common prefix
let prefixLen = 0;
while (
prefixLen < oldText.length &&
prefixLen < newText.length &&
oldText[prefixLen] === newText[prefixLen]
) {
prefixLen++;
}
// find common suffix
let suffixLen = 0;
while (
suffixLen < oldText.length - prefixLen &&
suffixLen < newText.length - prefixLen &&
oldText[oldText.length - 1 - suffixLen] ===
newText[newText.length - 1 - suffixLen]
) {
suffixLen++;
}
const deleteStart = prefixLen;
const deleteLen = oldText.length - prefixLen - suffixLen;
const insertText = newText.slice(prefixLen, newText.length - suffixLen);
return { deleteStart, deleteLen, insertText };
}
function setContent(newContent: string) {
if (!doc || !text || !ws) return;
const oldContent = text.toString();
if (oldContent === newContent) return;
// compute minimal diff and apply
const { deleteStart, deleteLen, insertText } = computeDiff(
oldContent,
newContent,
);
const t = text;
doc.transact(() => {
if (deleteLen > 0) {
t.delete(deleteStart, deleteLen);
}
if (insertText.length > 0) {
t.insert(deleteStart, insertText);
}
});
// send update to daemon
const update = Y.encodeStateAsUpdate(doc);
ws.send(JSON.stringify({ type: "update", data: Array.from(update) }));
}
// read json lines from stdin
let buffer = "";
const decoder = new TextDecoder();
for await (const chunk of Bun.stdin.stream()) {
buffer += decoder.decode(chunk);
const lines = buffer.split("\n");
buffer = lines.pop() ?? ""; // keep incomplete last line
for (const line of lines) {
if (!line.trim()) continue;
try {
const msg = JSON.parse(line);
switch (msg.type) {
case "connect":
connect(msg.room);
break;
case "content":
setContent(msg.text);
break;
case "cursor":
if (ws && msg.line !== undefined && msg.col !== undefined) {
ws.send(
JSON.stringify({
type: "awareness",
data: {
clientId: Math.floor(Math.random() * 1000000),
cursor: { line: msg.line, col: msg.col },
},
}),
);
}
break;
case "disconnect":
ws?.close();
send({ type: "disconnected" });
process.exit(0);
}
} catch {
send({ type: "error", message: "invalid json" });
}
}
}