Compare commits
No commits in common. "05629a00a033bbf4d1444af29ce852b886d91daf" and "3cf16586aa7abe25a7230e13d957cd55e30ef24e" have entirely different histories.
05629a00a0
...
3cf16586aa
9 changed files with 44 additions and 855 deletions
106
CLAUDE.md
106
CLAUDE.md
|
|
@ -1,106 +0,0 @@
|
||||||
# collabd - editor-agnostic collaborative editing daemon
|
|
||||||
|
|
||||||
daemon + thin adapters architecture. daemon handles yjs crdt, adapters just
|
|
||||||
hook buffer events and apply remote changes.
|
|
||||||
|
|
||||||
## status
|
|
||||||
|
|
||||||
working:
|
|
||||||
- daemon with room-based sessions
|
|
||||||
- multi-peer sync (tested with 2+ clients)
|
|
||||||
- vim9 adapter with live buffer sync
|
|
||||||
- proper crdt diffing (not delete-all-insert-all)
|
|
||||||
- integration tests for concurrent edits
|
|
||||||
|
|
||||||
not yet:
|
|
||||||
- cursor/selection sync (awareness protocol stubbed but unused)
|
|
||||||
- other editor adapters (helix, kakoune, zed)
|
|
||||||
- persistence (rooms are ephemeral)
|
|
||||||
- reconnection handling
|
|
||||||
|
|
||||||
## stack
|
|
||||||
|
|
||||||
- bun runtime
|
|
||||||
- yjs for crdt
|
|
||||||
- websocket transport
|
|
||||||
- vim9script adapter (with bun bridge since vim cant do websocket)
|
|
||||||
|
|
||||||
## running
|
|
||||||
|
|
||||||
```bash
|
|
||||||
just dev # daemon on :4040
|
|
||||||
bun test # unit tests
|
|
||||||
just check # biome lint
|
|
||||||
```
|
|
||||||
|
|
||||||
## vim adapter usage
|
|
||||||
|
|
||||||
requires Vim 9.0+ (uses vim9script)
|
|
||||||
|
|
||||||
```vim
|
|
||||||
:source adapters/vim/collab.vim
|
|
||||||
:CollabJoin roomname
|
|
||||||
:CollabLeave
|
|
||||||
:CollabStatus
|
|
||||||
```
|
|
||||||
|
|
||||||
the vim plugin spawns adapters/vim/bridge.ts which handles yjs and speaks
|
|
||||||
json lines to vim via channels.
|
|
||||||
|
|
||||||
## protocol
|
|
||||||
|
|
||||||
daemon speaks json over websocket at /ws
|
|
||||||
|
|
||||||
client -> server:
|
|
||||||
{ type: "join", room: "name" }
|
|
||||||
{ type: "leave" }
|
|
||||||
{ type: "update", data: [...] } // yjs update bytes
|
|
||||||
|
|
||||||
server -> client:
|
|
||||||
{ type: "sync", data: [...] } // full yjs state on join
|
|
||||||
{ type: "update", data: [...] } // remote changes
|
|
||||||
{ type: "peers", count: N }
|
|
||||||
|
|
||||||
## key files
|
|
||||||
|
|
||||||
- src/index.ts - websocket server, room routing
|
|
||||||
- src/session.ts - yjs doc per room, peer management
|
|
||||||
- src/protocol.ts - message types
|
|
||||||
- adapters/vim/bridge.ts - bun process vim spawns
|
|
||||||
- adapters/vim/collab.vim - vim9script plugin
|
|
||||||
|
|
||||||
## adding new editor adapters
|
|
||||||
|
|
||||||
each adapter needs:
|
|
||||||
1. hook buffer change events
|
|
||||||
2. send changes to daemon as yjs updates (or use a bridge like vim does)
|
|
||||||
3. receive remote updates and apply to buffer
|
|
||||||
4. optionally show peer cursors
|
|
||||||
|
|
||||||
see NOTES.txt for cell-grid vs text-crdt mode discussion.
|
|
||||||
see docs/ for full research and architecture breakdown.
|
|
||||||
|
|
||||||
## security
|
|
||||||
|
|
||||||
current state (research prototype):
|
|
||||||
- room name validation (alphanumeric, 1-64 chars)
|
|
||||||
- message type validation via protocol decoder
|
|
||||||
- websocket.send error handling
|
|
||||||
|
|
||||||
known gaps (not production ready):
|
|
||||||
- no authentication (anyone can join any room by name)
|
|
||||||
- no authorization (all peers see all edits)
|
|
||||||
- no rate limiting on messages or room creation
|
|
||||||
- no message size limits
|
|
||||||
- room names guessable via brute force
|
|
||||||
- no encryption (deploy behind wss, not ws)
|
|
||||||
- no audit logging
|
|
||||||
- no persistence (data lost on daemon restart)
|
|
||||||
|
|
||||||
before production:
|
|
||||||
1. auth layer (jwt tokens or unix socket for local-only)
|
|
||||||
2. per-room authorization
|
|
||||||
3. rate limiting (msgs/sec, rooms/minute)
|
|
||||||
4. message size caps
|
|
||||||
5. tls via reverse proxy
|
|
||||||
6. access logging
|
|
||||||
19
README.md
19
README.md
|
|
@ -1,22 +1,15 @@
|
||||||
# collabd
|
# collabd
|
||||||
|
|
||||||
editor-agnostic collaborative editing daemon. two vims, one buffer.
|
To install dependencies:
|
||||||
|
|
||||||
## quick start
|
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
bun install
|
bun install
|
||||||
just dev # starts daemon on :4040
|
|
||||||
```
|
```
|
||||||
|
|
||||||
in vim (requires 9.0+):
|
To run:
|
||||||
```vim
|
|
||||||
:source adapters/vim/collab.vim
|
```bash
|
||||||
:CollabJoin roomname
|
bun run src/index.ts
|
||||||
```
|
```
|
||||||
|
|
||||||
open another vim, join the same room, type in either. magic.
|
This project was created using `bun init` in bun v1.3.5. [Bun](https://bun.com) is a fast all-in-one JavaScript runtime.
|
||||||
|
|
||||||
## more info
|
|
||||||
|
|
||||||
see CLAUDE.md for architecture, protocol, and how to add new editor adapters.
|
|
||||||
|
|
|
||||||
|
|
@ -1,194 +0,0 @@
|
||||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
|
||||||
import { spawn } from "bun";
|
|
||||||
|
|
||||||
describe("Bridge lifecycle", () => {
|
|
||||||
let daemon: ReturnType<typeof Bun.serve>;
|
|
||||||
const DAEMON_PORT = 4042;
|
|
||||||
|
|
||||||
beforeEach(async () => {
|
|
||||||
const { decode } = await import("../../src/protocol");
|
|
||||||
const { getOrCreateSession, getSession, removeSession } = await import(
|
|
||||||
"../../src/session"
|
|
||||||
);
|
|
||||||
|
|
||||||
// start daemon for bridge to connect to
|
|
||||||
daemon = Bun.serve({
|
|
||||||
port: DAEMON_PORT,
|
|
||||||
fetch(req, server) {
|
|
||||||
const url = new URL(req.url);
|
|
||||||
if (url.pathname === "/ws") {
|
|
||||||
const upgraded = server.upgrade(req, {
|
|
||||||
data: { room: null, client: null },
|
|
||||||
});
|
|
||||||
if (!upgraded) {
|
|
||||||
return new Response("websocket upgrade failed", { status: 400 });
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
return new Response("test daemon");
|
|
||||||
},
|
|
||||||
websocket: {
|
|
||||||
open(ws) {
|
|
||||||
const client = { ws };
|
|
||||||
ws.data.client = client;
|
|
||||||
},
|
|
||||||
message(ws, raw) {
|
|
||||||
const msg = decode(raw.toString());
|
|
||||||
if (!msg) return;
|
|
||||||
|
|
||||||
const client = ws.data.client;
|
|
||||||
if (!client) return;
|
|
||||||
|
|
||||||
switch (msg.type) {
|
|
||||||
case "join": {
|
|
||||||
const session = getOrCreateSession(msg.room);
|
|
||||||
ws.data.room = msg.room;
|
|
||||||
session.join(client);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "leave": {
|
|
||||||
if (ws.data.room) {
|
|
||||||
const session = getSession(ws.data.room);
|
|
||||||
session?.leave(client);
|
|
||||||
removeSession(ws.data.room);
|
|
||||||
ws.data.room = null;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "update": {
|
|
||||||
if (ws.data.room) {
|
|
||||||
const session = getSession(ws.data.room);
|
|
||||||
session?.applyUpdate(new Uint8Array(msg.data), client);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
close(ws) {
|
|
||||||
if (ws.data.room && ws.data.client) {
|
|
||||||
const session = getSession(ws.data.room);
|
|
||||||
session?.leave(ws.data.client);
|
|
||||||
removeSession(ws.data.room);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
},
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
daemon.stop();
|
|
||||||
});
|
|
||||||
|
|
||||||
test("bridge starts and signals ready", async () => {
|
|
||||||
const bridge = spawn({
|
|
||||||
cmd: ["bun", "adapters/vim/bridge.ts"],
|
|
||||||
env: {
|
|
||||||
...process.env,
|
|
||||||
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
|
|
||||||
},
|
|
||||||
stdin: "pipe",
|
|
||||||
stdout: "pipe",
|
|
||||||
stderr: "pipe",
|
|
||||||
});
|
|
||||||
|
|
||||||
// read first line from stdout
|
|
||||||
const reader = bridge.stdout.getReader();
|
|
||||||
const decoder = new TextDecoder();
|
|
||||||
|
|
||||||
const { value } = await reader.read();
|
|
||||||
expect(value).toBeDefined();
|
|
||||||
|
|
||||||
const output = decoder.decode(value);
|
|
||||||
const firstLine = output.trim().split("\n")[0];
|
|
||||||
const msg = JSON.parse(firstLine);
|
|
||||||
|
|
||||||
expect(msg.type).toBe("ready");
|
|
||||||
|
|
||||||
bridge.kill();
|
|
||||||
await bridge.exited;
|
|
||||||
});
|
|
||||||
|
|
||||||
test("bridge connects to daemon and disconnects cleanly", async () => {
|
|
||||||
const bridge = spawn({
|
|
||||||
cmd: ["bun", "adapters/vim/bridge.ts"],
|
|
||||||
env: {
|
|
||||||
...process.env,
|
|
||||||
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
|
|
||||||
},
|
|
||||||
stdin: "pipe",
|
|
||||||
stdout: "pipe",
|
|
||||||
stderr: "pipe",
|
|
||||||
});
|
|
||||||
|
|
||||||
const reader = bridge.stdout.getReader();
|
|
||||||
const decoder = new TextDecoder();
|
|
||||||
|
|
||||||
// wait for ready
|
|
||||||
let { value } = await reader.read();
|
|
||||||
expect(value).toBeDefined();
|
|
||||||
let output = decoder.decode(value);
|
|
||||||
expect(output).toContain('"type":"ready"');
|
|
||||||
|
|
||||||
// send connect message
|
|
||||||
bridge.stdin.write(
|
|
||||||
`${JSON.stringify({ type: "connect", room: "test" })}\n`,
|
|
||||||
);
|
|
||||||
|
|
||||||
// wait for connected message
|
|
||||||
({ value } = await reader.read());
|
|
||||||
expect(value).toBeDefined();
|
|
||||||
output = decoder.decode(value);
|
|
||||||
expect(output).toContain('"type":"connected"');
|
|
||||||
expect(output).toContain('"room":"test"');
|
|
||||||
|
|
||||||
// send disconnect message
|
|
||||||
bridge.stdin.write(`${JSON.stringify({ type: "disconnect" })}\n`);
|
|
||||||
bridge.stdin.end();
|
|
||||||
|
|
||||||
// bridge should exit cleanly
|
|
||||||
const exitCode = await bridge.exited;
|
|
||||||
expect(exitCode).toBe(0);
|
|
||||||
});
|
|
||||||
|
|
||||||
test("bridge handles content synchronization", async () => {
|
|
||||||
const bridge = spawn({
|
|
||||||
cmd: ["bun", "adapters/vim/bridge.ts"],
|
|
||||||
env: {
|
|
||||||
...process.env,
|
|
||||||
COLLABD_URL: `ws://localhost:${DAEMON_PORT}/ws`,
|
|
||||||
},
|
|
||||||
stdin: "pipe",
|
|
||||||
stdout: "pipe",
|
|
||||||
stderr: "pipe",
|
|
||||||
});
|
|
||||||
|
|
||||||
const reader = bridge.stdout.getReader();
|
|
||||||
|
|
||||||
// wait for ready
|
|
||||||
await reader.read();
|
|
||||||
|
|
||||||
// connect to room
|
|
||||||
bridge.stdin.write(
|
|
||||||
`${JSON.stringify({ type: "connect", room: "content-test" })}\n`,
|
|
||||||
);
|
|
||||||
|
|
||||||
// wait for connected and peers messages
|
|
||||||
await reader.read();
|
|
||||||
await reader.read();
|
|
||||||
|
|
||||||
// send content
|
|
||||||
bridge.stdin.write(
|
|
||||||
`${JSON.stringify({ type: "content", text: "hello world" })}\n`,
|
|
||||||
);
|
|
||||||
|
|
||||||
// give it time to process
|
|
||||||
await new Promise((r) => setTimeout(r, 100));
|
|
||||||
|
|
||||||
// disconnect
|
|
||||||
bridge.stdin.write(`${JSON.stringify({ type: "disconnect" })}\n`);
|
|
||||||
bridge.stdin.end();
|
|
||||||
|
|
||||||
const exitCode = await bridge.exited;
|
|
||||||
expect(exitCode).toBe(0);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
@ -9,22 +9,19 @@ const DAEMON_URL = process.env.COLLABD_URL || "ws://localhost:4040/ws";
|
||||||
let ws: WebSocket | null = null;
|
let ws: WebSocket | null = null;
|
||||||
let doc: Y.Doc | null = null;
|
let doc: Y.Doc | null = null;
|
||||||
let text: Y.Text | null = null;
|
let text: Y.Text | null = null;
|
||||||
let suppressLocal = 0;
|
let suppressLocal = false;
|
||||||
|
|
||||||
function send(msg: object) {
|
function send(msg: object) {
|
||||||
console.log(JSON.stringify(msg));
|
console.log(JSON.stringify(msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
// signal to vim that bridge is ready
|
|
||||||
send({ type: "ready" });
|
|
||||||
|
|
||||||
function connect(roomName: string) {
|
function connect(roomName: string) {
|
||||||
doc = new Y.Doc();
|
doc = new Y.Doc();
|
||||||
text = doc.getText("content");
|
text = doc.getText("content");
|
||||||
|
|
||||||
// when remote changes come in, notify vim
|
// when remote changes come in, notify vim
|
||||||
text.observe(() => {
|
text.observe(() => {
|
||||||
if (suppressLocal === 0) {
|
if (!suppressLocal) {
|
||||||
send({ type: "content", text: text?.toString() || "" });
|
send({ type: "content", text: text?.toString() || "" });
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -42,16 +39,10 @@ function connect(roomName: string) {
|
||||||
case "sync":
|
case "sync":
|
||||||
case "update": {
|
case "update": {
|
||||||
if (!doc) break;
|
if (!doc) break;
|
||||||
try {
|
suppressLocal = true;
|
||||||
suppressLocal++;
|
Y.applyUpdate(doc, new Uint8Array(msg.data));
|
||||||
Y.applyUpdate(doc, new Uint8Array(msg.data));
|
suppressLocal = false;
|
||||||
suppressLocal--;
|
send({ type: "content", text: text?.toString() || "" });
|
||||||
send({ type: "content", text: text?.toString() || "" });
|
|
||||||
} catch (err) {
|
|
||||||
suppressLocal--;
|
|
||||||
console.error("failed to apply yjs update:", err);
|
|
||||||
send({ type: "error", message: "failed to apply remote update" });
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case "peers": {
|
case "peers": {
|
||||||
|
|
@ -70,56 +61,19 @@ function connect(roomName: string) {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// compute minimal edit operations using LCS-based diff
|
|
||||||
function computeDiff(oldText: string, newText: string) {
|
|
||||||
// find common prefix
|
|
||||||
let prefixLen = 0;
|
|
||||||
while (
|
|
||||||
prefixLen < oldText.length &&
|
|
||||||
prefixLen < newText.length &&
|
|
||||||
oldText[prefixLen] === newText[prefixLen]
|
|
||||||
) {
|
|
||||||
prefixLen++;
|
|
||||||
}
|
|
||||||
|
|
||||||
// find common suffix
|
|
||||||
let suffixLen = 0;
|
|
||||||
while (
|
|
||||||
suffixLen < oldText.length - prefixLen &&
|
|
||||||
suffixLen < newText.length - prefixLen &&
|
|
||||||
oldText[oldText.length - 1 - suffixLen] ===
|
|
||||||
newText[newText.length - 1 - suffixLen]
|
|
||||||
) {
|
|
||||||
suffixLen++;
|
|
||||||
}
|
|
||||||
|
|
||||||
const deleteStart = prefixLen;
|
|
||||||
const deleteLen = oldText.length - prefixLen - suffixLen;
|
|
||||||
const insertText = newText.slice(prefixLen, newText.length - suffixLen);
|
|
||||||
|
|
||||||
return { deleteStart, deleteLen, insertText };
|
|
||||||
}
|
|
||||||
|
|
||||||
function setContent(newContent: string) {
|
function setContent(newContent: string) {
|
||||||
if (!doc || !text || !ws) return;
|
if (!doc || !text || !ws) return;
|
||||||
|
|
||||||
const oldContent = text.toString();
|
const oldContent = text.toString();
|
||||||
if (oldContent === newContent) return;
|
if (oldContent === newContent) return;
|
||||||
|
|
||||||
// compute minimal diff and apply
|
// compute diff and apply
|
||||||
const { deleteStart, deleteLen, insertText } = computeDiff(
|
// simple approach: delete all, insert all
|
||||||
oldContent,
|
// TODO: proper diff for efficiency
|
||||||
newContent,
|
|
||||||
);
|
|
||||||
|
|
||||||
const t = text;
|
const t = text;
|
||||||
doc.transact(() => {
|
doc.transact(() => {
|
||||||
if (deleteLen > 0) {
|
t.delete(0, t.length);
|
||||||
t.delete(deleteStart, deleteLen);
|
t.insert(0, newContent);
|
||||||
}
|
|
||||||
if (insertText.length > 0) {
|
|
||||||
t.insert(deleteStart, insertText);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// send update to daemon
|
// send update to daemon
|
||||||
|
|
@ -128,14 +82,11 @@ function setContent(newContent: string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// read json lines from stdin
|
// read json lines from stdin
|
||||||
let buffer = "";
|
|
||||||
const decoder = new TextDecoder();
|
const decoder = new TextDecoder();
|
||||||
for await (const chunk of Bun.stdin.stream()) {
|
for await (const chunk of Bun.stdin.stream()) {
|
||||||
buffer += decoder.decode(chunk);
|
const lines = decoder.decode(chunk).trim().split("\n");
|
||||||
const lines = buffer.split("\n");
|
|
||||||
buffer = lines.pop() ?? ""; // keep incomplete last line
|
|
||||||
for (const line of lines) {
|
for (const line of lines) {
|
||||||
if (!line.trim()) continue;
|
if (!line) continue;
|
||||||
try {
|
try {
|
||||||
const msg = JSON.parse(line);
|
const msg = JSON.parse(line);
|
||||||
switch (msg.type) {
|
switch (msg.type) {
|
||||||
|
|
@ -147,8 +98,7 @@ for await (const chunk of Bun.stdin.stream()) {
|
||||||
break;
|
break;
|
||||||
case "disconnect":
|
case "disconnect":
|
||||||
ws?.close();
|
ws?.close();
|
||||||
send({ type: "disconnected" });
|
break;
|
||||||
process.exit(0);
|
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
send({ type: "error", message: "invalid json" });
|
send({ type: "error", message: "invalid json" });
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,9 @@ vim9script
|
||||||
# collab.vim - collaborative editing adapter for collabd
|
# collab.vim - collaborative editing adapter for collabd
|
||||||
# requires: bun, collabd daemon running
|
# requires: bun, collabd daemon running
|
||||||
|
|
||||||
var bridge_job: job = null_job
|
var bridge_job: job
|
||||||
var bridge_channel: channel = null_channel
|
var bridge_channel: channel
|
||||||
var connected = false
|
var connected = false
|
||||||
var ready = false
|
|
||||||
var room = ""
|
var room = ""
|
||||||
var suppressing = false
|
var suppressing = false
|
||||||
|
|
||||||
|
|
@ -14,7 +13,7 @@ var suppressing = false
|
||||||
const bridge_script = expand('<sfile>:p:h') .. '/bridge.ts'
|
const bridge_script = expand('<sfile>:p:h') .. '/bridge.ts'
|
||||||
|
|
||||||
def Send(msg: dict<any>)
|
def Send(msg: dict<any>)
|
||||||
if bridge_channel != null_channel
|
if bridge_channel != null
|
||||||
ch_sendraw(bridge_channel, json_encode(msg) .. "\n")
|
ch_sendraw(bridge_channel, json_encode(msg) .. "\n")
|
||||||
endif
|
endif
|
||||||
enddef
|
enddef
|
||||||
|
|
@ -30,9 +29,7 @@ def OnOutput(ch: channel, msg: string)
|
||||||
return
|
return
|
||||||
endtry
|
endtry
|
||||||
|
|
||||||
if data.type == 'ready'
|
if data.type == 'connected'
|
||||||
ready = true
|
|
||||||
elseif data.type == 'connected'
|
|
||||||
connected = true
|
connected = true
|
||||||
echom '[collab] connected to room: ' .. data.room
|
echom '[collab] connected to room: ' .. data.room
|
||||||
elseif data.type == 'disconnected'
|
elseif data.type == 'disconnected'
|
||||||
|
|
@ -70,12 +67,11 @@ def SendBuffer()
|
||||||
enddef
|
enddef
|
||||||
|
|
||||||
export def Connect(room_name: string)
|
export def Connect(room_name: string)
|
||||||
if bridge_job != null_job
|
if bridge_job != null
|
||||||
Disconnect()
|
Disconnect()
|
||||||
endif
|
endif
|
||||||
|
|
||||||
room = room_name
|
room = room_name
|
||||||
ready = false
|
|
||||||
bridge_job = job_start(['bun', 'run', bridge_script], {
|
bridge_job = job_start(['bun', 'run', bridge_script], {
|
||||||
mode: 'nl',
|
mode: 'nl',
|
||||||
out_cb: OnOutput,
|
out_cb: OnOutput,
|
||||||
|
|
@ -83,19 +79,8 @@ export def Connect(room_name: string)
|
||||||
})
|
})
|
||||||
bridge_channel = job_getchannel(bridge_job)
|
bridge_channel = job_getchannel(bridge_job)
|
||||||
|
|
||||||
# wait for bridge ready signal
|
# give it a moment to start
|
||||||
var timeout = 50
|
sleep 100m
|
||||||
while !ready && timeout > 0
|
|
||||||
sleep 10m
|
|
||||||
timeout -= 1
|
|
||||||
endwhile
|
|
||||||
|
|
||||||
if !ready
|
|
||||||
echoerr '[collab] bridge failed to start'
|
|
||||||
Disconnect()
|
|
||||||
return
|
|
||||||
endif
|
|
||||||
|
|
||||||
Send({type: 'connect', room: room_name})
|
Send({type: 'connect', room: room_name})
|
||||||
|
|
||||||
# set up autocmds to send changes
|
# set up autocmds to send changes
|
||||||
|
|
@ -106,14 +91,13 @@ export def Connect(room_name: string)
|
||||||
enddef
|
enddef
|
||||||
|
|
||||||
export def Disconnect()
|
export def Disconnect()
|
||||||
if bridge_job != null_job
|
if bridge_job != null
|
||||||
Send({type: 'disconnect'})
|
Send({type: 'disconnect'})
|
||||||
job_stop(bridge_job)
|
job_stop(bridge_job)
|
||||||
bridge_job = null_job
|
bridge_job = null
|
||||||
bridge_channel = null_channel
|
bridge_channel = null
|
||||||
endif
|
endif
|
||||||
connected = false
|
connected = false
|
||||||
ready = false
|
|
||||||
room = ""
|
room = ""
|
||||||
augroup CollabVim
|
augroup CollabVim
|
||||||
autocmd!
|
autocmd!
|
||||||
|
|
|
||||||
33
src/index.ts
33
src/index.ts
|
|
@ -3,20 +3,12 @@ import { type Client, getOrCreateSession, getSession } from "./session";
|
||||||
|
|
||||||
const PORT = Number(process.env.PORT) || 4040;
|
const PORT = Number(process.env.PORT) || 4040;
|
||||||
|
|
||||||
function isValidRoomName(name: unknown): name is string {
|
|
||||||
if (typeof name !== "string") return false;
|
|
||||||
if (name.length === 0 || name.length > 64) return false;
|
|
||||||
return /^[a-zA-Z0-9_-]+$/.test(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
Bun.serve({
|
Bun.serve({
|
||||||
port: PORT,
|
port: PORT,
|
||||||
fetch(req, server) {
|
fetch(req, server) {
|
||||||
const url = new URL(req.url);
|
const url = new URL(req.url);
|
||||||
if (url.pathname === "/ws") {
|
if (url.pathname === "/ws") {
|
||||||
const upgraded = server.upgrade(req, {
|
const upgraded = server.upgrade(req, { data: { room: null } });
|
||||||
data: { room: null, client: null },
|
|
||||||
});
|
|
||||||
if (!upgraded) {
|
if (!upgraded) {
|
||||||
return new Response("websocket upgrade failed", { status: 400 });
|
return new Response("websocket upgrade failed", { status: 400 });
|
||||||
}
|
}
|
||||||
|
|
@ -25,32 +17,17 @@ Bun.serve({
|
||||||
return new Response("collabd running");
|
return new Response("collabd running");
|
||||||
},
|
},
|
||||||
websocket: {
|
websocket: {
|
||||||
open(ws) {
|
open() {
|
||||||
// create client object once and store in ws.data
|
|
||||||
const client: Client = { ws };
|
|
||||||
ws.data.client = client;
|
|
||||||
console.debug("client connected");
|
console.debug("client connected");
|
||||||
},
|
},
|
||||||
message(ws, raw) {
|
message(ws, raw) {
|
||||||
const msg = decode(raw.toString());
|
const msg = decode(raw.toString());
|
||||||
if (!msg) return;
|
if (!msg) return;
|
||||||
|
|
||||||
// reuse the client object from ws.data
|
const client: Client = { ws };
|
||||||
const client = ws.data.client;
|
|
||||||
if (!client) return;
|
|
||||||
|
|
||||||
switch (msg.type) {
|
switch (msg.type) {
|
||||||
case "join": {
|
case "join": {
|
||||||
if (!isValidRoomName(msg.room)) {
|
|
||||||
try {
|
|
||||||
ws.send(
|
|
||||||
JSON.stringify({ type: "error", message: "invalid room name" }),
|
|
||||||
);
|
|
||||||
} catch (err) {
|
|
||||||
console.debug("failed to send error to client:", err);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
const session = getOrCreateSession(msg.room);
|
const session = getOrCreateSession(msg.room);
|
||||||
ws.data.room = msg.room;
|
ws.data.room = msg.room;
|
||||||
session.join(client);
|
session.join(client);
|
||||||
|
|
@ -75,9 +52,9 @@ Bun.serve({
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
close(ws) {
|
close(ws) {
|
||||||
if (ws.data.room && ws.data.client) {
|
if (ws.data.room) {
|
||||||
const session = getSession(ws.data.room);
|
const session = getSession(ws.data.room);
|
||||||
session?.leave(ws.data.client);
|
session?.leave({ ws });
|
||||||
}
|
}
|
||||||
console.debug("client disconnected");
|
console.debug("client disconnected");
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -1,338 +0,0 @@
|
||||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
|
||||||
import type { ServerMessage } from "./protocol";
|
|
||||||
import { decode } from "./protocol";
|
|
||||||
import { getOrCreateSession, getSession, removeSession } from "./session";
|
|
||||||
|
|
||||||
describe("WebSocket concurrent edits integration", () => {
|
|
||||||
let server: ReturnType<typeof Bun.serve>;
|
|
||||||
const PORT = 4041; // use different port for tests
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
// start server for each test
|
|
||||||
server = Bun.serve({
|
|
||||||
port: PORT,
|
|
||||||
fetch(req, server) {
|
|
||||||
const url = new URL(req.url);
|
|
||||||
if (url.pathname === "/ws") {
|
|
||||||
const upgraded = server.upgrade(req, {
|
|
||||||
data: { room: null, client: null },
|
|
||||||
});
|
|
||||||
if (!upgraded) {
|
|
||||||
return new Response("websocket upgrade failed", { status: 400 });
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
return new Response("collabd test server");
|
|
||||||
},
|
|
||||||
websocket: {
|
|
||||||
open(ws) {
|
|
||||||
const client = { ws };
|
|
||||||
ws.data.client = client;
|
|
||||||
},
|
|
||||||
message(ws, raw) {
|
|
||||||
const msg = decode(raw.toString());
|
|
||||||
if (!msg) return;
|
|
||||||
|
|
||||||
const client = ws.data.client;
|
|
||||||
if (!client) return;
|
|
||||||
|
|
||||||
switch (msg.type) {
|
|
||||||
case "join": {
|
|
||||||
const session = getOrCreateSession(msg.room);
|
|
||||||
ws.data.room = msg.room;
|
|
||||||
session.join(client);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "leave": {
|
|
||||||
if (ws.data.room) {
|
|
||||||
const session = getSession(ws.data.room);
|
|
||||||
session?.leave(client);
|
|
||||||
removeSession(ws.data.room);
|
|
||||||
ws.data.room = null;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "update": {
|
|
||||||
if (ws.data.room) {
|
|
||||||
const session = getSession(ws.data.room);
|
|
||||||
session?.applyUpdate(new Uint8Array(msg.data), client);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
close(ws) {
|
|
||||||
if (ws.data.room && ws.data.client) {
|
|
||||||
const session = getSession(ws.data.room);
|
|
||||||
session?.leave(ws.data.client);
|
|
||||||
removeSession(ws.data.room);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
},
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
server.stop();
|
|
||||||
});
|
|
||||||
|
|
||||||
test("two clients concurrent edits converge to same state", async () => {
|
|
||||||
const Y = await import("yjs");
|
|
||||||
|
|
||||||
// create two clients with their own yjs docs
|
|
||||||
const doc1 = new Y.Doc();
|
|
||||||
const text1 = doc1.getText("content");
|
|
||||||
|
|
||||||
const doc2 = new Y.Doc();
|
|
||||||
const text2 = doc2.getText("content");
|
|
||||||
|
|
||||||
// track messages received by each client
|
|
||||||
const client1Messages: ServerMessage[] = [];
|
|
||||||
const client2Messages: ServerMessage[] = [];
|
|
||||||
|
|
||||||
let ws1Ready = false;
|
|
||||||
let ws2Ready = false;
|
|
||||||
|
|
||||||
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
|
||||||
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
|
||||||
|
|
||||||
// setup all handlers immediately
|
|
||||||
ws1.onopen = () => {
|
|
||||||
ws1.send(JSON.stringify({ type: "join", room: "test-room" }));
|
|
||||||
};
|
|
||||||
|
|
||||||
ws1.onmessage = (ev) => {
|
|
||||||
const msg = JSON.parse(ev.data);
|
|
||||||
client1Messages.push(msg);
|
|
||||||
if (msg.type === "sync") {
|
|
||||||
ws1Ready = true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ws2.onopen = () => {
|
|
||||||
ws2.send(JSON.stringify({ type: "join", room: "test-room" }));
|
|
||||||
};
|
|
||||||
|
|
||||||
ws2.onmessage = (ev) => {
|
|
||||||
const msg = JSON.parse(ev.data);
|
|
||||||
client2Messages.push(msg);
|
|
||||||
if (msg.type === "sync") {
|
|
||||||
ws2Ready = true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// wait for both to be ready
|
|
||||||
await new Promise<void>((resolve, reject) => {
|
|
||||||
const timeout = setTimeout(() => {
|
|
||||||
reject(new Error("clients join timeout"));
|
|
||||||
}, 2000);
|
|
||||||
|
|
||||||
const interval = setInterval(() => {
|
|
||||||
if (ws1Ready && ws2Ready) {
|
|
||||||
clearInterval(interval);
|
|
||||||
clearTimeout(timeout);
|
|
||||||
resolve();
|
|
||||||
}
|
|
||||||
}, 10);
|
|
||||||
});
|
|
||||||
|
|
||||||
// wait a bit for peers message
|
|
||||||
await new Promise((r) => setTimeout(r, 50));
|
|
||||||
|
|
||||||
// client 1 inserts "hello" at position 0
|
|
||||||
text1.insert(0, "hello");
|
|
||||||
const update1 = Y.encodeStateAsUpdate(doc1);
|
|
||||||
ws1.send(JSON.stringify({ type: "update", data: Array.from(update1) }));
|
|
||||||
|
|
||||||
// client 2 inserts "world" at position 0 (concurrent edit)
|
|
||||||
text2.insert(0, "world");
|
|
||||||
const update2 = Y.encodeStateAsUpdate(doc2);
|
|
||||||
ws2.send(JSON.stringify({ type: "update", data: Array.from(update2) }));
|
|
||||||
|
|
||||||
// wait for updates to propagate
|
|
||||||
await new Promise((r) => setTimeout(r, 200));
|
|
||||||
|
|
||||||
// apply received updates to both clients
|
|
||||||
for (const msg of client1Messages) {
|
|
||||||
if (msg.type === "update" || msg.type === "sync") {
|
|
||||||
Y.applyUpdate(doc1, new Uint8Array(msg.data));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const msg of client2Messages) {
|
|
||||||
if (msg.type === "update" || msg.type === "sync") {
|
|
||||||
Y.applyUpdate(doc2, new Uint8Array(msg.data));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// both clients should have the same final content
|
|
||||||
const final1 = text1.toString();
|
|
||||||
const final2 = text2.toString();
|
|
||||||
|
|
||||||
expect(final1).toBe(final2);
|
|
||||||
|
|
||||||
// both words should be present
|
|
||||||
expect(final1).toContain("hello");
|
|
||||||
expect(final1).toContain("world");
|
|
||||||
|
|
||||||
// total length should be sum of both inserts
|
|
||||||
expect(final1.length).toBe(10); // "hello" + "world"
|
|
||||||
|
|
||||||
ws1.close();
|
|
||||||
ws2.close();
|
|
||||||
});
|
|
||||||
|
|
||||||
test("three clients with sequential edits converge", async () => {
|
|
||||||
const Y = await import("yjs");
|
|
||||||
|
|
||||||
const doc1 = new Y.Doc();
|
|
||||||
const text1 = doc1.getText("content");
|
|
||||||
|
|
||||||
const doc2 = new Y.Doc();
|
|
||||||
const text2 = doc2.getText("content");
|
|
||||||
|
|
||||||
const doc3 = new Y.Doc();
|
|
||||||
const text3 = doc3.getText("content");
|
|
||||||
|
|
||||||
const client1Messages: ServerMessage[] = [];
|
|
||||||
const client2Messages: ServerMessage[] = [];
|
|
||||||
const client3Messages: ServerMessage[] = [];
|
|
||||||
|
|
||||||
let ws1Ready = false;
|
|
||||||
let ws2Ready = false;
|
|
||||||
let ws3Ready = false;
|
|
||||||
|
|
||||||
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
|
||||||
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
|
||||||
const ws3 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
|
||||||
|
|
||||||
// setup all handlers immediately
|
|
||||||
ws1.onopen = () => {
|
|
||||||
ws1.send(JSON.stringify({ type: "join", room: "multi-room" }));
|
|
||||||
};
|
|
||||||
|
|
||||||
ws1.onmessage = (ev) => {
|
|
||||||
const msg = JSON.parse(ev.data);
|
|
||||||
client1Messages.push(msg);
|
|
||||||
if (msg.type === "sync") {
|
|
||||||
ws1Ready = true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ws2.onopen = () => {
|
|
||||||
ws2.send(JSON.stringify({ type: "join", room: "multi-room" }));
|
|
||||||
};
|
|
||||||
|
|
||||||
ws2.onmessage = (ev) => {
|
|
||||||
const msg = JSON.parse(ev.data);
|
|
||||||
client2Messages.push(msg);
|
|
||||||
if (msg.type === "sync") {
|
|
||||||
ws2Ready = true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ws3.onopen = () => {
|
|
||||||
ws3.send(JSON.stringify({ type: "join", room: "multi-room" }));
|
|
||||||
};
|
|
||||||
|
|
||||||
ws3.onmessage = (ev) => {
|
|
||||||
const msg = JSON.parse(ev.data);
|
|
||||||
client3Messages.push(msg);
|
|
||||||
if (msg.type === "sync") {
|
|
||||||
ws3Ready = true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// wait for all three to be ready
|
|
||||||
await new Promise<void>((resolve, reject) => {
|
|
||||||
const timeout = setTimeout(() => {
|
|
||||||
reject(new Error("clients join timeout"));
|
|
||||||
}, 2000);
|
|
||||||
|
|
||||||
const interval = setInterval(() => {
|
|
||||||
if (ws1Ready && ws2Ready && ws3Ready) {
|
|
||||||
clearInterval(interval);
|
|
||||||
clearTimeout(timeout);
|
|
||||||
resolve();
|
|
||||||
}
|
|
||||||
}, 10);
|
|
||||||
});
|
|
||||||
|
|
||||||
await new Promise((r) => setTimeout(r, 50));
|
|
||||||
|
|
||||||
// client 1: insert "a"
|
|
||||||
text1.insert(0, "a");
|
|
||||||
ws1.send(
|
|
||||||
JSON.stringify({
|
|
||||||
type: "update",
|
|
||||||
data: Array.from(Y.encodeStateAsUpdate(doc1)),
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
await new Promise((r) => setTimeout(r, 30));
|
|
||||||
|
|
||||||
// client 2: insert "b"
|
|
||||||
// first apply any updates received
|
|
||||||
for (const msg of client2Messages) {
|
|
||||||
if (msg.type === "update" || msg.type === "sync") {
|
|
||||||
Y.applyUpdate(doc2, new Uint8Array(msg.data));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
text2.insert(text2.length, "b");
|
|
||||||
ws2.send(
|
|
||||||
JSON.stringify({
|
|
||||||
type: "update",
|
|
||||||
data: Array.from(Y.encodeStateAsUpdate(doc2)),
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
await new Promise((r) => setTimeout(r, 30));
|
|
||||||
|
|
||||||
// client 3: insert "c"
|
|
||||||
for (const msg of client3Messages) {
|
|
||||||
if (msg.type === "update" || msg.type === "sync") {
|
|
||||||
Y.applyUpdate(doc3, new Uint8Array(msg.data));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
text3.insert(text3.length, "c");
|
|
||||||
ws3.send(
|
|
||||||
JSON.stringify({
|
|
||||||
type: "update",
|
|
||||||
data: Array.from(Y.encodeStateAsUpdate(doc3)),
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
await new Promise((r) => setTimeout(r, 200));
|
|
||||||
|
|
||||||
// apply all updates
|
|
||||||
for (const msg of client1Messages) {
|
|
||||||
if (msg.type === "update" || msg.type === "sync") {
|
|
||||||
Y.applyUpdate(doc1, new Uint8Array(msg.data));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const msg of client2Messages) {
|
|
||||||
if (msg.type === "update" || msg.type === "sync") {
|
|
||||||
Y.applyUpdate(doc2, new Uint8Array(msg.data));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const msg of client3Messages) {
|
|
||||||
if (msg.type === "update" || msg.type === "sync") {
|
|
||||||
Y.applyUpdate(doc3, new Uint8Array(msg.data));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const final1 = text1.toString();
|
|
||||||
const final2 = text2.toString();
|
|
||||||
const final3 = text3.toString();
|
|
||||||
|
|
||||||
expect(final1).toBe(final2);
|
|
||||||
expect(final2).toBe(final3);
|
|
||||||
expect(final1).toBe("abc");
|
|
||||||
|
|
||||||
ws1.close();
|
|
||||||
ws2.close();
|
|
||||||
ws3.close();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
@ -10,48 +10,15 @@ export type ServerMessage =
|
||||||
| { type: "sync"; data: number[] } // full yjs state
|
| { type: "sync"; data: number[] } // full yjs state
|
||||||
| { type: "update"; data: number[] }
|
| { type: "update"; data: number[] }
|
||||||
| { type: "awareness"; data: number[] }
|
| { type: "awareness"; data: number[] }
|
||||||
| { type: "peers"; count: number }
|
| { type: "peers"; count: number };
|
||||||
| { type: "error"; message: string };
|
|
||||||
|
|
||||||
export type BridgeMessage =
|
|
||||||
| { type: "ready" }
|
|
||||||
| { type: "connected"; room: string }
|
|
||||||
| { type: "disconnected" }
|
|
||||||
| { type: "content"; text: string }
|
|
||||||
| { type: "peers"; count: number }
|
|
||||||
| { type: "error"; message: string };
|
|
||||||
|
|
||||||
export function encode(msg: ServerMessage): string {
|
export function encode(msg: ServerMessage): string {
|
||||||
return JSON.stringify(msg);
|
return JSON.stringify(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
function isClientMessage(obj: unknown): obj is ClientMessage {
|
|
||||||
if (typeof obj !== "object" || obj === null) return false;
|
|
||||||
const msg = obj as Record<string, unknown>;
|
|
||||||
|
|
||||||
switch (msg.type) {
|
|
||||||
case "join":
|
|
||||||
return typeof msg.room === "string";
|
|
||||||
case "leave":
|
|
||||||
return true;
|
|
||||||
case "update":
|
|
||||||
return (
|
|
||||||
Array.isArray(msg.data) && msg.data.every((n) => typeof n === "number")
|
|
||||||
);
|
|
||||||
case "awareness":
|
|
||||||
return (
|
|
||||||
Array.isArray(msg.data) && msg.data.every((n) => typeof n === "number")
|
|
||||||
);
|
|
||||||
default:
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export function decode(raw: string): ClientMessage | null {
|
export function decode(raw: string): ClientMessage | null {
|
||||||
try {
|
try {
|
||||||
const parsed = JSON.parse(raw);
|
return JSON.parse(raw) as ClientMessage;
|
||||||
if (!isClientMessage(parsed)) return null;
|
|
||||||
return parsed;
|
|
||||||
} catch {
|
} catch {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,52 +18,21 @@ export class Session {
|
||||||
this.clients.add(client);
|
this.clients.add(client);
|
||||||
// send full state to new client
|
// send full state to new client
|
||||||
const state = Y.encodeStateAsUpdate(this.doc);
|
const state = Y.encodeStateAsUpdate(this.doc);
|
||||||
try {
|
client.ws.send(encode({ type: "sync", data: Array.from(state) }));
|
||||||
client.ws.send(encode({ type: "sync", data: Array.from(state) }));
|
|
||||||
} catch (err) {
|
|
||||||
console.debug("failed to send sync to client, removing:", err);
|
|
||||||
this.clients.delete(client);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.broadcastPeerCount();
|
this.broadcastPeerCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
leave(client: Client) {
|
leave(client: Client) {
|
||||||
this.clients.delete(client);
|
this.clients.delete(client);
|
||||||
this.broadcastPeerCount();
|
this.broadcastPeerCount();
|
||||||
if (this.isEmpty()) {
|
|
||||||
sessions.delete(this.name);
|
|
||||||
console.debug(`session removed: ${this.name}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
isEmpty(): boolean {
|
|
||||||
return this.clients.size === 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
applyUpdate(update: Uint8Array, from: Client) {
|
applyUpdate(update: Uint8Array, from: Client) {
|
||||||
try {
|
Y.applyUpdate(this.doc, update);
|
||||||
Y.applyUpdate(this.doc, update);
|
// broadcast to others
|
||||||
// broadcast to others
|
for (const client of this.clients) {
|
||||||
for (const client of this.clients) {
|
if (client !== from) {
|
||||||
if (client !== from) {
|
client.ws.send(encode({ type: "update", data: Array.from(update) }));
|
||||||
try {
|
|
||||||
client.ws.send(
|
|
||||||
encode({ type: "update", data: Array.from(update) }),
|
|
||||||
);
|
|
||||||
} catch (err) {
|
|
||||||
console.debug("failed to send update to client, removing:", err);
|
|
||||||
this.clients.delete(client);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
console.error(`failed to apply update in session ${this.name}:`, err);
|
|
||||||
// optionally notify the client that sent the bad update
|
|
||||||
try {
|
|
||||||
from.ws.send(encode({ type: "error", message: "invalid update" }));
|
|
||||||
} catch {
|
|
||||||
// ignore send errors
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -71,12 +40,7 @@ export class Session {
|
||||||
broadcastPeerCount() {
|
broadcastPeerCount() {
|
||||||
const msg = encode({ type: "peers", count: this.clients.size });
|
const msg = encode({ type: "peers", count: this.clients.size });
|
||||||
for (const client of this.clients) {
|
for (const client of this.clients) {
|
||||||
try {
|
client.ws.send(msg);
|
||||||
client.ws.send(msg);
|
|
||||||
} catch (err) {
|
|
||||||
console.debug("failed to send peer count to client, removing:", err);
|
|
||||||
this.clients.delete(client);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -97,11 +61,3 @@ export function getOrCreateSession(name: string): Session {
|
||||||
export function getSession(name: string): Session | undefined {
|
export function getSession(name: string): Session | undefined {
|
||||||
return sessions.get(name);
|
return sessions.get(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
export function removeSession(name: string): void {
|
|
||||||
const session = sessions.get(name);
|
|
||||||
if (session?.isEmpty()) {
|
|
||||||
sessions.delete(name);
|
|
||||||
console.debug(`session removed: ${name}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue