Compare commits
23 commits
3cf16586aa
...
05629a00a0
| Author | SHA1 | Date | |
|---|---|---|---|
| 05629a00a0 | |||
| 4c0f0fbf52 | |||
| 8ce1e098e6 | |||
| 2de33370cd | |||
| bad4cdac51 | |||
| dcd97a451f | |||
| c8020154e7 | |||
| a07618deb5 | |||
| 063564b1cd | |||
| ece0618a17 | |||
| 8671265479 | |||
| ee358c1e84 | |||
| e5cb351a1a | |||
| 7e92cf251a | |||
| 0a71e7b321 | |||
| 83dac38f29 | |||
| 2e370afe2c | |||
| 7b81777d9d | |||
| 73ee70c52a | |||
| 68a7517dec | |||
| bbfc9998a5 | |||
| 4d6ecf78cd | |||
| 925c7a3c0d |
9 changed files with 855 additions and 44 deletions
106
CLAUDE.md
Normal file
106
CLAUDE.md
Normal file
|
|
@ -0,0 +1,106 @@
|
||||||
|
# collabd - editor-agnostic collaborative editing daemon
|
||||||
|
|
||||||
|
daemon + thin adapters architecture. daemon handles yjs crdt, adapters just
|
||||||
|
hook buffer events and apply remote changes.
|
||||||
|
|
||||||
|
## status
|
||||||
|
|
||||||
|
working:
|
||||||
|
- daemon with room-based sessions
|
||||||
|
- multi-peer sync (tested with 2+ clients)
|
||||||
|
- vim9 adapter with live buffer sync
|
||||||
|
- proper crdt diffing (not delete-all-insert-all)
|
||||||
|
- integration tests for concurrent edits
|
||||||
|
|
||||||
|
not yet:
|
||||||
|
- cursor/selection sync (awareness protocol stubbed but unused)
|
||||||
|
- other editor adapters (helix, kakoune, zed)
|
||||||
|
- persistence (rooms are ephemeral)
|
||||||
|
- reconnection handling
|
||||||
|
|
||||||
|
## stack
|
||||||
|
|
||||||
|
- bun runtime
|
||||||
|
- yjs for crdt
|
||||||
|
- websocket transport
|
||||||
|
- vim9script adapter (with bun bridge since vim cant do websocket)
|
||||||
|
|
||||||
|
## running
|
||||||
|
|
||||||
|
```bash
|
||||||
|
just dev # daemon on :4040
|
||||||
|
bun test # unit tests
|
||||||
|
just check # biome lint
|
||||||
|
```
|
||||||
|
|
||||||
|
## vim adapter usage
|
||||||
|
|
||||||
|
requires Vim 9.0+ (uses vim9script)
|
||||||
|
|
||||||
|
```vim
|
||||||
|
:source adapters/vim/collab.vim
|
||||||
|
:CollabJoin roomname
|
||||||
|
:CollabLeave
|
||||||
|
:CollabStatus
|
||||||
|
```
|
||||||
|
|
||||||
|
the vim plugin spawns adapters/vim/bridge.ts which handles yjs and speaks
|
||||||
|
json lines to vim via channels.
|
||||||
|
|
||||||
|
## protocol
|
||||||
|
|
||||||
|
daemon speaks json over websocket at /ws
|
||||||
|
|
||||||
|
client -> server:
|
||||||
|
{ type: "join", room: "name" }
|
||||||
|
{ type: "leave" }
|
||||||
|
{ type: "update", data: [...] } // yjs update bytes
|
||||||
|
|
||||||
|
server -> client:
|
||||||
|
{ type: "sync", data: [...] } // full yjs state on join
|
||||||
|
{ type: "update", data: [...] } // remote changes
|
||||||
|
{ type: "peers", count: N }
|
||||||
|
|
||||||
|
## key files
|
||||||
|
|
||||||
|
- src/index.ts - websocket server, room routing
|
||||||
|
- src/session.ts - yjs doc per room, peer management
|
||||||
|
- src/protocol.ts - message types
|
||||||
|
- adapters/vim/bridge.ts - bun process vim spawns
|
||||||
|
- adapters/vim/collab.vim - vim9script plugin
|
||||||
|
|
||||||
|
## adding new editor adapters
|
||||||
|
|
||||||
|
each adapter needs:
|
||||||
|
1. hook buffer change events
|
||||||
|
2. send changes to daemon as yjs updates (or use a bridge like vim does)
|
||||||
|
3. receive remote updates and apply to buffer
|
||||||
|
4. optionally show peer cursors
|
||||||
|
|
||||||
|
see NOTES.txt for cell-grid vs text-crdt mode discussion.
|
||||||
|
see docs/ for full research and architecture breakdown.
|
||||||
|
|
||||||
|
## security
|
||||||
|
|
||||||
|
current state (research prototype):
|
||||||
|
- room name validation (alphanumeric, 1-64 chars)
|
||||||
|
- message type validation via protocol decoder
|
||||||
|
- websocket.send error handling
|
||||||
|
|
||||||
|
known gaps (not production ready):
|
||||||
|
- no authentication (anyone can join any room by name)
|
||||||
|
- no authorization (all peers see all edits)
|
||||||
|
- no rate limiting on messages or room creation
|
||||||
|
- no message size limits
|
||||||
|
- room names guessable via brute force
|
||||||
|
- no encryption (deploy behind wss, not ws)
|
||||||
|
- no audit logging
|
||||||
|
- no persistence (data lost on daemon restart)
|
||||||
|
|
||||||
|
before production:
|
||||||
|
1. auth layer (jwt tokens or unix socket for local-only)
|
||||||
|
2. per-room authorization
|
||||||
|
3. rate limiting (msgs/sec, rooms/minute)
|
||||||
|
4. message size caps
|
||||||
|
5. tls via reverse proxy
|
||||||
|
6. access logging
|
||||||
19
README.md
19
README.md
|
|
@ -1,15 +1,22 @@
|
||||||
# collabd
|
# collabd
|
||||||
|
|
||||||
To install dependencies:
|
editor-agnostic collaborative editing daemon. two vims, one buffer.
|
||||||
|
|
||||||
|
## quick start
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
bun install
|
bun install
|
||||||
|
just dev # starts daemon on :4040
|
||||||
```
|
```
|
||||||
|
|
||||||
To run:
|
in vim (requires 9.0+):
|
||||||
|
```vim
|
||||||
```bash
|
:source adapters/vim/collab.vim
|
||||||
bun run src/index.ts
|
:CollabJoin roomname
|
||||||
```
|
```
|
||||||
|
|
||||||
This project was created using `bun init` in bun v1.3.5. [Bun](https://bun.com) is a fast all-in-one JavaScript runtime.
|
open another vim, join the same room, type in either. magic.
|
||||||
|
|
||||||
|
## more info
|
||||||
|
|
||||||
|
see CLAUDE.md for architecture, protocol, and how to add new editor adapters.
|
||||||
|
|
|
||||||
194
adapters/vim/bridge.test.ts
Normal file
194
adapters/vim/bridge.test.ts
Normal file
|
|
@ -0,0 +1,194 @@
|
||||||
|
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
||||||
|
import { spawn } from "bun";
|
||||||
|
|
||||||
|
describe("Bridge lifecycle", () => {
|
||||||
|
let daemon: ReturnType<typeof Bun.serve>;
|
||||||
|
const DAEMON_PORT = 4042;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
const { decode } = await import("../../src/protocol");
|
||||||
|
const { getOrCreateSession, getSession, removeSession } = await import(
|
||||||
|
"../../src/session"
|
||||||
|
);
|
||||||
|
|
||||||
|
// start daemon for bridge to connect to
|
||||||
|
daemon = Bun.serve({
|
||||||
|
port: DAEMON_PORT,
|
||||||
|
fetch(req, server) {
|
||||||
|
const url = new URL(req.url);
|
||||||
|
if (url.pathname === "/ws") {
|
||||||
|
const upgraded = server.upgrade(req, {
|
||||||
|
data: { room: null, client: null },
|
||||||
|
});
|
||||||
|
if (!upgraded) {
|
||||||
|
return new Response("websocket upgrade failed", { status: 400 });
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
return new Response("test daemon");
|
||||||
|
},
|
||||||
|
websocket: {
|
||||||
|
open(ws) {
|
||||||
|
const client = { ws };
|
||||||
|
ws.data.client = client;
|
||||||
|
},
|
||||||
|
message(ws, raw) {
|
||||||
|
const msg = decode(raw.toString());
|
||||||
|
if (!msg) return;
|
||||||
|
|
||||||
|
const client = ws.data.client;
|
||||||
|
if (!client) return;
|
||||||
|
|
||||||
|
switch (msg.type) {
|
||||||
|
case "join": {
|
||||||
|
const session = getOrCreateSession(msg.room);
|
||||||
|
ws.data.room = msg.room;
|
||||||
|
session.join(client);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "leave": {
|
||||||
|
if (ws.data.room) {
|
||||||
|
const session = getSession(ws.data.room);
|
||||||
|
session?.leave(client);
|
||||||
|
removeSession(ws.data.room);
|
||||||
|
ws.data.room = null;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "update": {
|
||||||
|
if (ws.data.room) {
|
||||||
|
const session = getSession(ws.data.room);
|
||||||
|
session?.applyUpdate(new Uint8Array(msg.data), client);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
close(ws) {
|
||||||
|
if (ws.data.room && ws.data.client) {
|
||||||
|
const session = getSession(ws.data.room);
|
||||||
|
session?.leave(ws.data.client);
|
||||||
|
removeSession(ws.data.room);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
daemon.stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
test("bridge starts and signals ready", async () => {
|
||||||
|
const bridge = spawn({
|
||||||
|
cmd: ["bun", "adapters/vim/bridge.ts"],
|
||||||
|
env: {
|
||||||
|
...process.env,
|
||||||
|
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
|
||||||
|
},
|
||||||
|
stdin: "pipe",
|
||||||
|
stdout: "pipe",
|
||||||
|
stderr: "pipe",
|
||||||
|
});
|
||||||
|
|
||||||
|
// read first line from stdout
|
||||||
|
const reader = bridge.stdout.getReader();
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
|
||||||
|
const { value } = await reader.read();
|
||||||
|
expect(value).toBeDefined();
|
||||||
|
|
||||||
|
const output = decoder.decode(value);
|
||||||
|
const firstLine = output.trim().split("\n")[0];
|
||||||
|
const msg = JSON.parse(firstLine);
|
||||||
|
|
||||||
|
expect(msg.type).toBe("ready");
|
||||||
|
|
||||||
|
bridge.kill();
|
||||||
|
await bridge.exited;
|
||||||
|
});
|
||||||
|
|
||||||
|
test("bridge connects to daemon and disconnects cleanly", async () => {
|
||||||
|
const bridge = spawn({
|
||||||
|
cmd: ["bun", "adapters/vim/bridge.ts"],
|
||||||
|
env: {
|
||||||
|
...process.env,
|
||||||
|
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
|
||||||
|
},
|
||||||
|
stdin: "pipe",
|
||||||
|
stdout: "pipe",
|
||||||
|
stderr: "pipe",
|
||||||
|
});
|
||||||
|
|
||||||
|
const reader = bridge.stdout.getReader();
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
|
||||||
|
// wait for ready
|
||||||
|
let { value } = await reader.read();
|
||||||
|
expect(value).toBeDefined();
|
||||||
|
let output = decoder.decode(value);
|
||||||
|
expect(output).toContain('"type":"ready"');
|
||||||
|
|
||||||
|
// send connect message
|
||||||
|
bridge.stdin.write(
|
||||||
|
`${JSON.stringify({ type: "connect", room: "test" })}\n`,
|
||||||
|
);
|
||||||
|
|
||||||
|
// wait for connected message
|
||||||
|
({ value } = await reader.read());
|
||||||
|
expect(value).toBeDefined();
|
||||||
|
output = decoder.decode(value);
|
||||||
|
expect(output).toContain('"type":"connected"');
|
||||||
|
expect(output).toContain('"room":"test"');
|
||||||
|
|
||||||
|
// send disconnect message
|
||||||
|
bridge.stdin.write(`${JSON.stringify({ type: "disconnect" })}\n`);
|
||||||
|
bridge.stdin.end();
|
||||||
|
|
||||||
|
// bridge should exit cleanly
|
||||||
|
const exitCode = await bridge.exited;
|
||||||
|
expect(exitCode).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("bridge handles content synchronization", async () => {
|
||||||
|
const bridge = spawn({
|
||||||
|
cmd: ["bun", "adapters/vim/bridge.ts"],
|
||||||
|
env: {
|
||||||
|
...process.env,
|
||||||
|
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
|
||||||
|
},
|
||||||
|
stdin: "pipe",
|
||||||
|
stdout: "pipe",
|
||||||
|
stderr: "pipe",
|
||||||
|
});
|
||||||
|
|
||||||
|
const reader = bridge.stdout.getReader();
|
||||||
|
|
||||||
|
// wait for ready
|
||||||
|
await reader.read();
|
||||||
|
|
||||||
|
// connect to room
|
||||||
|
bridge.stdin.write(
|
||||||
|
`${JSON.stringify({ type: "connect", room: "content-test" })}\n`,
|
||||||
|
);
|
||||||
|
|
||||||
|
// wait for connected and peers messages
|
||||||
|
await reader.read();
|
||||||
|
await reader.read();
|
||||||
|
|
||||||
|
// send content
|
||||||
|
bridge.stdin.write(
|
||||||
|
`${JSON.stringify({ type: "content", text: "hello world" })}\n`,
|
||||||
|
);
|
||||||
|
|
||||||
|
// give it time to process
|
||||||
|
await new Promise((r) => setTimeout(r, 100));
|
||||||
|
|
||||||
|
// disconnect
|
||||||
|
bridge.stdin.write(`${JSON.stringify({ type: "disconnect" })}\n`);
|
||||||
|
bridge.stdin.end();
|
||||||
|
|
||||||
|
const exitCode = await bridge.exited;
|
||||||
|
expect(exitCode).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
@ -9,19 +9,22 @@ const DAEMON_URL = process.env.COLLABD_URL || "ws://localhost:4040/ws";
|
||||||
let ws: WebSocket | null = null;
|
let ws: WebSocket | null = null;
|
||||||
let doc: Y.Doc | null = null;
|
let doc: Y.Doc | null = null;
|
||||||
let text: Y.Text | null = null;
|
let text: Y.Text | null = null;
|
||||||
let suppressLocal = false;
|
let suppressLocal = 0;
|
||||||
|
|
||||||
function send(msg: object) {
|
function send(msg: object) {
|
||||||
console.log(JSON.stringify(msg));
|
console.log(JSON.stringify(msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// signal to vim that bridge is ready
|
||||||
|
send({ type: "ready" });
|
||||||
|
|
||||||
function connect(roomName: string) {
|
function connect(roomName: string) {
|
||||||
doc = new Y.Doc();
|
doc = new Y.Doc();
|
||||||
text = doc.getText("content");
|
text = doc.getText("content");
|
||||||
|
|
||||||
// when remote changes come in, notify vim
|
// when remote changes come in, notify vim
|
||||||
text.observe(() => {
|
text.observe(() => {
|
||||||
if (!suppressLocal) {
|
if (suppressLocal === 0) {
|
||||||
send({ type: "content", text: text?.toString() || "" });
|
send({ type: "content", text: text?.toString() || "" });
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -39,10 +42,16 @@ function connect(roomName: string) {
|
||||||
case "sync":
|
case "sync":
|
||||||
case "update": {
|
case "update": {
|
||||||
if (!doc) break;
|
if (!doc) break;
|
||||||
suppressLocal = true;
|
try {
|
||||||
Y.applyUpdate(doc, new Uint8Array(msg.data));
|
suppressLocal++;
|
||||||
suppressLocal = false;
|
Y.applyUpdate(doc, new Uint8Array(msg.data));
|
||||||
send({ type: "content", text: text?.toString() || "" });
|
suppressLocal--;
|
||||||
|
send({ type: "content", text: text?.toString() || "" });
|
||||||
|
} catch (err) {
|
||||||
|
suppressLocal--;
|
||||||
|
console.error("failed to apply yjs update:", err);
|
||||||
|
send({ type: "error", message: "failed to apply remote update" });
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case "peers": {
|
case "peers": {
|
||||||
|
|
@ -61,19 +70,56 @@ function connect(roomName: string) {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// compute minimal edit operations using LCS-based diff
|
||||||
|
function computeDiff(oldText: string, newText: string) {
|
||||||
|
// find common prefix
|
||||||
|
let prefixLen = 0;
|
||||||
|
while (
|
||||||
|
prefixLen < oldText.length &&
|
||||||
|
prefixLen < newText.length &&
|
||||||
|
oldText[prefixLen] === newText[prefixLen]
|
||||||
|
) {
|
||||||
|
prefixLen++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// find common suffix
|
||||||
|
let suffixLen = 0;
|
||||||
|
while (
|
||||||
|
suffixLen < oldText.length - prefixLen &&
|
||||||
|
suffixLen < newText.length - prefixLen &&
|
||||||
|
oldText[oldText.length - 1 - suffixLen] ===
|
||||||
|
newText[newText.length - 1 - suffixLen]
|
||||||
|
) {
|
||||||
|
suffixLen++;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deleteStart = prefixLen;
|
||||||
|
const deleteLen = oldText.length - prefixLen - suffixLen;
|
||||||
|
const insertText = newText.slice(prefixLen, newText.length - suffixLen);
|
||||||
|
|
||||||
|
return { deleteStart, deleteLen, insertText };
|
||||||
|
}
|
||||||
|
|
||||||
function setContent(newContent: string) {
|
function setContent(newContent: string) {
|
||||||
if (!doc || !text || !ws) return;
|
if (!doc || !text || !ws) return;
|
||||||
|
|
||||||
const oldContent = text.toString();
|
const oldContent = text.toString();
|
||||||
if (oldContent === newContent) return;
|
if (oldContent === newContent) return;
|
||||||
|
|
||||||
// compute diff and apply
|
// compute minimal diff and apply
|
||||||
// simple approach: delete all, insert all
|
const { deleteStart, deleteLen, insertText } = computeDiff(
|
||||||
// TODO: proper diff for efficiency
|
oldContent,
|
||||||
|
newContent,
|
||||||
|
);
|
||||||
|
|
||||||
const t = text;
|
const t = text;
|
||||||
doc.transact(() => {
|
doc.transact(() => {
|
||||||
t.delete(0, t.length);
|
if (deleteLen > 0) {
|
||||||
t.insert(0, newContent);
|
t.delete(deleteStart, deleteLen);
|
||||||
|
}
|
||||||
|
if (insertText.length > 0) {
|
||||||
|
t.insert(deleteStart, insertText);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// send update to daemon
|
// send update to daemon
|
||||||
|
|
@ -82,11 +128,14 @@ function setContent(newContent: string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// read json lines from stdin
|
// read json lines from stdin
|
||||||
|
let buffer = "";
|
||||||
const decoder = new TextDecoder();
|
const decoder = new TextDecoder();
|
||||||
for await (const chunk of Bun.stdin.stream()) {
|
for await (const chunk of Bun.stdin.stream()) {
|
||||||
const lines = decoder.decode(chunk).trim().split("\n");
|
buffer += decoder.decode(chunk);
|
||||||
|
const lines = buffer.split("\n");
|
||||||
|
buffer = lines.pop() ?? ""; // keep incomplete last line
|
||||||
for (const line of lines) {
|
for (const line of lines) {
|
||||||
if (!line) continue;
|
if (!line.trim()) continue;
|
||||||
try {
|
try {
|
||||||
const msg = JSON.parse(line);
|
const msg = JSON.parse(line);
|
||||||
switch (msg.type) {
|
switch (msg.type) {
|
||||||
|
|
@ -98,7 +147,8 @@ for await (const chunk of Bun.stdin.stream()) {
|
||||||
break;
|
break;
|
||||||
case "disconnect":
|
case "disconnect":
|
||||||
ws?.close();
|
ws?.close();
|
||||||
break;
|
send({ type: "disconnected" });
|
||||||
|
process.exit(0);
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
send({ type: "error", message: "invalid json" });
|
send({ type: "error", message: "invalid json" });
|
||||||
|
|
|
||||||
|
|
@ -3,9 +3,10 @@ vim9script
|
||||||
# collab.vim - collaborative editing adapter for collabd
|
# collab.vim - collaborative editing adapter for collabd
|
||||||
# requires: bun, collabd daemon running
|
# requires: bun, collabd daemon running
|
||||||
|
|
||||||
var bridge_job: job
|
var bridge_job: job = null_job
|
||||||
var bridge_channel: channel
|
var bridge_channel: channel = null_channel
|
||||||
var connected = false
|
var connected = false
|
||||||
|
var ready = false
|
||||||
var room = ""
|
var room = ""
|
||||||
var suppressing = false
|
var suppressing = false
|
||||||
|
|
||||||
|
|
@ -13,7 +14,7 @@ var suppressing = false
|
||||||
const bridge_script = expand('<sfile>:p:h') .. '/bridge.ts'
|
const bridge_script = expand('<sfile>:p:h') .. '/bridge.ts'
|
||||||
|
|
||||||
def Send(msg: dict<any>)
|
def Send(msg: dict<any>)
|
||||||
if bridge_channel != null
|
if bridge_channel != null_channel
|
||||||
ch_sendraw(bridge_channel, json_encode(msg) .. "\n")
|
ch_sendraw(bridge_channel, json_encode(msg) .. "\n")
|
||||||
endif
|
endif
|
||||||
enddef
|
enddef
|
||||||
|
|
@ -29,7 +30,9 @@ def OnOutput(ch: channel, msg: string)
|
||||||
return
|
return
|
||||||
endtry
|
endtry
|
||||||
|
|
||||||
if data.type == 'connected'
|
if data.type == 'ready'
|
||||||
|
ready = true
|
||||||
|
elseif data.type == 'connected'
|
||||||
connected = true
|
connected = true
|
||||||
echom '[collab] connected to room: ' .. data.room
|
echom '[collab] connected to room: ' .. data.room
|
||||||
elseif data.type == 'disconnected'
|
elseif data.type == 'disconnected'
|
||||||
|
|
@ -67,11 +70,12 @@ def SendBuffer()
|
||||||
enddef
|
enddef
|
||||||
|
|
||||||
export def Connect(room_name: string)
|
export def Connect(room_name: string)
|
||||||
if bridge_job != null
|
if bridge_job != null_job
|
||||||
Disconnect()
|
Disconnect()
|
||||||
endif
|
endif
|
||||||
|
|
||||||
room = room_name
|
room = room_name
|
||||||
|
ready = false
|
||||||
bridge_job = job_start(['bun', 'run', bridge_script], {
|
bridge_job = job_start(['bun', 'run', bridge_script], {
|
||||||
mode: 'nl',
|
mode: 'nl',
|
||||||
out_cb: OnOutput,
|
out_cb: OnOutput,
|
||||||
|
|
@ -79,8 +83,19 @@ export def Connect(room_name: string)
|
||||||
})
|
})
|
||||||
bridge_channel = job_getchannel(bridge_job)
|
bridge_channel = job_getchannel(bridge_job)
|
||||||
|
|
||||||
# give it a moment to start
|
# wait for bridge ready signal
|
||||||
sleep 100m
|
var timeout = 50
|
||||||
|
while !ready && timeout > 0
|
||||||
|
sleep 10m
|
||||||
|
timeout -= 1
|
||||||
|
endwhile
|
||||||
|
|
||||||
|
if !ready
|
||||||
|
echoerr '[collab] bridge failed to start'
|
||||||
|
Disconnect()
|
||||||
|
return
|
||||||
|
endif
|
||||||
|
|
||||||
Send({type: 'connect', room: room_name})
|
Send({type: 'connect', room: room_name})
|
||||||
|
|
||||||
# set up autocmds to send changes
|
# set up autocmds to send changes
|
||||||
|
|
@ -91,13 +106,14 @@ export def Connect(room_name: string)
|
||||||
enddef
|
enddef
|
||||||
|
|
||||||
export def Disconnect()
|
export def Disconnect()
|
||||||
if bridge_job != null
|
if bridge_job != null_job
|
||||||
Send({type: 'disconnect'})
|
Send({type: 'disconnect'})
|
||||||
job_stop(bridge_job)
|
job_stop(bridge_job)
|
||||||
bridge_job = null
|
bridge_job = null_job
|
||||||
bridge_channel = null
|
bridge_channel = null_channel
|
||||||
endif
|
endif
|
||||||
connected = false
|
connected = false
|
||||||
|
ready = false
|
||||||
room = ""
|
room = ""
|
||||||
augroup CollabVim
|
augroup CollabVim
|
||||||
autocmd!
|
autocmd!
|
||||||
|
|
|
||||||
33
src/index.ts
33
src/index.ts
|
|
@ -3,12 +3,20 @@ import { type Client, getOrCreateSession, getSession } from "./session";
|
||||||
|
|
||||||
const PORT = Number(process.env.PORT) || 4040;
|
const PORT = Number(process.env.PORT) || 4040;
|
||||||
|
|
||||||
|
function isValidRoomName(name: unknown): name is string {
|
||||||
|
if (typeof name !== "string") return false;
|
||||||
|
if (name.length === 0 || name.length > 64) return false;
|
||||||
|
return /^[a-zA-Z0-9_-]+$/.test(name);
|
||||||
|
}
|
||||||
|
|
||||||
Bun.serve({
|
Bun.serve({
|
||||||
port: PORT,
|
port: PORT,
|
||||||
fetch(req, server) {
|
fetch(req, server) {
|
||||||
const url = new URL(req.url);
|
const url = new URL(req.url);
|
||||||
if (url.pathname === "/ws") {
|
if (url.pathname === "/ws") {
|
||||||
const upgraded = server.upgrade(req, { data: { room: null } });
|
const upgraded = server.upgrade(req, {
|
||||||
|
data: { room: null, client: null },
|
||||||
|
});
|
||||||
if (!upgraded) {
|
if (!upgraded) {
|
||||||
return new Response("websocket upgrade failed", { status: 400 });
|
return new Response("websocket upgrade failed", { status: 400 });
|
||||||
}
|
}
|
||||||
|
|
@ -17,17 +25,32 @@ Bun.serve({
|
||||||
return new Response("collabd running");
|
return new Response("collabd running");
|
||||||
},
|
},
|
||||||
websocket: {
|
websocket: {
|
||||||
open() {
|
open(ws) {
|
||||||
|
// create client object once and store in ws.data
|
||||||
|
const client: Client = { ws };
|
||||||
|
ws.data.client = client;
|
||||||
console.debug("client connected");
|
console.debug("client connected");
|
||||||
},
|
},
|
||||||
message(ws, raw) {
|
message(ws, raw) {
|
||||||
const msg = decode(raw.toString());
|
const msg = decode(raw.toString());
|
||||||
if (!msg) return;
|
if (!msg) return;
|
||||||
|
|
||||||
const client: Client = { ws };
|
// reuse the client object from ws.data
|
||||||
|
const client = ws.data.client;
|
||||||
|
if (!client) return;
|
||||||
|
|
||||||
switch (msg.type) {
|
switch (msg.type) {
|
||||||
case "join": {
|
case "join": {
|
||||||
|
if (!isValidRoomName(msg.room)) {
|
||||||
|
try {
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify({ type: "error", message: "invalid room name" }),
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
console.debug("failed to send error to client:", err);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
const session = getOrCreateSession(msg.room);
|
const session = getOrCreateSession(msg.room);
|
||||||
ws.data.room = msg.room;
|
ws.data.room = msg.room;
|
||||||
session.join(client);
|
session.join(client);
|
||||||
|
|
@ -52,9 +75,9 @@ Bun.serve({
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
close(ws) {
|
close(ws) {
|
||||||
if (ws.data.room) {
|
if (ws.data.room && ws.data.client) {
|
||||||
const session = getSession(ws.data.room);
|
const session = getSession(ws.data.room);
|
||||||
session?.leave({ ws });
|
session?.leave(ws.data.client);
|
||||||
}
|
}
|
||||||
console.debug("client disconnected");
|
console.debug("client disconnected");
|
||||||
},
|
},
|
||||||
|
|
|
||||||
338
src/integration.test.ts
Normal file
338
src/integration.test.ts
Normal file
|
|
@ -0,0 +1,338 @@
|
||||||
|
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
||||||
|
import type { ServerMessage } from "./protocol";
|
||||||
|
import { decode } from "./protocol";
|
||||||
|
import { getOrCreateSession, getSession, removeSession } from "./session";
|
||||||
|
|
||||||
|
describe("WebSocket concurrent edits integration", () => {
|
||||||
|
let server: ReturnType<typeof Bun.serve>;
|
||||||
|
const PORT = 4041; // use different port for tests
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
// start server for each test
|
||||||
|
server = Bun.serve({
|
||||||
|
port: PORT,
|
||||||
|
fetch(req, server) {
|
||||||
|
const url = new URL(req.url);
|
||||||
|
if (url.pathname === "/ws") {
|
||||||
|
const upgraded = server.upgrade(req, {
|
||||||
|
data: { room: null, client: null },
|
||||||
|
});
|
||||||
|
if (!upgraded) {
|
||||||
|
return new Response("websocket upgrade failed", { status: 400 });
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
return new Response("collabd test server");
|
||||||
|
},
|
||||||
|
websocket: {
|
||||||
|
open(ws) {
|
||||||
|
const client = { ws };
|
||||||
|
ws.data.client = client;
|
||||||
|
},
|
||||||
|
message(ws, raw) {
|
||||||
|
const msg = decode(raw.toString());
|
||||||
|
if (!msg) return;
|
||||||
|
|
||||||
|
const client = ws.data.client;
|
||||||
|
if (!client) return;
|
||||||
|
|
||||||
|
switch (msg.type) {
|
||||||
|
case "join": {
|
||||||
|
const session = getOrCreateSession(msg.room);
|
||||||
|
ws.data.room = msg.room;
|
||||||
|
session.join(client);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "leave": {
|
||||||
|
if (ws.data.room) {
|
||||||
|
const session = getSession(ws.data.room);
|
||||||
|
session?.leave(client);
|
||||||
|
removeSession(ws.data.room);
|
||||||
|
ws.data.room = null;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "update": {
|
||||||
|
if (ws.data.room) {
|
||||||
|
const session = getSession(ws.data.room);
|
||||||
|
session?.applyUpdate(new Uint8Array(msg.data), client);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
close(ws) {
|
||||||
|
if (ws.data.room && ws.data.client) {
|
||||||
|
const session = getSession(ws.data.room);
|
||||||
|
session?.leave(ws.data.client);
|
||||||
|
removeSession(ws.data.room);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
server.stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
test("two clients concurrent edits converge to same state", async () => {
|
||||||
|
const Y = await import("yjs");
|
||||||
|
|
||||||
|
// create two clients with their own yjs docs
|
||||||
|
const doc1 = new Y.Doc();
|
||||||
|
const text1 = doc1.getText("content");
|
||||||
|
|
||||||
|
const doc2 = new Y.Doc();
|
||||||
|
const text2 = doc2.getText("content");
|
||||||
|
|
||||||
|
// track messages received by each client
|
||||||
|
const client1Messages: ServerMessage[] = [];
|
||||||
|
const client2Messages: ServerMessage[] = [];
|
||||||
|
|
||||||
|
let ws1Ready = false;
|
||||||
|
let ws2Ready = false;
|
||||||
|
|
||||||
|
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
||||||
|
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
||||||
|
|
||||||
|
// setup all handlers immediately
|
||||||
|
ws1.onopen = () => {
|
||||||
|
ws1.send(JSON.stringify({ type: "join", room: "test-room" }));
|
||||||
|
};
|
||||||
|
|
||||||
|
ws1.onmessage = (ev) => {
|
||||||
|
const msg = JSON.parse(ev.data);
|
||||||
|
client1Messages.push(msg);
|
||||||
|
if (msg.type === "sync") {
|
||||||
|
ws1Ready = true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ws2.onopen = () => {
|
||||||
|
ws2.send(JSON.stringify({ type: "join", room: "test-room" }));
|
||||||
|
};
|
||||||
|
|
||||||
|
ws2.onmessage = (ev) => {
|
||||||
|
const msg = JSON.parse(ev.data);
|
||||||
|
client2Messages.push(msg);
|
||||||
|
if (msg.type === "sync") {
|
||||||
|
ws2Ready = true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// wait for both to be ready
|
||||||
|
await new Promise<void>((resolve, reject) => {
|
||||||
|
const timeout = setTimeout(() => {
|
||||||
|
reject(new Error("clients join timeout"));
|
||||||
|
}, 2000);
|
||||||
|
|
||||||
|
const interval = setInterval(() => {
|
||||||
|
if (ws1Ready && ws2Ready) {
|
||||||
|
clearInterval(interval);
|
||||||
|
clearTimeout(timeout);
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
}, 10);
|
||||||
|
});
|
||||||
|
|
||||||
|
// wait a bit for peers message
|
||||||
|
await new Promise((r) => setTimeout(r, 50));
|
||||||
|
|
||||||
|
// client 1 inserts "hello" at position 0
|
||||||
|
text1.insert(0, "hello");
|
||||||
|
const update1 = Y.encodeStateAsUpdate(doc1);
|
||||||
|
ws1.send(JSON.stringify({ type: "update", data: Array.from(update1) }));
|
||||||
|
|
||||||
|
// client 2 inserts "world" at position 0 (concurrent edit)
|
||||||
|
text2.insert(0, "world");
|
||||||
|
const update2 = Y.encodeStateAsUpdate(doc2);
|
||||||
|
ws2.send(JSON.stringify({ type: "update", data: Array.from(update2) }));
|
||||||
|
|
||||||
|
// wait for updates to propagate
|
||||||
|
await new Promise((r) => setTimeout(r, 200));
|
||||||
|
|
||||||
|
// apply received updates to both clients
|
||||||
|
for (const msg of client1Messages) {
|
||||||
|
if (msg.type === "update" || msg.type === "sync") {
|
||||||
|
Y.applyUpdate(doc1, new Uint8Array(msg.data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const msg of client2Messages) {
|
||||||
|
if (msg.type === "update" || msg.type === "sync") {
|
||||||
|
Y.applyUpdate(doc2, new Uint8Array(msg.data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// both clients should have the same final content
|
||||||
|
const final1 = text1.toString();
|
||||||
|
const final2 = text2.toString();
|
||||||
|
|
||||||
|
expect(final1).toBe(final2);
|
||||||
|
|
||||||
|
// both words should be present
|
||||||
|
expect(final1).toContain("hello");
|
||||||
|
expect(final1).toContain("world");
|
||||||
|
|
||||||
|
// total length should be sum of both inserts
|
||||||
|
expect(final1.length).toBe(10); // "hello" + "world"
|
||||||
|
|
||||||
|
ws1.close();
|
||||||
|
ws2.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
test("three clients with sequential edits converge", async () => {
|
||||||
|
const Y = await import("yjs");
|
||||||
|
|
||||||
|
const doc1 = new Y.Doc();
|
||||||
|
const text1 = doc1.getText("content");
|
||||||
|
|
||||||
|
const doc2 = new Y.Doc();
|
||||||
|
const text2 = doc2.getText("content");
|
||||||
|
|
||||||
|
const doc3 = new Y.Doc();
|
||||||
|
const text3 = doc3.getText("content");
|
||||||
|
|
||||||
|
const client1Messages: ServerMessage[] = [];
|
||||||
|
const client2Messages: ServerMessage[] = [];
|
||||||
|
const client3Messages: ServerMessage[] = [];
|
||||||
|
|
||||||
|
let ws1Ready = false;
|
||||||
|
let ws2Ready = false;
|
||||||
|
let ws3Ready = false;
|
||||||
|
|
||||||
|
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
||||||
|
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
||||||
|
const ws3 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
||||||
|
|
||||||
|
// setup all handlers immediately
|
||||||
|
ws1.onopen = () => {
|
||||||
|
ws1.send(JSON.stringify({ type: "join", room: "multi-room" }));
|
||||||
|
};
|
||||||
|
|
||||||
|
ws1.onmessage = (ev) => {
|
||||||
|
const msg = JSON.parse(ev.data);
|
||||||
|
client1Messages.push(msg);
|
||||||
|
if (msg.type === "sync") {
|
||||||
|
ws1Ready = true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ws2.onopen = () => {
|
||||||
|
ws2.send(JSON.stringify({ type: "join", room: "multi-room" }));
|
||||||
|
};
|
||||||
|
|
||||||
|
ws2.onmessage = (ev) => {
|
||||||
|
const msg = JSON.parse(ev.data);
|
||||||
|
client2Messages.push(msg);
|
||||||
|
if (msg.type === "sync") {
|
||||||
|
ws2Ready = true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ws3.onopen = () => {
|
||||||
|
ws3.send(JSON.stringify({ type: "join", room: "multi-room" }));
|
||||||
|
};
|
||||||
|
|
||||||
|
ws3.onmessage = (ev) => {
|
||||||
|
const msg = JSON.parse(ev.data);
|
||||||
|
client3Messages.push(msg);
|
||||||
|
if (msg.type === "sync") {
|
||||||
|
ws3Ready = true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// wait for all three to be ready
|
||||||
|
await new Promise<void>((resolve, reject) => {
|
||||||
|
const timeout = setTimeout(() => {
|
||||||
|
reject(new Error("clients join timeout"));
|
||||||
|
}, 2000);
|
||||||
|
|
||||||
|
const interval = setInterval(() => {
|
||||||
|
if (ws1Ready && ws2Ready && ws3Ready) {
|
||||||
|
clearInterval(interval);
|
||||||
|
clearTimeout(timeout);
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
}, 10);
|
||||||
|
});
|
||||||
|
|
||||||
|
await new Promise((r) => setTimeout(r, 50));
|
||||||
|
|
||||||
|
// client 1: insert "a"
|
||||||
|
text1.insert(0, "a");
|
||||||
|
ws1.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "update",
|
||||||
|
data: Array.from(Y.encodeStateAsUpdate(doc1)),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
await new Promise((r) => setTimeout(r, 30));
|
||||||
|
|
||||||
|
// client 2: insert "b"
|
||||||
|
// first apply any updates received
|
||||||
|
for (const msg of client2Messages) {
|
||||||
|
if (msg.type === "update" || msg.type === "sync") {
|
||||||
|
Y.applyUpdate(doc2, new Uint8Array(msg.data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
text2.insert(text2.length, "b");
|
||||||
|
ws2.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "update",
|
||||||
|
data: Array.from(Y.encodeStateAsUpdate(doc2)),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
await new Promise((r) => setTimeout(r, 30));
|
||||||
|
|
||||||
|
// client 3: insert "c"
|
||||||
|
for (const msg of client3Messages) {
|
||||||
|
if (msg.type === "update" || msg.type === "sync") {
|
||||||
|
Y.applyUpdate(doc3, new Uint8Array(msg.data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
text3.insert(text3.length, "c");
|
||||||
|
ws3.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "update",
|
||||||
|
data: Array.from(Y.encodeStateAsUpdate(doc3)),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
await new Promise((r) => setTimeout(r, 200));
|
||||||
|
|
||||||
|
// apply all updates
|
||||||
|
for (const msg of client1Messages) {
|
||||||
|
if (msg.type === "update" || msg.type === "sync") {
|
||||||
|
Y.applyUpdate(doc1, new Uint8Array(msg.data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const msg of client2Messages) {
|
||||||
|
if (msg.type === "update" || msg.type === "sync") {
|
||||||
|
Y.applyUpdate(doc2, new Uint8Array(msg.data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const msg of client3Messages) {
|
||||||
|
if (msg.type === "update" || msg.type === "sync") {
|
||||||
|
Y.applyUpdate(doc3, new Uint8Array(msg.data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const final1 = text1.toString();
|
||||||
|
const final2 = text2.toString();
|
||||||
|
const final3 = text3.toString();
|
||||||
|
|
||||||
|
expect(final1).toBe(final2);
|
||||||
|
expect(final2).toBe(final3);
|
||||||
|
expect(final1).toBe("abc");
|
||||||
|
|
||||||
|
ws1.close();
|
||||||
|
ws2.close();
|
||||||
|
ws3.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
@ -10,15 +10,48 @@ export type ServerMessage =
|
||||||
| { type: "sync"; data: number[] } // full yjs state
|
| { type: "sync"; data: number[] } // full yjs state
|
||||||
| { type: "update"; data: number[] }
|
| { type: "update"; data: number[] }
|
||||||
| { type: "awareness"; data: number[] }
|
| { type: "awareness"; data: number[] }
|
||||||
| { type: "peers"; count: number };
|
| { type: "peers"; count: number }
|
||||||
|
| { type: "error"; message: string };
|
||||||
|
|
||||||
|
export type BridgeMessage =
|
||||||
|
| { type: "ready" }
|
||||||
|
| { type: "connected"; room: string }
|
||||||
|
| { type: "disconnected" }
|
||||||
|
| { type: "content"; text: string }
|
||||||
|
| { type: "peers"; count: number }
|
||||||
|
| { type: "error"; message: string };
|
||||||
|
|
||||||
export function encode(msg: ServerMessage): string {
|
export function encode(msg: ServerMessage): string {
|
||||||
return JSON.stringify(msg);
|
return JSON.stringify(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function isClientMessage(obj: unknown): obj is ClientMessage {
|
||||||
|
if (typeof obj !== "object" || obj === null) return false;
|
||||||
|
const msg = obj as Record<string, unknown>;
|
||||||
|
|
||||||
|
switch (msg.type) {
|
||||||
|
case "join":
|
||||||
|
return typeof msg.room === "string";
|
||||||
|
case "leave":
|
||||||
|
return true;
|
||||||
|
case "update":
|
||||||
|
return (
|
||||||
|
Array.isArray(msg.data) && msg.data.every((n) => typeof n === "number")
|
||||||
|
);
|
||||||
|
case "awareness":
|
||||||
|
return (
|
||||||
|
Array.isArray(msg.data) && msg.data.every((n) => typeof n === "number")
|
||||||
|
);
|
||||||
|
default:
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export function decode(raw: string): ClientMessage | null {
|
export function decode(raw: string): ClientMessage | null {
|
||||||
try {
|
try {
|
||||||
return JSON.parse(raw) as ClientMessage;
|
const parsed = JSON.parse(raw);
|
||||||
|
if (!isClientMessage(parsed)) return null;
|
||||||
|
return parsed;
|
||||||
} catch {
|
} catch {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,21 +18,52 @@ export class Session {
|
||||||
this.clients.add(client);
|
this.clients.add(client);
|
||||||
// send full state to new client
|
// send full state to new client
|
||||||
const state = Y.encodeStateAsUpdate(this.doc);
|
const state = Y.encodeStateAsUpdate(this.doc);
|
||||||
client.ws.send(encode({ type: "sync", data: Array.from(state) }));
|
try {
|
||||||
|
client.ws.send(encode({ type: "sync", data: Array.from(state) }));
|
||||||
|
} catch (err) {
|
||||||
|
console.debug("failed to send sync to client, removing:", err);
|
||||||
|
this.clients.delete(client);
|
||||||
|
return;
|
||||||
|
}
|
||||||
this.broadcastPeerCount();
|
this.broadcastPeerCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
leave(client: Client) {
|
leave(client: Client) {
|
||||||
this.clients.delete(client);
|
this.clients.delete(client);
|
||||||
this.broadcastPeerCount();
|
this.broadcastPeerCount();
|
||||||
|
if (this.isEmpty()) {
|
||||||
|
sessions.delete(this.name);
|
||||||
|
console.debug(`session removed: ${this.name}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
isEmpty(): boolean {
|
||||||
|
return this.clients.size === 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
applyUpdate(update: Uint8Array, from: Client) {
|
applyUpdate(update: Uint8Array, from: Client) {
|
||||||
Y.applyUpdate(this.doc, update);
|
try {
|
||||||
// broadcast to others
|
Y.applyUpdate(this.doc, update);
|
||||||
for (const client of this.clients) {
|
// broadcast to others
|
||||||
if (client !== from) {
|
for (const client of this.clients) {
|
||||||
client.ws.send(encode({ type: "update", data: Array.from(update) }));
|
if (client !== from) {
|
||||||
|
try {
|
||||||
|
client.ws.send(
|
||||||
|
encode({ type: "update", data: Array.from(update) }),
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
console.debug("failed to send update to client, removing:", err);
|
||||||
|
this.clients.delete(client);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error(`failed to apply update in session ${this.name}:`, err);
|
||||||
|
// optionally notify the client that sent the bad update
|
||||||
|
try {
|
||||||
|
from.ws.send(encode({ type: "error", message: "invalid update" }));
|
||||||
|
} catch {
|
||||||
|
// ignore send errors
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -40,7 +71,12 @@ export class Session {
|
||||||
broadcastPeerCount() {
|
broadcastPeerCount() {
|
||||||
const msg = encode({ type: "peers", count: this.clients.size });
|
const msg = encode({ type: "peers", count: this.clients.size });
|
||||||
for (const client of this.clients) {
|
for (const client of this.clients) {
|
||||||
client.ws.send(msg);
|
try {
|
||||||
|
client.ws.send(msg);
|
||||||
|
} catch (err) {
|
||||||
|
console.debug("failed to send peer count to client, removing:", err);
|
||||||
|
this.clients.delete(client);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -61,3 +97,11 @@ export function getOrCreateSession(name: string): Session {
|
||||||
export function getSession(name: string): Session | undefined {
|
export function getSession(name: string): Session | undefined {
|
||||||
return sessions.get(name);
|
return sessions.get(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function removeSession(name: string): void {
|
||||||
|
const session = sessions.get(name);
|
||||||
|
if (session?.isEmpty()) {
|
||||||
|
sessions.delete(name);
|
||||||
|
console.debug(`session removed: ${name}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue