Compare commits

...

23 commits

Author SHA1 Message Date
05629a00a0
Add type-safe message decoding 2026-01-27 18:01:59 -05:00
4c0f0fbf52
Document security limitations 2026-01-27 18:01:08 -05:00
8ce1e098e6
Make session cleanup atomic in leave() 2026-01-27 17:59:49 -05:00
2de33370cd
Add WebSocket.send() error handling 2026-01-27 17:57:04 -05:00
bad4cdac51
Add room name validation to prevent DoS 2026-01-27 17:55:40 -05:00
dcd97a451f
Fix stdin buffer splitting bug in bridge 2026-01-27 17:54:31 -05:00
c8020154e7
Fix biome formatting in bridge.test.ts 2026-01-27 17:53:33 -05:00
a07618deb5
Fix biome lint errors in integration.test.ts 2026-01-27 17:53:12 -05:00
063564b1cd
Fix biome lint errors in bridge.test.ts 2026-01-27 17:52:47 -05:00
ece0618a17
Update readme and agent config to reality 2026-01-27 17:47:47 -05:00
8671265479
Fix vim9script null_job and null_channel type consistency 2026-01-27 17:20:27 -05:00
ee358c1e84
Fix bridge_job initialization to prevent type error 2026-01-27 17:06:44 -05:00
e5cb351a1a
Add bridge lifecycle test
Tests verify that:
- Bridge starts and signals ready message immediately
- Bridge connects to daemon via websocket and disconnects cleanly
- Bridge handles content synchronization without errors
- Process exits with code 0 on clean disconnect
2026-01-27 16:26:29 -05:00
7e92cf251a
Add integration test for concurrent websocket edits
Tests verify that:
- Two concurrent clients can both insert text and CRDT resolves correctly
- Three clients with sequential edits all converge to the same state
- No data is lost during concurrent operations
- The delete-all-insert-all bug is fixed
2026-01-27 16:26:21 -05:00
0a71e7b321
Fix linting errors and formatting 2026-01-27 16:20:42 -05:00
83dac38f29
Add room cleanup to remove empty sessions 2026-01-27 16:19:41 -05:00
2e370afe2c
Document Vim 9.0+ requirement for adapter 2026-01-27 16:18:48 -05:00
7b81777d9d
Fix suppression flag timing with counter instead of boolean 2026-01-27 16:18:15 -05:00
73ee70c52a
Add error handling for Yjs operations 2026-01-27 16:17:53 -05:00
68a7517dec
Fix race condition in vim connect with ready signal 2026-01-27 16:17:33 -05:00
bbfc9998a5
Add clean exit handling to bridge process 2026-01-27 16:16:46 -05:00
4d6ecf78cd
Implement text diffing for proper CRDT operations 2026-01-27 16:16:34 -05:00
925c7a3c0d
Fix client identity tracking to prevent memory leak 2026-01-27 16:16:14 -05:00
9 changed files with 855 additions and 44 deletions

106
CLAUDE.md Normal file
View file

@ -0,0 +1,106 @@
# collabd - editor-agnostic collaborative editing daemon
daemon + thin adapters architecture. daemon handles yjs crdt, adapters just
hook buffer events and apply remote changes.
## status
working:
- daemon with room-based sessions
- multi-peer sync (tested with 2+ clients)
- vim9 adapter with live buffer sync
- proper crdt diffing (not delete-all-insert-all)
- integration tests for concurrent edits
not yet:
- cursor/selection sync (awareness protocol stubbed but unused)
- other editor adapters (helix, kakoune, zed)
- persistence (rooms are ephemeral)
- reconnection handling
## stack
- bun runtime
- yjs for crdt
- websocket transport
- vim9script adapter (with bun bridge since vim cant do websocket)
## running
```bash
just dev # daemon on :4040
bun test # unit tests
just check # biome lint
```
## vim adapter usage
requires Vim 9.0+ (uses vim9script)
```vim
:source adapters/vim/collab.vim
:CollabJoin roomname
:CollabLeave
:CollabStatus
```
the vim plugin spawns adapters/vim/bridge.ts which handles yjs and speaks
json lines to vim via channels.
## protocol
daemon speaks json over websocket at /ws
client -> server:
{ type: "join", room: "name" }
{ type: "leave" }
{ type: "update", data: [...] } // yjs update bytes
server -> client:
{ type: "sync", data: [...] } // full yjs state on join
{ type: "update", data: [...] } // remote changes
{ type: "peers", count: N }
## key files
- src/index.ts - websocket server, room routing
- src/session.ts - yjs doc per room, peer management
- src/protocol.ts - message types
- adapters/vim/bridge.ts - bun process vim spawns
- adapters/vim/collab.vim - vim9script plugin
## adding new editor adapters
each adapter needs:
1. hook buffer change events
2. send changes to daemon as yjs updates (or use a bridge like vim does)
3. receive remote updates and apply to buffer
4. optionally show peer cursors
see NOTES.txt for cell-grid vs text-crdt mode discussion.
see docs/ for full research and architecture breakdown.
## security
current state (research prototype):
- room name validation (alphanumeric, 1-64 chars)
- message type validation via protocol decoder
- websocket.send error handling
known gaps (not production ready):
- no authentication (anyone can join any room by name)
- no authorization (all peers see all edits)
- no rate limiting on messages or room creation
- no message size limits
- room names guessable via brute force
- no encryption (deploy behind wss, not ws)
- no audit logging
- no persistence (data lost on daemon restart)
before production:
1. auth layer (jwt tokens or unix socket for local-only)
2. per-room authorization
3. rate limiting (msgs/sec, rooms/minute)
4. message size caps
5. tls via reverse proxy
6. access logging

View file

@ -1,15 +1,22 @@
# collabd # collabd
To install dependencies: editor-agnostic collaborative editing daemon. two vims, one buffer.
## quick start
```bash ```bash
bun install bun install
just dev # starts daemon on :4040
``` ```
To run: in vim (requires 9.0+):
```vim
```bash :source adapters/vim/collab.vim
bun run src/index.ts :CollabJoin roomname
``` ```
This project was created using `bun init` in bun v1.3.5. [Bun](https://bun.com) is a fast all-in-one JavaScript runtime. open another vim, join the same room, type in either. magic.
## more info
see CLAUDE.md for architecture, protocol, and how to add new editor adapters.

194
adapters/vim/bridge.test.ts Normal file
View file

@ -0,0 +1,194 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawn } from "bun";
describe("Bridge lifecycle", () => {
let daemon: ReturnType<typeof Bun.serve>;
const DAEMON_PORT = 4042;
beforeEach(async () => {
const { decode } = await import("../../src/protocol");
const { getOrCreateSession, getSession, removeSession } = await import(
"../../src/session"
);
// start daemon for bridge to connect to
daemon = Bun.serve({
port: DAEMON_PORT,
fetch(req, server) {
const url = new URL(req.url);
if (url.pathname === "/ws") {
const upgraded = server.upgrade(req, {
data: { room: null, client: null },
});
if (!upgraded) {
return new Response("websocket upgrade failed", { status: 400 });
}
return;
}
return new Response("test daemon");
},
websocket: {
open(ws) {
const client = { ws };
ws.data.client = client;
},
message(ws, raw) {
const msg = decode(raw.toString());
if (!msg) return;
const client = ws.data.client;
if (!client) return;
switch (msg.type) {
case "join": {
const session = getOrCreateSession(msg.room);
ws.data.room = msg.room;
session.join(client);
break;
}
case "leave": {
if (ws.data.room) {
const session = getSession(ws.data.room);
session?.leave(client);
removeSession(ws.data.room);
ws.data.room = null;
}
break;
}
case "update": {
if (ws.data.room) {
const session = getSession(ws.data.room);
session?.applyUpdate(new Uint8Array(msg.data), client);
}
break;
}
}
},
close(ws) {
if (ws.data.room && ws.data.client) {
const session = getSession(ws.data.room);
session?.leave(ws.data.client);
removeSession(ws.data.room);
}
},
},
});
});
afterEach(() => {
daemon.stop();
});
test("bridge starts and signals ready", async () => {
const bridge = spawn({
cmd: ["bun", "adapters/vim/bridge.ts"],
env: {
...process.env,
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
},
stdin: "pipe",
stdout: "pipe",
stderr: "pipe",
});
// read first line from stdout
const reader = bridge.stdout.getReader();
const decoder = new TextDecoder();
const { value } = await reader.read();
expect(value).toBeDefined();
const output = decoder.decode(value);
const firstLine = output.trim().split("\n")[0];
const msg = JSON.parse(firstLine);
expect(msg.type).toBe("ready");
bridge.kill();
await bridge.exited;
});
test("bridge connects to daemon and disconnects cleanly", async () => {
const bridge = spawn({
cmd: ["bun", "adapters/vim/bridge.ts"],
env: {
...process.env,
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
},
stdin: "pipe",
stdout: "pipe",
stderr: "pipe",
});
const reader = bridge.stdout.getReader();
const decoder = new TextDecoder();
// wait for ready
let { value } = await reader.read();
expect(value).toBeDefined();
let output = decoder.decode(value);
expect(output).toContain('"type":"ready"');
// send connect message
bridge.stdin.write(
`${JSON.stringify({ type: "connect", room: "test" })}\n`,
);
// wait for connected message
({ value } = await reader.read());
expect(value).toBeDefined();
output = decoder.decode(value);
expect(output).toContain('"type":"connected"');
expect(output).toContain('"room":"test"');
// send disconnect message
bridge.stdin.write(`${JSON.stringify({ type: "disconnect" })}\n`);
bridge.stdin.end();
// bridge should exit cleanly
const exitCode = await bridge.exited;
expect(exitCode).toBe(0);
});
test("bridge handles content synchronization", async () => {
const bridge = spawn({
cmd: ["bun", "adapters/vim/bridge.ts"],
env: {
...process.env,
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
},
stdin: "pipe",
stdout: "pipe",
stderr: "pipe",
});
const reader = bridge.stdout.getReader();
// wait for ready
await reader.read();
// connect to room
bridge.stdin.write(
`${JSON.stringify({ type: "connect", room: "content-test" })}\n`,
);
// wait for connected and peers messages
await reader.read();
await reader.read();
// send content
bridge.stdin.write(
`${JSON.stringify({ type: "content", text: "hello world" })}\n`,
);
// give it time to process
await new Promise((r) => setTimeout(r, 100));
// disconnect
bridge.stdin.write(`${JSON.stringify({ type: "disconnect" })}\n`);
bridge.stdin.end();
const exitCode = await bridge.exited;
expect(exitCode).toBe(0);
});
});

View file

