diff --git a/src/integration.test.ts b/src/integration.test.ts new file mode 100644 index 0000000..f2e3d23 --- /dev/null +++ b/src/integration.test.ts @@ -0,0 +1,329 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { decode } from "./protocol"; +import type { ServerMessage } from "./protocol"; +import { getOrCreateSession, getSession, removeSession } from "./session"; + +describe("WebSocket concurrent edits integration", () => { + let server: ReturnType; + const PORT = 4041; // use different port for tests + + beforeEach(() => { + // start server for each test + server = Bun.serve({ + port: PORT, + fetch(req, server) { + const url = new URL(req.url); + if (url.pathname === "/ws") { + const upgraded = server.upgrade(req, { + data: { room: null, client: null }, + }); + if (!upgraded) { + return new Response("websocket upgrade failed", { status: 400 }); + } + return; + } + return new Response("collabd test server"); + }, + websocket: { + open(ws) { + const client = { ws }; + ws.data.client = client; + }, + message(ws, raw) { + const msg = decode(raw.toString()); + if (!msg) return; + + const client = ws.data.client; + if (!client) return; + + switch (msg.type) { + case "join": { + const session = getOrCreateSession(msg.room); + ws.data.room = msg.room; + session.join(client); + break; + } + case "leave": { + if (ws.data.room) { + const session = getSession(ws.data.room); + session?.leave(client); + removeSession(ws.data.room); + ws.data.room = null; + } + break; + } + case "update": { + if (ws.data.room) { + const session = getSession(ws.data.room); + session?.applyUpdate(new Uint8Array(msg.data), client); + } + break; + } + } + }, + close(ws) { + if (ws.data.room && ws.data.client) { + const session = getSession(ws.data.room); + session?.leave(ws.data.client); + removeSession(ws.data.room); + } + }, + }, + }); + }); + + afterEach(() => { + server.stop(); + }); + + test("two clients concurrent edits converge to same state", async () => { + const Y = await import("yjs"); + + // create two clients with their own yjs docs + const doc1 = new Y.Doc(); + const text1 = doc1.getText("content"); + + const doc2 = new Y.Doc(); + const text2 = doc2.getText("content"); + + // track messages received by each client + const client1Messages: ServerMessage[] = []; + const client2Messages: ServerMessage[] = []; + + let ws1Ready = false; + let ws2Ready = false; + + const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`); + const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`); + + // setup all handlers immediately + ws1.onopen = () => { + ws1.send(JSON.stringify({ type: "join", room: "test-room" })); + }; + + ws1.onmessage = (ev) => { + const msg = JSON.parse(ev.data); + client1Messages.push(msg); + if (msg.type === "sync") { + ws1Ready = true; + } + }; + + ws2.onopen = () => { + ws2.send(JSON.stringify({ type: "join", room: "test-room" })); + }; + + ws2.onmessage = (ev) => { + const msg = JSON.parse(ev.data); + client2Messages.push(msg); + if (msg.type === "sync") { + ws2Ready = true; + } + }; + + // wait for both to be ready + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error("clients join timeout")); + }, 2000); + + const interval = setInterval(() => { + if (ws1Ready && ws2Ready) { + clearInterval(interval); + clearTimeout(timeout); + resolve(); + } + }, 10); + }); + + // wait a bit for peers message + await new Promise((r) => setTimeout(r, 50)); + + // client 1 inserts "hello" at position 0 + text1.insert(0, "hello"); + const update1 = Y.encodeStateAsUpdate(doc1); + ws1.send(JSON.stringify({ type: "update", data: Array.from(update1) })); + + // client 2 inserts "world" at position 0 (concurrent edit) + text2.insert(0, "world"); + const update2 = Y.encodeStateAsUpdate(doc2); + ws2.send(JSON.stringify({ type: "update", data: Array.from(update2) })); + + // wait for updates to propagate + await new Promise((r) => setTimeout(r, 200)); + + // apply received updates to both clients + for (const msg of client1Messages) { + if (msg.type === "update" || msg.type === "sync") { + Y.applyUpdate(doc1, new Uint8Array(msg.data)); + } + } + + for (const msg of client2Messages) { + if (msg.type === "update" || msg.type === "sync") { + Y.applyUpdate(doc2, new Uint8Array(msg.data)); + } + } + + // both clients should have the same final content + const final1 = text1.toString(); + const final2 = text2.toString(); + + expect(final1).toBe(final2); + + // both words should be present + expect(final1).toContain("hello"); + expect(final1).toContain("world"); + + // total length should be sum of both inserts + expect(final1.length).toBe(10); // "hello" + "world" + + ws1.close(); + ws2.close(); + }); + + test("three clients with sequential edits converge", async () => { + const Y = await import("yjs"); + + const doc1 = new Y.Doc(); + const text1 = doc1.getText("content"); + + const doc2 = new Y.Doc(); + const text2 = doc2.getText("content"); + + const doc3 = new Y.Doc(); + const text3 = doc3.getText("content"); + + const client1Messages: ServerMessage[] = []; + const client2Messages: ServerMessage[] = []; + const client3Messages: ServerMessage[] = []; + + let ws1Ready = false; + let ws2Ready = false; + let ws3Ready = false; + + const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`); + const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`); + const ws3 = new WebSocket(`ws://localhost:${PORT}/ws`); + + // setup all handlers immediately + ws1.onopen = () => { + ws1.send(JSON.stringify({ type: "join", room: "multi-room" })); + }; + + ws1.onmessage = (ev) => { + const msg = JSON.parse(ev.data); + client1Messages.push(msg); + if (msg.type === "sync") { + ws1Ready = true; + } + }; + + ws2.onopen = () => { + ws2.send(JSON.stringify({ type: "join", room: "multi-room" })); + }; + + ws2.onmessage = (ev) => { + const msg = JSON.parse(ev.data); + client2Messages.push(msg); + if (msg.type === "sync") { + ws2Ready = true; + } + }; + + ws3.onopen = () => { + ws3.send(JSON.stringify({ type: "join", room: "multi-room" })); + }; + + ws3.onmessage = (ev) => { + const msg = JSON.parse(ev.data); + client3Messages.push(msg); + if (msg.type === "sync") { + ws3Ready = true; + } + }; + + // wait for all three to be ready + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error("clients join timeout")); + }, 2000); + + const interval = setInterval(() => { + if (ws1Ready && ws2Ready && ws3Ready) { + clearInterval(interval); + clearTimeout(timeout); + resolve(); + } + }, 10); + }); + + await new Promise((r) => setTimeout(r, 50)); + + // client 1: insert "a" + text1.insert(0, "a"); + ws1.send( + JSON.stringify({ type: "update", data: Array.from(Y.encodeStateAsUpdate(doc1)) }), + ); + + await new Promise((r) => setTimeout(r, 30)); + + // client 2: insert "b" + // first apply any updates received + for (const msg of client2Messages) { + if (msg.type === "update" || msg.type === "sync") { + Y.applyUpdate(doc2, new Uint8Array(msg.data)); + } + } + text2.insert(text2.length, "b"); + ws2.send( + JSON.stringify({ type: "update", data: Array.from(Y.encodeStateAsUpdate(doc2)) }), + ); + + await new Promise((r) => setTimeout(r, 30)); + + // client 3: insert "c" + for (const msg of client3Messages) { + if (msg.type === "update" || msg.type === "sync") { + Y.applyUpdate(doc3, new Uint8Array(msg.data)); + } + } + text3.insert(text3.length, "c"); + ws3.send( + JSON.stringify({ type: "update", data: Array.from(Y.encodeStateAsUpdate(doc3)) }), + ); + + await new Promise((r) => setTimeout(r, 200)); + + // apply all updates + for (const msg of client1Messages) { + if (msg.type === "update" || msg.type === "sync") { + Y.applyUpdate(doc1, new Uint8Array(msg.data)); + } + } + + for (const msg of client2Messages) { + if (msg.type === "update" || msg.type === "sync") { + Y.applyUpdate(doc2, new Uint8Array(msg.data)); + } + } + + for (const msg of client3Messages) { + if (msg.type === "update" || msg.type === "sync") { + Y.applyUpdate(doc3, new Uint8Array(msg.data)); + } + } + + const final1 = text1.toString(); + const final2 = text2.toString(); + const final3 = text3.toString(); + + expect(final1).toBe(final2); + expect(final2).toBe(final3); + expect(final1).toBe("abc"); + + ws1.close(); + ws2.close(); + ws3.close(); + }); +});