import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { decode } from "./protocol";
import type { ServerMessage } from "./protocol";
import { getOrCreateSession, getSession, removeSession } from "./session";

/** Type of the dynamically imported yjs module (tests load it via `await import("yjs")`). */
type YModule = typeof import("yjs");
/** Instance type of `Y.Doc`. */
type YDoc = InstanceType<YModule["Doc"]>;

/** Port used by the test server; different from the default port so tests do not collide. */
const PORT = 4041;
const WS_URL = `ws://localhost:${PORT}/ws`;

/** Maximum time to wait for every client to receive its first "sync" message. */
const JOIN_TIMEOUT_MS = 2000;
/** How often the join wait re-checks its condition. */
const POLL_INTERVAL_MS = 10;
/** Short pause after joining so any follow-up messages sent after the join can arrive. */
const SETTLE_MS = 50;
/** Pause between sequential edits in the three-client test. */
const STEP_MS = 30;
/** Pause that lets the final updates propagate before state is compared. */
const PROPAGATION_MS = 200;

/** A WebSocket test client together with the log of messages it has received. */
interface TestClient {
  ws: WebSocket;
  /** Every parsed message received so far, in arrival order. */
  messages: ServerMessage[];
  /** True once at least one message of type "sync" has been received. */
  isSynced: () => boolean;
}

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

/**
 * Polls `condition` every POLL_INTERVAL_MS until it returns true.
 *
 * @param condition - predicate that is re-evaluated on each tick
 * @param timeoutMs - how long to wait before giving up
 * @param message - error message used when the wait times out
 * @throws Error (as a rejection) when `timeoutMs` elapses first
 */
function waitUntil(condition: () => boolean, timeoutMs: number, message: string): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const timeout = setTimeout(() => {
      // stop polling on failure too, otherwise the interval outlives the test
      clearInterval(interval);
      reject(new Error(message));
    }, timeoutMs);
    const interval = setInterval(() => {
      if (condition()) {
        clearInterval(interval);
        clearTimeout(timeout);
        resolve();
      }
    }, POLL_INTERVAL_MS);
  });
}

/**
 * Opens a WebSocket to the test server, sends a "join" for `room` as soon as
 * the socket opens, and records every message received.
 *
 * Handlers are attached immediately after construction so no event is missed.
 */
function connectClient(room: string): TestClient {
  const messages: ServerMessage[] = [];
  let synced = false;
  const ws = new WebSocket(WS_URL);

  ws.onopen = () => {
    ws.send(JSON.stringify({ type: "join", room }));
  };
  ws.onmessage = (ev) => {
    const msg = JSON.parse(ev.data) as ServerMessage;
    messages.push(msg);
    if (msg.type === "sync") {
      synced = true;
    }
  };

  return { ws, messages, isSynced: () => synced };
}

/**
 * Applies the payload of every "update" and "sync" message in `messages` to `doc`.
 * The tests call this more than once over the same, growing message log.
 */
function applyReceived(Y: YModule, doc: YDoc, messages: readonly ServerMessage[]): void {
  for (const msg of messages) {
    if (msg.type === "update" || msg.type === "sync") {
      Y.applyUpdate(doc, new Uint8Array(msg.data));
    }
  }
}

/** Sends the full encoded state of `doc` over `ws` as an "update" message. */
function sendState(Y: YModule, ws: WebSocket, doc: YDoc): void {
  const update = Y.encodeStateAsUpdate(doc);
  ws.send(JSON.stringify({ type: "update", data: Array.from(update) }));
}

/** Closes the sockets of all given clients. */
function closeAll(clients: readonly TestClient[]): void {
  for (const client of clients) {
    client.ws.close();
  }
}