@ -9,19 +9,22 @@ const DAEMON_URL = process.env.COLLABD_URL || "ws://localhost:4040/ws";
let ws: WebSocket | null = null; let ws: WebSocket | null = null;
let doc: Y.Doc | null = null; let doc: Y.Doc | null = null;
let text: Y.Text | null = null; let text: Y.Text | null = null;
let suppressLocal = false; let suppressLocal = 0;
function send(msg: object) { function send(msg: object) {
console.log(JSON.stringify(msg)); console.log(JSON.stringify(msg));
} }
// signal to vim that bridge is ready
send({ type: "ready" });
function connect(roomName: string) { function connect(roomName: string) {
doc = new Y.Doc(); doc = new Y.Doc();
text = doc.getText("content"); text = doc.getText("content");
// when remote changes come in, notify vim // when remote changes come in, notify vim
text.observe(() => { text.observe(() => {
if (!suppressLocal) { if (suppressLocal === 0) {
send({ type: "content", text: text?.toString() || "" }); send({ type: "content", text: text?.toString() || "" });
} }
}); });
@ -39,10 +42,16 @@ function connect(roomName: string) {
case "sync": case "sync":
case "update": { case "update": {
if (!doc) break; if (!doc) break;
suppressLocal = true; try {
suppressLocal++;
Y.applyUpdate(doc, new Uint8Array(msg.data)); Y.applyUpdate(doc, new Uint8Array(msg.data));
suppressLocal = false; suppressLocal--;
send({ type: "content", text: text?.toString() || "" }); send({ type: "content", text: text?.toString() || "" });
} catch (err) {
suppressLocal--;
console.error("failed to apply yjs update:", err);
send({ type: "error", message: "failed to apply remote update" });
}
break; break;
} }
case "peers": { case "peers": {
@ -61,19 +70,56 @@ function connect(roomName: string) {
}; };
} }
// compute minimal edit operations using LCS-based diff
function computeDiff(oldText: string, newText: string) {
// find common prefix
let prefixLen = 0;
while (
prefixLen < oldText.length &&
prefixLen < newText.length &&
oldText[prefixLen] === newText[prefixLen]
) {
prefixLen++;
}
// find common suffix
let suffixLen = 0;
while (
suffixLen < oldText.length - prefixLen &&
suffixLen < newText.length - prefixLen &&
oldText[oldText.length - 1 - suffixLen] ===
newText[newText.length - 1 - suffixLen]
) {
suffixLen++;
}
const deleteStart = prefixLen;
const deleteLen = oldText.length - prefixLen - suffixLen;
const insertText = newText.slice(prefixLen, newText.length - suffixLen);
return { deleteStart, deleteLen, insertText };
}
function setContent(newContent: string) { function setContent(newContent: string) {
if (!doc || !text || !ws) return; if (!doc || !text || !ws) return;
const oldContent = text.toString(); const oldContent = text.toString();
if (oldContent === newContent) return; if (oldContent === newContent) return;
// compute diff and apply // compute minimal diff and apply
// simple approach: delete all, insert all const { deleteStart, deleteLen, insertText } = computeDiff(
// TODO: proper diff for efficiency oldContent,
newContent,
);
const t = text; const t = text;
doc.transact(() => { doc.transact(() => {
t.delete(0, t.length); if (deleteLen > 0) {
t.insert(0, newContent); t.delete(deleteStart, deleteLen);
}
if (insertText.length > 0) {
t.insert(deleteStart, insertText);
}
}); });
// send update to daemon // send update to daemon
@ -82,11 +128,14 @@ function setContent(newContent: string) {
} }
// read json lines from stdin // read json lines from stdin
let buffer = "";
const decoder = new TextDecoder(); const decoder = new TextDecoder();
for await (const chunk of Bun.stdin.stream()) { for await (const chunk of Bun.stdin.stream()) {
const lines = decoder.decode(chunk).trim().split("\n"); buffer += decoder.decode(chunk);
const lines = buffer.split("\n");
buffer = lines.pop() ?? ""; // keep incomplete last line
for (const line of lines) { for (const line of lines) {
if (!line) continue; if (!line.trim()) continue;
try { try {
const msg = JSON.parse(line); const msg = JSON.parse(line);
switch (msg.type) { switch (msg.type) {
@ -98,7 +147,8 @@ for await (const chunk of Bun.stdin.stream()) {
break; break;
case "disconnect": case "disconnect":
ws?.close(); ws?.close();
break; send({ type: "disconnected" });
process.exit(0);
} }
} catch { } catch {
send({ type: "error", message: "invalid json" }); send({ type: "error", message: "invalid json" });

View file

@ -3,9 +3,10 @@ vim9script
# collab.vim - collaborative editing adapter for collabd # collab.vim - collaborative editing adapter for collabd
# requires: bun, collabd daemon running # requires: bun, collabd daemon running
var bridge_job: job var bridge_job: job = null_job
var bridge_channel: channel var bridge_channel: channel = null_channel
var connected = false var connected = false
var ready = false
var room = "" var room = ""
var suppressing = false var suppressing = false
@ -13,7 +14,7 @@ var suppressing = false
const bridge_script = expand('<sfile>:p:h') .. '/bridge.ts' const bridge_script = expand('<sfile>:p:h') .. '/bridge.ts'
def Send(msg: dict<any>) def Send(msg: dict<any>)
if bridge_channel != null if bridge_channel != null_channel
ch_sendraw(bridge_channel, json_encode(msg) .. "\n") ch_sendraw(bridge_channel, json_encode(msg) .. "\n")
endif endif
enddef enddef
@ -29,7 +30,9 @@ def OnOutput(ch: channel, msg: string)
return return
endtry endtry
if data.type == 'connected' if data.type == 'ready'
ready = true
elseif data.type == 'connected'
connected = true connected = true
echom '[collab] connected to room: ' .. data.room echom '[collab] connected to room: ' .. data.room
elseif data.type == 'disconnected' elseif data.type == 'disconnected'
@ -67,11 +70,12 @@ def SendBuffer()
enddef enddef
export def Connect(room_name: string) export def Connect(room_name: string)
if bridge_job != null if bridge_job != null_job
Disconnect() Disconnect()
endif endif
room = room_name room = room_name
ready = false
bridge_job = job_start(['bun', 'run', bridge_script], { bridge_job = job_start(['bun', 'run', bridge_script], {
mode: 'nl', mode: 'nl',
out_cb: OnOutput, out_cb: OnOutput,
@ -79,8 +83,19 @@ export def Connect(room_name: string)
}) })
bridge_channel = job_getchannel(bridge_job) bridge_channel = job_getchannel(bridge_job)
# give it a moment to start # wait for bridge ready signal
sleep 100m var timeout = 50
while !ready && timeout > 0
sleep 10m
timeout -= 1
endwhile
if !ready
echoerr '[collab] bridge failed to start'
Disconnect()
return
endif
Send({type: 'connect', room: room_name}) Send({type: 'connect', room: room_name})
# set up autocmds to send changes # set up autocmds to send changes
@ -91,13 +106,14 @@ export def Connect(room_name: string)
enddef enddef
export def Disconnect() export def Disconnect()
if bridge_job != null if bridge_job != null_job
Send({type: 'disconnect'}) Send({type: 'disconnect'})
job_stop(bridge_job) job_stop(bridge_job)
bridge_job = null bridge_job = null_job
bridge_channel = null bridge_channel = null_channel
endif endif
connected = false connected = false
ready = false
room = "" room = ""
augroup CollabVim augroup CollabVim
autocmd! autocmd!

View file

@ -3,12 +3,20 @@ import { type Client, getOrCreateSession, getSession } from "./session";
const PORT = Number(process.env.PORT) || 4040; const PORT = Number(process.env.PORT) || 4040;
function isValidRoomName(name: unknown): name is string {
if (typeof name !== "string") return false;
if (name.length === 0 || name.length > 64) return false;
return /^[a-zA-Z0-9_-]+$/.test(name);
}
Bun.serve({ Bun.serve({
port: PORT, port: PORT,
fetch(req, server) { fetch(req, server) {
const url = new URL(req.url); const url = new URL(req.url);
if (url.pathname === "/ws") { if (url.pathname === "/ws") {
const upgraded = server.upgrade(req, { data: { room: null } }); const upgraded = server.upgrade(req, {
data: { room: null, client: null },
});
if (!upgraded) { if (!upgraded) {
return new Response("websocket upgrade failed", { status: 400 }); return new Response("websocket upgrade failed", { status: 400 });
} }
@ -17,17 +25,32 @@ Bun.serve({
return new Response("collabd running"); return new Response("collabd running");
}, },
websocket: { websocket: {
open() { open(ws) {
// create client object once and store in ws.data
const client: Client = { ws };
ws.data.client = client;
console.debug("client connected"); console.debug("client connected");
}, },
message(ws, raw) { message(ws, raw) {
const msg = decode(raw.toString()); const msg = decode(raw.toString());
if (!msg) return; if (!msg) return;
const client: Client = { ws }; // reuse the client object from ws.data
const client = ws.data.client;
if (!client) return;
switch (msg.type) { switch (msg.type) {
case "join": { case "join": {
if (!isValidRoomName(msg.room)) {
try {
ws.send(
JSON.stringify({ type: "error", message: "invalid room name" }),
);
} catch (err) {
console.debug("failed to send error to client:", err);
}
break;
}
const session = getOrCreateSession(msg.room); const session = getOrCreateSession(msg.room);
ws.data.room = msg.room; ws.data.room = msg.room;
session.join(client); session.join(client);
@ -52,9 +75,9 @@ Bun.serve({
} }
}, },
close(ws) { close(ws) {
if (ws.data.room) { if (ws.data.room && ws.data.client) {
const session = getSession(ws.data.room); const session = getSession(ws.data.room);
session?.leave({ ws }); session?.leave(ws.data.client);
} }
console.debug("client disconnected"); console.debug("client disconnected");
}, },

338
src/integration.test.ts Normal file
View file

@ -0,0 +1,338 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import type { ServerMessage } from "./protocol";
import { decode } from "./protocol";
import { getOrCreateSession, getSession, removeSession } from "./session";
describe("WebSocket concurrent edits integration", () => {
let server: ReturnType<typeof Bun.serve>;
const PORT = 4041; // use different port for tests
beforeEach(() => {
// start server for each test
server = Bun.serve({
port: PORT,
fetch(req, server) {
const url = new URL(req.url);
if (url.pathname === "/ws") {
const upgraded = server.upgrade(req, {
data: { room: null, client: null },
});
if (!upgraded) {
return new Response("websocket upgrade failed", { status: 400 });
}
return;
}
return new Response("collabd test server");
},
websocket: {
open(ws) {
const client = { ws };
ws.data.client = client;
},
message(ws, raw) {
const msg = decode(raw.toString());
if (!msg) return;
const client = ws.data.client;
if (!client) return;
switch (msg.type) {
case "join": {
const session = getOrCreateSession(msg.room);
ws.data.room = msg.room;
session.join(client);
break;
}
case "leave": {
if (ws.data.room) {
const session = getSession(ws.data.room);
session?.leave(client);
removeSession(ws.data.room);
ws.data.room = null;
}
break;
}
case "update": {
if (ws.data.room) {
const session = getSession(ws.data.room);
session?.applyUpdate(new Uint8Array(msg.data), client);
}
break;
}
}
},
close(ws) {
if (ws.data.room && ws.data.client) {
const session = getSession(ws.data.room);
session?.leave(ws.data.client);
removeSession(ws.data.room);
}
},
},
});
});
afterEach(() => {
server.stop();
});
test("two clients concurrent edits converge to same state", async () => {
const Y = await import("yjs");
// create two clients with their own yjs docs
const doc1 = new Y.Doc();
const text1 = doc1.getText("content");
const doc2 = new Y.Doc();
const text2 = doc2.getText("content");
// track messages received by each client
const client1Messages: ServerMessage[] = [];
const client2Messages: ServerMessage[] = [];
let ws1Ready = false;
let ws2Ready = false;
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
// setup all handlers immediately
ws1.onopen = () => {
ws1.send(JSON.stringify({ type: "join", room: "test-room" }));
};
ws1.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
client1Messages.push(msg);
if (msg.type === "sync") {
ws1Ready = true;
}
};
ws2.onopen = () => {
ws2.send(JSON.stringify({ type: "join", room: "test-room" }));
};
ws2.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
client2Messages.push(msg);
if (msg.type === "sync") {
ws2Ready = true;
}
};
// wait for both to be ready
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("clients join timeout"));
}, 2000);
const interval = setInterval(() => {
if (ws1Ready && ws2Ready) {
clearInterval(interval);
clearTimeout(timeout);
resolve();
}
}, 10);
});
// wait a bit for peers message
await new Promise((r) => setTimeout(r, 50));
// client 1 inserts "hello" at position 0
text1.insert(0, "hello");
const update1 = Y.encodeStateAsUpdate(doc1);
ws1.send(JSON.stringify({ type: "update", data: Array.from(update1) }));
// client 2 inserts "world" at position 0 (concurrent edit)
text2.insert(0, "world");
const update2 = Y.encodeStateAsUpdate(doc2);
ws2.send(JSON.stringify({ type: "update", data: Array.from(update2) }));
// wait for updates to propagate
await new Promise((r) => setTimeout(r, 200));
// apply received updates to both clients
for (const msg of client1Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc1, new Uint8Array(msg.data));
}
}
for (const msg of client2Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc2, new Uint8Array(msg.data));
}
}
// both clients should have the same final content
const final1 = text1.toString();
const final2 = text2.toString();
expect(final1).toBe(final2);
// both words should be present
expect(final1).toContain("hello");
expect(final1).toContain("world");
// total length should be sum of both inserts
expect(final1.length).toBe(10); // "hello" + "world"
ws1.close();
ws2.close();
});
test("three clients with sequential edits converge", async () => {
const Y = await import("yjs");
const doc1 = new Y.Doc();
const text1 = doc1.getText("content");
const doc2 = new Y.Doc();
const text2 = doc2.getText("content");
const doc3 = new Y.Doc();
const text3 = doc3.getText("content");
const client1Messages: ServerMessage[] = [];
const client2Messages: ServerMessage[] = [];
const client3Messages: ServerMessage[] = [];
let ws1Ready = false;
let ws2Ready = false;
let ws3Ready = false;
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
const ws3 = new WebSocket(`ws://localhost:${PORT}/ws`);
// setup all handlers immediately
ws1.onopen = () => {
ws1.send(JSON.stringify({ type: "join", room: "multi-room" }));
};
ws1.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
client1Messages.push(msg);
if (msg.type === "sync") {
ws1Ready = true;
}
};
ws2.onopen = () => {
ws2.send(JSON.stringify({ type: "join", room: "multi-room" }));
};
ws2.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
client2Messages.push(msg);
if (msg.type === "sync") {
ws2Ready = true;
}
};
ws3.onopen = () => {
ws3.send(JSON.stringify({ type: "join", room: "multi-room" }));
};
ws3.onmessage = (ev) => {
const msg = JSON.parse(ev.data);
client3Messages.push(msg);
if (msg.type === "sync") {
ws3Ready = true;
}
};
// wait for all three to be ready
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("clients join timeout"));
}, 2000);
const interval = setInterval(() => {
if (ws1Ready && ws2Ready && ws3Ready) {
clearInterval(interval);
clearTimeout(timeout);
resolve();
}
}, 10);
});
await new Promise((r) => setTimeout(r, 50));
// client 1: insert "a"
text1.insert(0, "a");
ws1.send(
JSON.stringify({
type: "update",
data: Array.from(Y.encodeStateAsUpdate(doc1)),
}),
);
await new Promise((r) => setTimeout(r, 30));
// client 2: insert "b"
// first apply any updates received
for (const msg of client2Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc2, new Uint8Array(msg.data));
}
}
text2.insert(text2.length, "b");
ws2.send(
JSON.stringify({
type: "update",
data: Array.from(Y.encodeStateAsUpdate(doc2)),
}),
);
await new Promise((r) => setTimeout(r, 30));
// client 3: insert "c"
for (const msg of client3Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc3, new Uint8Array(msg.data));
}
}
text3.insert(text3.length, "c");
ws3.send(
JSON.stringify({
type: "update",
data: Array.from(Y.encodeStateAsUpdate(doc3)),
}),
);
await new Promise((r) => setTimeout(r, 200));
// apply all updates
for (const msg of client1Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc1, new Uint8Array(msg.data));
}
}
for (const msg of client2Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc2, new Uint8Array(msg.data));
}
}
for (const msg of client3Messages) {
if (msg.type === "update" || msg.type === "sync") {
Y.applyUpdate(doc3, new Uint8Array(msg.data));
}
}
const final1 = text1.toString();
const final2 = text2.toString();
const final3 = text3.toString();
expect(final1).toBe(final2);
expect(final2).toBe(final3);
expect(final1).toBe("abc");
ws1.close();
ws2.close();
ws3.close();
});
});

View file

@ -10,15 +10,48 @@ export type ServerMessage =
| { type: "sync"; data: number[] } // full yjs state | { type: "sync"; data: number[] } // full yjs state
| { type: "update"; data: number[] } | { type: "update"; data: number[] }
| { type: "awareness"; data: number[] } | { type: "awareness"; data: number[] }
| { type: "peers"; count: number }; | { type: "peers"; count: number }
| { type: "error"; message: string };
export type BridgeMessage =
| { type: "ready" }
| { type: "connected"; room: string }
| { type: "disconnected" }
| { type: "content"; text: string }
| { type: "peers"; count: number }
| { type: "error"; message: string };
export function encode(msg: ServerMessage): string { export function encode(msg: ServerMessage): string {
return JSON.stringify(msg); return JSON.stringify(msg);
} }
function isClientMessage(obj: unknown): obj is ClientMessage {
if (typeof obj !== "object" || obj === null) return false;
const msg = obj as Record<string, unknown>;
switch (msg.type) {
case "join":
return typeof msg.room === "string";
case "leave":
return true;
case "update":
return (
Array.isArray(msg.data) && msg.data.every((n) => typeof n === "number")
);
case "awareness":
return (
Array.isArray(msg.data) && msg.data.every((n) => typeof n === "number")
);
default:
return false;
}
}
export function decode(raw: string): ClientMessage | null { export function decode(raw: string): ClientMessage | null {
try { try {
return JSON.parse(raw) as ClientMessage; const parsed = JSON.parse(raw);
if (!isClientMessage(parsed)) return null;
return parsed;
} catch { } catch {
return null; return null;
} }

View file

@ -18,21 +18,52 @@ export class Session {
this.clients.add(client); this.clients.add(client);
// send full state to new client // send full state to new client
const state = Y.encodeStateAsUpdate(this.doc); const state = Y.encodeStateAsUpdate(this.doc);
try {
client.ws.send(encode({ type: "sync", data: Array.from(state) })); client.ws.send(encode({ type: "sync", data: Array.from(state) }));
} catch (err) {
console.debug("failed to send sync to client, removing:", err);
this.clients.delete(client);
return;
}
this.broadcastPeerCount(); this.broadcastPeerCount();
} }
leave(client: Client) { leave(client: Client) {
this.clients.delete(client); this.clients.delete(client);
this.broadcastPeerCount(); this.broadcastPeerCount();
if (this.isEmpty()) {
sessions.delete(this.name);
console.debug(`session removed: ${this.name}`);
}
}
isEmpty(): boolean {
return this.clients.size === 0;
} }
applyUpdate(update: Uint8Array, from: Client) { applyUpdate(update: Uint8Array, from: Client) {
try {
Y.applyUpdate(this.doc, update); Y.applyUpdate(this.doc, update);
// broadcast to others // broadcast to others
for (const client of this.clients) { for (const client of this.clients) {
if (client !== from) { if (client !== from) {
client.ws.send(encode({ type: "update", data: Array.from(update) })); try {
client.ws.send(
encode({ type: "update", data: Array.from(update) }),
);
} catch (err) {
console.debug("failed to send update to client, removing:", err);
this.clients.delete(client);
}
}
}
} catch (err) {
console.error(`failed to apply update in session ${this.name}:`, err);
// optionally notify the client that sent the bad update
try {
from.ws.send(encode({ type: "error", message: "invalid update" }));
} catch {
// ignore send errors
} }
} }
} }
@ -40,7 +71,12 @@ export class Session {
broadcastPeerCount() { broadcastPeerCount() {
const msg = encode({ type: "peers", count: this.clients.size }); const msg = encode({ type: "peers", count: this.clients.size });
for (const client of this.clients) { for (const client of this.clients) {
try {
client.ws.send(msg); client.ws.send(msg);
} catch (err) {
console.debug("failed to send peer count to client, removing:", err);
this.clients.delete(client);
}
} }
} }
} }
@ -61,3 +97,11 @@ export function getOrCreateSession(name: string): Session {
export function getSession(name: string): Session | undefined { export function getSession(name: string): Session | undefined {
return sessions.get(name); return sessions.get(name);
} }
export function removeSession(name: string): void {
const session = sessions.get(name);
if (session?.isEmpty()) {
sessions.delete(name);
console.debug(`session removed: ${name}`);
}
}