Compare commits

..

No commits in common. "05629a00a033bbf4d1444af29ce852b886d91daf" and "3cf16586aa7abe25a7230e13d957cd55e30ef24e" have entirely different histories.

9 changed files with 44 additions and 855 deletions

106
CLAUDE.md
View file

@ -1,106 +0,0 @@
# collabd - editor-agnostic collaborative editing daemon
daemon + thin adapters architecture. daemon handles yjs crdt, adapters just
hook buffer events and apply remote changes.
## status
working:
- daemon with room-based sessions
- multi-peer sync (tested with 2+ clients)
- vim9 adapter with live buffer sync
- proper crdt diffing (not delete-all-insert-all)
- integration tests for concurrent edits
not yet:
- cursor/selection sync (awareness protocol stubbed but unused)
- other editor adapters (helix, kakoune, zed)
- persistence (rooms are ephemeral)
- reconnection handling
## stack
- bun runtime
- yjs for crdt
- websocket transport
- vim9script adapter (with bun bridge since vim cant do websocket)
## running
```bash
just dev # daemon on :4040
bun test # unit tests
just check # biome lint
```
## vim adapter usage
requires Vim 9.0+ (uses vim9script)
```vim
:source adapters/vim/collab.vim
:CollabJoin roomname
:CollabLeave
:CollabStatus
```
the vim plugin spawns adapters/vim/bridge.ts which handles yjs and speaks
json lines to vim via channels.
## protocol
daemon speaks json over websocket at /ws
client -> server:
{ type: "join", room: "name" }
{ type: "leave" }
{ type: "update", data: [...] } // yjs update bytes
server -> client:
{ type: "sync", data: [...] } // full yjs state on join
{ type: "update", data: [...] } // remote changes
{ type: "peers", count: N }
## key files
- src/index.ts - websocket server, room routing
- src/session.ts - yjs doc per room, peer management
- src/protocol.ts - message types
- adapters/vim/bridge.ts - bun process vim spawns
- adapters/vim/collab.vim - vim9script plugin
## adding new editor adapters
each adapter needs:
1. hook buffer change events
2. send changes to daemon as yjs updates (or use a bridge like vim does)
3. receive remote updates and apply to buffer
4. optionally show peer cursors
see NOTES.txt for cell-grid vs text-crdt mode discussion.
see docs/ for full research and architecture breakdown.
## security
current state (research prototype):
- room name validation (alphanumeric, 1-64 chars)
- message type validation via protocol decoder
- websocket.send error handling
known gaps (not production ready):
- no authentication (anyone can join any room by name)
- no authorization (all peers see all edits)
- no rate limiting on messages or room creation
- no message size limits
- room names guessable via brute force
- no encryption (deploy behind wss, not ws)
- no audit logging
- no persistence (data lost on daemon restart)
before production:
1. auth layer (jwt tokens or unix socket for local-only)
2. per-room authorization
3. rate limiting (msgs/sec, rooms/minute)
4. message size caps
5. tls via reverse proxy
6. access logging

View file

@ -1,22 +1,15 @@
# collabd # collabd
editor-agnostic collaborative editing daemon. two vims, one buffer. To install dependencies:
## quick start
```bash ```bash
bun install bun install
just dev # starts daemon on :4040
``` ```
in vim (requires 9.0+): To run:
```vim
:source adapters/vim/collab.vim ```bash
:CollabJoin roomname bun run src/index.ts
``` ```
open another vim, join the same room, type in either. magic. This project was created using `bun init` in bun v1.3.5. [Bun](https://bun.com) is a fast all-in-one JavaScript runtime.
## more info
see CLAUDE.md for architecture, protocol, and how to add new editor adapters.

View file

@ -1,194 +0,0 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawn } from "bun";
describe("Bridge lifecycle", () => {
let daemon: ReturnType<typeof Bun.serve>;
const DAEMON_PORT = 4042;
beforeEach(async () => {
const { decode } = await import("../../src/protocol");
const { getOrCreateSession, getSession, removeSession } = await import(
"../../src/session"
);
// start daemon for bridge to connect to
daemon = Bun.serve({
port: DAEMON_PORT,
fetch(req, server) {
const url = new URL(req.url);
if (url.pathname === "/ws") {
const upgraded = server.upgrade(req, {
data: { room: null, client: null },
});
if (!upgraded) {
return new Response("websocket upgrade failed", { status: 400 });
}
return;
}
return new Response("test daemon");
},
websocket: {
open(ws) {
const client = { ws };
ws.data.client = client;
},
message(ws, raw) {
const msg = decode(raw.toString());
if (!msg) return;
const client = ws.data.client;
if (!client) return;
switch (msg.type) {
case "join": {
const session = getOrCreateSession(msg.room);
ws.data.room = msg.room;
session.join(client);
break;
}
case "leave": {
if (ws.data.room) {
const session = getSession(ws.data.room);
session?.leave(client);
removeSession(ws.data.room);
ws.data.room = null;
}
break;
}
case "update": {
if (ws.data.room) {
const session = getSession(ws.data.room);
session?.applyUpdate(new Uint8Array(msg.data), client);
}
break;
}
}
},
close(ws) {
if (ws.data.room && ws.data.client) {
const session = getSession(ws.data.room);
session?.leave(ws.data.client);
removeSession(ws.data.room);
}
},
},
});
});
afterEach(() => {
daemon.stop();
});
test("bridge starts and signals ready", async () => {
const bridge = spawn({
cmd: ["bun", "adapters/vim/bridge.ts"],
env: {
...process.env,
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
},
stdin: "pipe",
stdout: "pipe",
stderr: "pipe",
});
// read first line from stdout
const reader = bridge.stdout.getReader();
const decoder = new TextDecoder();
const { value } = await reader.read();
expect(value).toBeDefined();
const output = decoder.decode(value);
const firstLine = output.trim().split("\n")[0];
const msg = JSON.parse(firstLine);
expect(msg.type).toBe("ready");
bridge.kill();
await bridge.exited;
});
test("bridge connects to daemon and disconnects cleanly", async () => {
const bridge = spawn({
cmd: ["bun", "adapters/vim/bridge.ts"],
env: {
...process.env,
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
},
stdin: "pipe",
stdout: "pipe",
stderr: "pipe",
});
const reader = bridge.stdout.getReader();
const decoder = new TextDecoder();
// wait for ready
let { value } = await reader.read();
expect(value).toBeDefined();
let output = decoder.decode(value);
expect(output).toContain('"type":"ready"');
// send connect message
bridge.stdin.write(
`${JSON.stringify({ type: "connect", room: "test" })}\n`,
);
// wait for connected message
({ value } = await reader.read());
expect(value).toBeDefined();
output = decoder.decode(value);
expect(output).toContain('"type":"connected"');
expect(output).toContain('"room":"test"');
// send disconnect message
bridge.stdin.write(`${JSON.stringify({ type: "disconnect" })}\n`);
bridge.stdin.end();
// bridge should exit cleanly
const exitCode = await bridge.exited;
expect(exitCode).toBe(0);
});
test("bridge handles content synchronization", async () => {
const bridge = spawn({
cmd: ["bun", "adapters/vim/bridge.ts"],
env: {
...process.env,
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
},
stdin: "pipe",
stdout: "pipe",
stderr: "pipe",
});
const reader = bridge.stdout.getReader();
// wait for ready
await reader.read();
// connect to room
bridge.stdin.write(
`${JSON.stringify({ type: "connect", room: "content-test" })}\n`,
);
// wait for connected and peers messages
await reader.read();
await reader.read();
// send content
bridge.stdin.write(
`${JSON.stringify({ type: "content", text: "hello world" })}\n`,
);
// give it time to process
await new Promise((r) => setTimeout(r, 100));
// disconnect
bridge.stdin.write(`${JSON.stringify({ type: "disconnect" })}\n`);
bridge.stdin.end();
const exitCode = await bridge.exited;
expect(exitCode).toBe(0);
});
});

View file