describe("WebSocket concurrent edits integration", () => {
  let server: ReturnType<typeof Bun.serve>;

  beforeEach(() => {
    // start a fresh server for each test
    // NOTE(review): ws.data is not given an explicit type here; when type-checking
    // with tsc, Bun.serve may need a WebSocket data type argument — confirm against
    // the installed bun-types version.
    server = Bun.serve({
      port: PORT,
      fetch(req, srv) {
        const url = new URL(req.url);
        if (url.pathname === "/ws") {
          const upgraded = srv.upgrade(req, {
            data: { room: null, client: null },
          });
          if (!upgraded) {
            return new Response("websocket upgrade failed", { status: 400 });
          }
          return;
        }
        return new Response("collabd test server");
      },
      websocket: {
        open(ws) {
          const client = { ws };
          ws.data.client = client;
        },
        message(ws, raw) {
          const msg = decode(raw.toString());
          if (!msg) return;

          const client = ws.data.client;
          if (!client) return;

          switch (msg.type) {
            case "join": {
              const session = getOrCreateSession(msg.room);
              ws.data.room = msg.room;
              session.join(client);
              break;
            }
            case "leave": {
              if (ws.data.room) {
                const session = getSession(ws.data.room);
                session?.leave(client);
                removeSession(ws.data.room);
                ws.data.room = null;
              }
              break;
            }
            case "update": {
              if (ws.data.room) {
                const session = getSession(ws.data.room);
                session?.applyUpdate(new Uint8Array(msg.data), client);
              }
              break;
            }
          }
        },
        close(ws) {
          // a socket that closes while still in a room leaves that room's session
          if (ws.data.room && ws.data.client) {
            const session = getSession(ws.data.room);
            session?.leave(ws.data.client);
            removeSession(ws.data.room);
          }
        },
      },
    });
  });

  afterEach(() => {
    server.stop();
  });

  test("two clients concurrent edits converge to same state", async () => {
    const Y = await import("yjs");

    // each client owns an independent yjs doc
    const doc1 = new Y.Doc();
    const text1 = doc1.getText("content");
    const doc2 = new Y.Doc();
    const text2 = doc2.getText("content");

    const client1 = connectClient("test-room");
    const client2 = connectClient("test-room");
    const clients = [client1, client2];

    try {
      await waitUntil(
        () => clients.every((c) => c.isSynced()),
        JOIN_TIMEOUT_MS,
        "clients join timeout",
      );
      await sleep(SETTLE_MS);

      // client 1 inserts "hello" at position 0
      text1.insert(0, "hello");
      sendState(Y, client1.ws, doc1);

      // client 2 inserts "world" at position 0 without having seen client 1's edit
      text2.insert(0, "world");
      sendState(Y, client2.ws, doc2);

      await sleep(PROPAGATION_MS);

      // apply everything each client received to its own doc
      applyReceived(Y, doc1, client1.messages);
      applyReceived(Y, doc2, client2.messages);

      // both clients should have the same final content
      const final1 = text1.toString();
      const final2 = text2.toString();
      expect(final1).toBe(final2);

      // both words should be present
      expect(final1).toContain("hello");
      expect(final1).toContain("world");

      // total length should be the sum of both inserts
      expect(final1.length).toBe(10); // "hello" + "world"
    } finally {
      // close sockets even when an assertion above fails
      closeAll(clients);
    }
  });

  test("three clients with sequential edits converge", async () => {
    const Y = await import("yjs");

    const doc1 = new Y.Doc();
    const text1 = doc1.getText("content");
    const doc2 = new Y.Doc();
    const text2 = doc2.getText("content");
    const doc3 = new Y.Doc();
    const text3 = doc3.getText("content");

    const client1 = connectClient("multi-room");
    const client2 = connectClient("multi-room");
    const client3 = connectClient("multi-room");
    const clients = [client1, client2, client3];

    try {
      await waitUntil(
        () => clients.every((c) => c.isSynced()),
        JOIN_TIMEOUT_MS,
        "clients join timeout",
      );
      await sleep(SETTLE_MS);

      // client 1: insert "a"
      text1.insert(0, "a");
      sendState(Y, client1.ws, doc1);
      await sleep(STEP_MS);

      // client 2: catch up on received updates, then append "b"
      applyReceived(Y, doc2, client2.messages);
      text2.insert(text2.length, "b");
      sendState(Y, client2.ws, doc2);
      await sleep(STEP_MS);

      // client 3: catch up on received updates, then append "c"
      applyReceived(Y, doc3, client3.messages);
      text3.insert(text3.length, "c");
      sendState(Y, client3.ws, doc3);
      await sleep(PROPAGATION_MS);

      // apply all received updates on every client
      applyReceived(Y, doc1, client1.messages);
      applyReceived(Y, doc2, client2.messages);
      applyReceived(Y, doc3, client3.messages);

      const final1 = text1.toString();
      const final2 = text2.toString();
      const final3 = text3.toString();

      expect(final1).toBe(final2);
      expect(final2).toBe(final3);
      expect(final1).toBe("abc");
    } finally {
      // close sockets even when an assertion above fails
      closeAll(clients);
    }
  });
});