@ -9,22 +9,19 @@ const DAEMON_URL = process.env.COLLABD_URL || "ws://localhost:4040/ws";
let ws: WebSocket | null = null; let ws: WebSocket | null = null;
let doc: Y.Doc | null = null; let doc: Y.Doc | null = null;
let text: Y.Text | null = null; let text: Y.Text | null = null;
let suppressLocal = 0; let suppressLocal = false;
function send(msg: object) { function send(msg: object) {
console.log(JSON.stringify(msg)); console.log(JSON.stringify(msg));
} }
// signal to vim that bridge is ready
send({ type: "ready" });
function connect(roomName: string) { function connect(roomName: string) {
doc = new Y.Doc(); doc = new Y.Doc();
text = doc.getText("content"); text = doc.getText("content");
// when remote changes come in, notify vim // when remote changes come in, notify vim
text.observe(() => { text.observe(() => {
if (suppressLocal === 0) { if (!suppressLocal) {
send({ type: "content", text: text?.toString() || "" }); send({ type: "content", text: text?.toString() || "" });
} }
}); });
@ -42,16 +39,10 @@ function connect(roomName: string) {
case "sync": case "sync":
case "update": { case "update": {
if (!doc) break; if (!doc) break;
try { suppressLocal = true;
suppressLocal++;
Y.applyUpdate(doc, new Uint8Array(msg.data)); Y.applyUpdate(doc, new Uint8Array(msg.data));
suppressLocal--; suppressLocal = false;
send({ type: "content", text: text?.toString() || "" }); send({ type: "content", text: text?.toString() || "" });
} catch (err) {
suppressLocal--;
console.error("failed to apply yjs update:", err);
send({ type: "error", message: "failed to apply remote update" });
}
break; break;
} }
case "peers": { case "peers": {
@ -70,56 +61,19 @@ function connect(roomName: string) {
}; };
} }
// compute minimal edit operations using LCS-based diff
function computeDiff(oldText: string, newText: string) {
// find common prefix
let prefixLen = 0;
while (
prefixLen < oldText.length &&
prefixLen < newText.length &&
oldText[prefixLen] === newText[prefixLen]
) {
prefixLen++;
}
// find common suffix
let suffixLen = 0;
while (
suffixLen < oldText.length - prefixLen &&
suffixLen < newText.length - prefixLen &&
oldText[oldText.length - 1 - suffixLen] ===
newText[newText.length - 1 - suffixLen]
) {
suffixLen++;
}
const deleteStart = prefixLen;
const deleteLen = oldText.length - prefixLen - suffixLen;
const insertText = newText.slice(prefixLen, newText.length - suffixLen);
return { deleteStart, deleteLen, insertText };
}
function setContent(newContent: string) { function setContent(newContent: string) {
if (!doc || !text || !ws) return; if (!doc || !text || !ws) return;
const oldContent = text.toString(); const oldContent = text.toString();
if (oldContent === newContent) return; if (oldContent === newContent) return;
// compute minimal diff and apply // compute diff and apply
const { deleteStart, deleteLen, insertText } = computeDiff( // simple approach: delete all, insert all
oldContent, // TODO: proper diff for efficiency
newContent,
);
const t = text; const t = text;
doc.transact(() => { doc.transact(() => {
if (deleteLen > 0) { t.delete(0, t.length);
t.delete(deleteStart, deleteLen); t.insert(0, newContent);
}
if (insertText.length > 0) {
t.insert(deleteStart, insertText);
}
}); });
// send update to daemon // send update to daemon
@ -128,14 +82,11 @@ function setContent(newContent: string) {
} }
// read json lines from stdin // read json lines from stdin
let buffer = "";
const decoder = new TextDecoder(); const decoder = new TextDecoder();
for await (const chunk of Bun.stdin.stream()) { for await (const chunk of Bun.stdin.stream()) {
buffer += decoder.decode(chunk); const lines = decoder.decode(chunk).trim().split("\n");
const lines = buffer.split("\n");
buffer = lines.pop() ?? ""; // keep incomplete last line
for (const line of lines) { for (const line of lines) {
if (!line.trim()) continue; if (!line) continue;
try { try {
const msg = JSON.parse(line); const msg = JSON.parse(line);
switch (msg.type) { switch (msg.type) {
@ -147,8 +98,7 @@ for await (const chunk of Bun.stdin.stream()) {
break; break;
case "disconnect": case "disconnect":
ws?.close(); ws?.close();
send({ type: "disconnected" }); break;
process.exit(0);
} }
} catch { } catch {
send({ type: "error", message: "invalid json" }); send({ type: "error", message: "invalid json" });

View file

@ -3,10 +3,9 @@ vim9script
# collab.vim - collaborative editing adapter for collabd # collab.vim - collaborative editing adapter for collabd
# requires: bun, collabd daemon running # requires: bun, collabd daemon running
var bridge_job: job = null_job var bridge_job: job
var bridge_channel: channel = null_channel var bridge_channel: channel
var connected = false var connected = false
var ready = false
var room = "" var room = ""
var suppressing = false var suppressing = false
@ -14,7 +13,7 @@ var suppressing = false
const bridge_script = expand('<sfile>:p:h') .. '/bridge.ts' const bridge_script = expand('<sfile>:p:h') .. '/bridge.ts'
def Send(msg: dict<any>) def Send(msg: dict<any>)
if bridge_channel != null_channel if bridge_channel != null
ch_sendraw(bridge_channel, json_encode(msg) .. "\n") ch_sendraw(bridge_channel, json_encode(msg) .. "\n")
endif endif
enddef enddef
@ -30,9 +29,7 @@ def OnOutput(ch: channel, msg: string)
return return
endtry endtry
if data.type == 'ready' if data.type == 'connected'
ready = true
elseif data.type == 'connected'
connected = true connected = true
echom '[collab] connected to room: ' .. data.room echom '[collab] connected to room: ' .. data.room
elseif data.type == 'disconnected' elseif data.type == 'disconnected'
@ -70,12 +67,11 @@ def SendBuffer()
enddef enddef
export def Connect(room_name: string) export def Connect(room_name: string)
if bridge_job != null_job if bridge_job != null
Disconnect() Disconnect()
endif endif
room = room_name room = room_name
ready = false
bridge_job = job_start(['bun', 'run', bridge_script], { bridge_job = job_start(['bun', 'run', bridge_script], {
mode: 'nl', mode: 'nl',
out_cb: OnOutput, out_cb: OnOutput,
@ -83,19 +79,8 @@ export def Connect(room_name: string)
}) })
bridge_channel = job_getchannel(bridge_job) bridge_channel = job_getchannel(bridge_job)
# wait for bridge ready signal # give it a moment to start
var timeout = 50 sleep 100m
while !ready && timeout > 0
sleep 10m
timeout -= 1
endwhile
if !ready
echoerr '[collab] bridge failed to start'
Disconnect()
return
endif
Send({type: 'connect', room: room_name}) Send({type: 'connect', room: room_name})
# set up autocmds to send changes # set up autocmds to send changes
@ -106,14 +91,13 @@ export def Connect(room_name: string)
enddef enddef
export def Disconnect() export def Disconnect()
if bridge_job != null_job if bridge_job != null
Send({type: 'disconnect'}) Send({type: 'disconnect'})
job_stop(bridge_job) job_stop(bridge_job)
bridge_job = null_job bridge_job = null
bridge_channel = null_channel bridge_channel = null
endif endif
connected = false connected = false
ready = false
room = "" room = ""
augroup CollabVim augroup CollabVim
autocmd! autocmd!

View file

@ -3,20 +3,12 @@ import { type Client, getOrCreateSession, getSession } from "./session";
const PORT = Number(process.env.PORT) || 4040; const PORT = Number(process.env.PORT) || 4040;
function isValidRoomName(name: unknown): name is string {
if (typeof name !== "string") return false;
if (name.length === 0 || name.length > 64) return false;
return /^[a-zA-Z0-9_-]+$/.test(name);
}
Bun.serve({ Bun.serve({
port: PORT, port: PORT,
fetch(req, server) { fetch(req, server) {
const url = new URL(req.url); const url = new URL(req.url);
if (url.pathname === "/ws") { if (url.pathname === "/ws") {
const upgraded = server.upgrade(req, { const upgraded = server.upgrade(req, { data: { room: null } });
data: { room: null, client: null },
});
if (!upgraded) { if (!upgraded) {
return new Response("websocket upgrade failed", { status: 400 }); return new Response("websocket upgrade failed", { status: 400 });
} }
@ -25,32 +17,17 @@ Bun.serve({
return new Response("collabd running"); return new Response("collabd running");
}, },
websocket: { websocket: {
open(ws) { open() {
// create client object once and store in ws.data
const client: Client = { ws };
ws.data.client = client;
console.debug("client connected"); console.debug("client connected");
}, },
message(ws, raw) { message(ws, raw) {
const msg = decode(raw.toString()); const msg = decode(raw.toString());
if (!msg) return; if (!msg) return;
// reuse the client object from ws.data const client: Client = { ws };
const client = ws.data.client;
if (!client) return;
switch (msg.type) { switch (msg.type) {
case "join": { case "join": {
if (!isValidRoomName(msg.room)) {
try {
ws.send(
JSON.stringify({ type: "error", message: "invalid room name" }),
);
} catch (err) {
console.debug("failed to send error to client:", err);
}
break;
}
const session = getOrCreateSession(msg.room); const session = getOrCreateSession(msg.room);
ws.data.room = msg.room; ws.data.room = msg.room;
session.join(client); session.join(client);
@ -75,9 +52,9 @@ Bun.serve({
} }
}, },
close(ws) { close(ws) {
if (ws.data.room && ws.data.client) { if (ws.data.room) {
const session = getSession(ws.data.room); const session = getSession(ws.data.room);
session?.leave(ws.data.client); session?.leave({ ws });
} }
console.debug("client disconnected"); console.debug("client disconnected");
}, },

View file

@ -1,338 +0,0 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import type { ServerMessage } from "./protocol";
import { decode } from "./protocol";
import { getOrCreateSession, getSession, removeSession } from "./session";
describe("WebSocket concurrent edits integration", () => {
let server: ReturnType<typeof Bun.serve>;
const PORT = 4041; // use different port for tests
beforeEach(() => {
// start server for each test
server = Bun.serve({
port: PORT,
fetch(req, server) {
const url = new URL(req.url);
if (url.pathname === "/ws") {
const upgraded = server.upgrade(req, {
data: { room: null, client: null },
});
if (!upgraded) {
return new Response("websocket upgrade failed", { status: 400 });
}
return;
}
return new Response("collabd test server");
},
websocket: {
open(ws) {
const client = { ws };
ws.data.client = client;
},
message(ws, raw) {
const msg = decode(raw.toString());
if (!msg) return;
const client = ws.data.client;
if (!client) return;
switch (msg.type) {
case "join": {
const session = getOrCreateSession(msg.room);
ws.data.room = msg.room;
session.join(client);
break;
}
case "leave": {
if (ws.data.room) {
const session = getSession(ws.data.room);
session?.leave(client);
removeSession(ws.data.room);
ws.data.room = null;
}
break;
}
case "update": {
if (ws.data.room) {
const session = getSession(ws.data.room);
session?.applyUpdate(new Uint8Array(msg.data), client);
}
break;
}
}
},
close(ws) {
if (ws.data.room && ws.data.client) {
const session = getSession(ws.data.room);
session?.leave(ws.data.client);
removeSession(ws.data.room);
}
},
},
});
});
afterEach(() => {
server.stop();
});
test("two clients concurrent edits converge to same state", async () => {
const Y = await import("yjs");
// create two clients with their own yjs docs
const doc1 = new Y.Doc();
const text1 = doc1.getText("content");
const doc2 = new Y.Doc();
const text2 = doc2.getText("content");
// track messages received by each client
const client1Messages: ServerMessage[] = [];
const client2Messages: ServerMessage[] = [];
let ws1Ready = false;
let ws2Ready = false;
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
// setup all handlers immediately
ws1.onopen = () => {
ws1.send(JSON.stringify({ type: "join", room: "test-room" }));
};
ws1.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
client1Messages.push(msg);
if (msg.type === "sync") {
ws1Ready = true;
}
};
ws2.onopen = () => {
ws2.send(JSON.stringify({ type: "join", room: "test-room" }));
};
ws2.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
client2Messages.push(msg);
if (msg.type === "sync") {
ws2Ready = true;
}
};
// wait for both to be ready
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("clients join timeout"));
}, 2000);
const interval = setInterval(() => {
if (ws1Ready && ws2Ready) {
clearInterval(interval);
clearTimeout(timeout);
resolve();
}
}, 10);
});
// wait a bit for peers message
await new Promise((r) => setTimeout(r, 50));
// client 1 inserts "hello" at position 0
text1.insert(0, "hello");
const update1 = Y.encodeStateAsUpdate(doc1);
ws1.send(JSON.stringify({ type: "update", data: Array.from(update1) }));
// client 2 inserts "world" at position 0 (concurrent edit)
text2.insert(0, "world");
const update2 = Y.encodeStateAsUpdate(doc2);
ws2.send(JSON.stringify({ type: "update", data: Array.from(update2) }));
// wait for updates to propagate
await new Promise((r) => setTimeout(r, 200));
// apply received updates to both clients
for (const msg of client1Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc1, new Uint8Array(msg.data));
}
}
for (const msg of client2Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc2, new Uint8Array(msg.data));
}
}
// both clients should have the same final content
const final1 = text1.toString();
const final2 = text2.toString();
expect(final1).toBe(final2);
// both words should be present
expect(final1).toContain("hello");
expect(final1).toContain("world");
// total length should be sum of both inserts
expect(final1.length).toBe(10); // "hello" + "world"
ws1.close();
ws2.close();
});
test("three clients with sequential edits converge", async () => {
const Y = await import("yjs");
const doc1 = new Y.Doc();
const text1 = doc1.getText("content");
const doc2 = new Y.Doc();
const text2 = doc2.getText("content");
const doc3 = new Y.Doc();
const text3 = doc3.getText("content");
const client1Messages: ServerMessage[] = [];
const client2Messages: ServerMessage[] = [];
const client3Messages: ServerMessage[] = [];
let ws1Ready = false;
let ws2Ready = false;
let ws3Ready = false;
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
const ws3 = new WebSocket(`ws://localhost:${PORT}/ws`);
// setup all handlers immediately
ws1.onopen = () => {
ws1.send(JSON.stringify({ type: "join", room: "multi-room" }));
};
ws1.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
client1Messages.push(msg);
if (msg.type === "sync") {
ws1Ready = true;
}
};
ws2.onopen = () => {
ws2.send(JSON.stringify({ type: "join", room: "multi-room" }));
};
ws2.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
client2Messages.push(msg);
if (msg.type === "sync") {
ws2Ready = true;
}
};
ws3.onopen = () => {
ws3.send(JSON.stringify({ type: "join", room: "multi-room" }));
};
ws3.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
client3Messages.push(msg);
if (msg.type === "sync") {
ws3Ready = true;
}
};
// wait for all three to be ready
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("clients join timeout"));
}, 2000);
const interval = setInterval(() => {
if (ws1Ready && ws2Ready && ws3Ready) {
clearInterval(interval);
clearTimeout(timeout);
resolve();
}
}, 10);
});
await new Promise((r) => setTimeout(r, 50));
// client 1: insert "a"
text1.insert(0, "a");
ws1.send(
JSON.stringify({
type: "update",
data: Array.from(Y.encodeStateAsUpdate(doc1)),
}),
);
await new Promise((r) => setTimeout(r, 30));
// client 2: insert "b"
// first apply any updates received
for (const msg of client2Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc2, new Uint8Array(msg.data));
}
}
text2.insert(text2.length, "b");
ws2.send(
JSON.stringify({
type: "update",
data: Array.from(Y.encodeStateAsUpdate(doc2)),
}),
);
await new Promise((r) => setTimeout(r, 30));
// client 3: insert "c"
for (const msg of client3Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc3, new Uint8Array(msg.data));
}
}
text3.insert(text3.length, "c");
ws3.send(
JSON.stringify({
type: "update",
data: Array.from(Y.encodeStateAsUpdate(doc3)),
}),
);
await new Promise((r) => setTimeout(r, 200));
// apply all updates
for (const msg of client1Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc1, new Uint8Array(msg.data));
}
}
for (const msg of client2Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc2, new Uint8Array(msg.data));
}
}
for (const msg of client3Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc3, new Uint8Array(msg.data));
}
}
const final1 = text1.toString();
const final2 = text2.toString();
const final3 = text3.toString();
expect(final1).toBe(final2);
expect(final2).toBe(final3);
expect(final1).toBe("abc");
ws1.close();
ws2.close();
ws3.close();
});
});

View file

@ -10,48 +10,15 @@ export type ServerMessage =
| { type: "sync"; data: number[] } // full yjs state | { type: "sync"; data: number[] } // full yjs state
| { type: "update"; data: number[] } | { type: "update"; data: number[] }
| { type: "awareness"; data: number[] } | { type: "awareness"; data: number[] }
| { type: "peers"; count: number } | { type: "peers"; count: number };
| { type: "error"; message: string };
export type BridgeMessage =
| { type: "ready" }
| { type: "connected"; room: string }
| { type: "disconnected" }
| { type: "content"; text: string }
| { type: "peers"; count: number }
| { type: "error"; message: string };
export function encode(msg: ServerMessage): string { export function encode(msg: ServerMessage): string {
return JSON.stringify(msg); return JSON.stringify(msg);
} }
function isClientMessage(obj: unknown): obj is ClientMessage {
if (typeof obj !== "object" || obj === null) return false;
const msg = obj as Record<string, unknown>;
switch (msg.type) {
case "join":
return typeof msg.room === "string";
case "leave":
return true;
case "update":
return (
Array.isArray(msg.data) && msg.data.every((n) => typeof n === "number")
);
case "awareness":
return (
Array.isArray(msg.data) && msg.data.every((n) => typeof n === "number")
);
default:
return false;
}
}
export function decode(raw: string): ClientMessage | null { export function decode(raw: string): ClientMessage | null {
try { try {
const parsed = JSON.parse(raw); return JSON.parse(raw) as ClientMessage;
if (!isClientMessage(parsed)) return null;
return parsed;
} catch { } catch {
return null; return null;
} }

View file

@ -18,52 +18,21 @@ export class Session {
this.clients.add(client); this.clients.add(client);
// send full state to new client // send full state to new client
const state = Y.encodeStateAsUpdate(this.doc); const state = Y.encodeStateAsUpdate(this.doc);
try {
client.ws.send(encode({ type: "sync", data: Array.from(state) })); client.ws.send(encode({ type: "sync", data: Array.from(state) }));
} catch (err) {
console.debug("failed to send sync to client, removing:", err);
this.clients.delete(client);
return;
}
this.broadcastPeerCount(); this.broadcastPeerCount();
} }
leave(client: Client) { leave(client: Client) {
this.clients.delete(client); this.clients.delete(client);
this.broadcastPeerCount(); this.broadcastPeerCount();
if (this.isEmpty()) {
sessions.delete(this.name);
console.debug(`session removed: ${this.name}`);
}
}
isEmpty(): boolean {
return this.clients.size === 0;
} }
applyUpdate(update: Uint8Array, from: Client) { applyUpdate(update: Uint8Array, from: Client) {
try {
Y.applyUpdate(this.doc, update); Y.applyUpdate(this.doc, update);
// broadcast to others // broadcast to others
for (const client of this.clients) { for (const client of this.clients) {
if (client !== from) { if (client !== from) {
try { client.ws.send(encode({ type: "update", data: Array.from(update) }));
client.ws.send(
encode({ type: "update", data: Array.from(update) }),
);
} catch (err) {
console.debug("failed to send update to client, removing:", err);
this.clients.delete(client);
}
}
}
} catch (err) {
console.error(`failed to apply update in session ${this.name}:`, err);
// optionally notify the client that sent the bad update
try {
from.ws.send(encode({ type: "error", message: "invalid update" }));
} catch {
// ignore send errors
} }
} }
} }
@ -71,12 +40,7 @@ export class Session {
broadcastPeerCount() { broadcastPeerCount() {
const msg = encode({ type: "peers", count: this.clients.size }); const msg = encode({ type: "peers", count: this.clients.size });
for (const client of this.clients) { for (const client of this.clients) {
try {
client.ws.send(msg); client.ws.send(msg);
} catch (err) {
console.debug("failed to send peer count to client, removing:", err);
this.clients.delete(client);
}
} }
} }
} }
@ -97,11 +61,3 @@ export function getOrCreateSession(name: string): Session {
export function getSession(name: string): Session | undefined { export function getSession(name: string): Session | undefined {
return sessions.get(name); return sessions.get(name);
} }
export function removeSession(name: string): void {
const session = sessions.get(name);
if (session?.isEmpty()) {
sessions.delete(name);
console.debug(`session removed: ${name}`);
}
}