Compare commits
No commits in common. "5d4b14460461d5ed81acbb548fdd4511c4ab8501" and "100cd678239e1942c831ba48abef558332b5f216" have entirely different histories.
5d4b144604
...
100cd67823
15 changed files with 43 additions and 1504 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -32,5 +32,3 @@ report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json
|
||||||
|
|
||||||
# Finder (MacOS) folder config
|
# Finder (MacOS) folder config
|
||||||
.DS_Store
|
.DS_Store
|
||||||
|
|
||||||
*.db
|
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ working:
|
||||||
not yet:
|
not yet:
|
||||||
- cursor/selection sync (awareness protocol stubbed but unused)
|
- cursor/selection sync (awareness protocol stubbed but unused)
|
||||||
- other editor adapters (helix, kakoune, zed)
|
- other editor adapters (helix, kakoune, zed)
|
||||||
|
- persistence (rooms are ephemeral)
|
||||||
- reconnection handling
|
- reconnection handling
|
||||||
|
|
||||||
## stack
|
## stack
|
||||||
|
|
@ -28,7 +29,6 @@ not yet:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
just dev # daemon on :4040
|
just dev # daemon on :4040
|
||||||
COLLABD_DB=path.db just dev # custom db path (default: collabd.db)
|
|
||||||
bun test # unit tests
|
bun test # unit tests
|
||||||
just check # biome lint
|
just check # biome lint
|
||||||
```
|
```
|
||||||
|
|
@ -66,7 +66,6 @@ server -> client:
|
||||||
- src/index.ts - websocket server, room routing
|
- src/index.ts - websocket server, room routing
|
||||||
- src/session.ts - yjs doc per room, peer management
|
- src/session.ts - yjs doc per room, peer management
|
||||||
- src/protocol.ts - message types
|
- src/protocol.ts - message types
|
||||||
- src/db.ts - sqlite persistence, save/load crdt updates
|
|
||||||
- adapters/vim/bridge.ts - bun process vim spawns
|
- adapters/vim/bridge.ts - bun process vim spawns
|
||||||
- adapters/vim/collab.vim - vim9script plugin
|
- adapters/vim/collab.vim - vim9script plugin
|
||||||
|
|
||||||
|
|
@ -96,6 +95,7 @@ known gaps (not production ready):
|
||||||
- room names guessable via brute force
|
- room names guessable via brute force
|
||||||
- no encryption (deploy behind wss, not ws)
|
- no encryption (deploy behind wss, not ws)
|
||||||
- no audit logging
|
- no audit logging
|
||||||
|
- no persistence (data lost on daemon restart)
|
||||||
|
|
||||||
before production:
|
before production:
|
||||||
1. auth layer (jwt tokens or unix socket for local-only)
|
1. auth layer (jwt tokens or unix socket for local-only)
|
||||||
|
|
|
||||||
|
|
@ -6,15 +6,11 @@ describe("Bridge lifecycle", () => {
|
||||||
const DAEMON_PORT = 4042;
|
const DAEMON_PORT = 4042;
|
||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
const { initDb } = await import("../../src/db");
|
|
||||||
const { decode } = await import("../../src/protocol");
|
const { decode } = await import("../../src/protocol");
|
||||||
const { getOrCreateSession, getSession, removeSession } = await import(
|
const { getOrCreateSession, getSession, removeSession } = await import(
|
||||||
"../../src/session"
|
"../../src/session"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Initialize database
|
|
||||||
initDb(":memory:");
|
|
||||||
|
|
||||||
// start daemon for bridge to connect to
|
// start daemon for bridge to connect to
|
||||||
daemon = Bun.serve({
|
daemon = Bun.serve({
|
||||||
port: DAEMON_PORT,
|
port: DAEMON_PORT,
|
||||||
|
|
@ -79,10 +75,8 @@ describe("Bridge lifecycle", () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
afterEach(async () => {
|
afterEach(() => {
|
||||||
daemon.stop();
|
daemon.stop();
|
||||||
const { close: closeDb } = await import("../../src/db");
|
|
||||||
closeDb();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
test("bridge starts and signals ready", async () => {
|
test("bridge starts and signals ready", async () => {
|
||||||
|
|
@ -198,79 +192,3 @@ describe("Bridge lifecycle", () => {
|
||||||
expect(exitCode).toBe(0);
|
expect(exitCode).toBe(0);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("awareness", () => {
|
|
||||||
test("forwards awareness from daemon to vim", async () => {
|
|
||||||
// Start daemon
|
|
||||||
const server = Bun.serve({
|
|
||||||
port: 4043,
|
|
||||||
fetch(req, server) {
|
|
||||||
if (new URL(req.url).pathname === "/ws") {
|
|
||||||
server.upgrade(req);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
return new Response("not found", { status: 404 });
|
|
||||||
},
|
|
||||||
websocket: {
|
|
||||||
open(ws) {
|
|
||||||
// Simulate sending awareness after join
|
|
||||||
setTimeout(() => {
|
|
||||||
ws.send(
|
|
||||||
JSON.stringify({
|
|
||||||
type: "awareness",
|
|
||||||
data: {
|
|
||||||
clientId: 99,
|
|
||||||
cursor: { line: 3, col: 7 },
|
|
||||||
name: "peer",
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
}, 100);
|
|
||||||
},
|
|
||||||
message() {},
|
|
||||||
close() {},
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
const output: string[] = [];
|
|
||||||
const bridge = spawn({
|
|
||||||
cmd: ["bun", "adapters/vim/bridge.ts"],
|
|
||||||
env: { ...process.env, COLLABD_URL: "ws://localhost:4043/ws" },
|
|
||||||
stdin: "pipe",
|
|
||||||
stdout: "pipe",
|
|
||||||
});
|
|
||||||
|
|
||||||
const reader = bridge.stdout.getReader();
|
|
||||||
const decoder = new TextDecoder();
|
|
||||||
|
|
||||||
// Collect output
|
|
||||||
(async () => {
|
|
||||||
while (true) {
|
|
||||||
const { done, value } = await reader.read();
|
|
||||||
if (done) break;
|
|
||||||
output.push(decoder.decode(value));
|
|
||||||
}
|
|
||||||
})();
|
|
||||||
|
|
||||||
await new Promise((r) => setTimeout(r, 50));
|
|
||||||
bridge.stdin.write(
|
|
||||||
`${JSON.stringify({ type: "connect", room: "test" })}\n`,
|
|
||||||
);
|
|
||||||
await new Promise((r) => setTimeout(r, 200));
|
|
||||||
|
|
||||||
const awarenessMsg = output
|
|
||||||
.join("")
|
|
||||||
.split("\n")
|
|
||||||
.filter(Boolean)
|
|
||||||
.map((l) => JSON.parse(l))
|
|
||||||
.find((m) => m.type === "cursor");
|
|
||||||
|
|
||||||
expect(awarenessMsg).toBeDefined();
|
|
||||||
expect(awarenessMsg.data.line).toBe(3);
|
|
||||||
expect(awarenessMsg.data.col).toBe(7);
|
|
||||||
expect(awarenessMsg.data.name).toBe("peer");
|
|
||||||
|
|
||||||
bridge.kill();
|
|
||||||
server.stop();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ let ws: WebSocket | null = null;
|
||||||
let doc: Y.Doc | null = null;
|
let doc: Y.Doc | null = null;
|
||||||
let text: Y.Text | null = null;
|
let text: Y.Text | null = null;
|
||||||
let suppressLocal = 0;
|
let suppressLocal = 0;
|
||||||
let clientId: number | null = null;
|
|
||||||
|
|
||||||
function send(msg: object) {
|
function send(msg: object) {
|
||||||
console.log(JSON.stringify(msg));
|
console.log(JSON.stringify(msg));
|
||||||
|
|
@ -33,7 +32,6 @@ function connect(roomName: string) {
|
||||||
ws = new WebSocket(DAEMON_URL);
|
ws = new WebSocket(DAEMON_URL);
|
||||||
|
|
||||||
ws.onopen = () => {
|
ws.onopen = () => {
|
||||||
clientId = Math.floor(Math.random() * 1000000);
|
|
||||||
ws?.send(JSON.stringify({ type: "join", room: roomName }));
|
ws?.send(JSON.stringify({ type: "join", room: roomName }));
|
||||||
send({ type: "connected", room: roomName });
|
send({ type: "connected", room: roomName });
|
||||||
};
|
};
|
||||||
|
|
@ -60,21 +58,6 @@ function connect(roomName: string) {
|
||||||
send({ type: "peers", count: msg.count });
|
send({ type: "peers", count: msg.count });
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case "awareness": {
|
|
||||||
// Forward cursor info to vim
|
|
||||||
if (msg.data?.cursor) {
|
|
||||||
send({
|
|
||||||
type: "cursor",
|
|
||||||
data: {
|
|
||||||
clientId: msg.data.clientId,
|
|
||||||
line: msg.data.cursor.line,
|
|
||||||
col: msg.data.cursor.col,
|
|
||||||
name: msg.data.name || `peer-${msg.data.clientId}`,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -162,24 +145,6 @@ for await (const chunk of Bun.stdin.stream()) {
|
||||||
case "content":
|
case "content":
|
||||||
setContent(msg.text);
|
setContent(msg.text);
|
||||||
break;
|
break;
|
||||||
case "cursor":
|
|
||||||
if (
|
|
||||||
ws &&
|
|
||||||
clientId !== null &&
|
|
||||||
msg.line !== undefined &&
|
|
||||||
msg.col !== undefined
|
|
||||||
) {
|
|
||||||
ws.send(
|
|
||||||
JSON.stringify({
|
|
||||||
type: "awareness",
|
|
||||||
data: {
|
|
||||||
clientId,
|
|
||||||
cursor: { line: msg.line, col: msg.col },
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case "disconnect":
|
case "disconnect":
|
||||||
ws?.close();
|
ws?.close();
|
||||||
send({ type: "disconnected" });
|
send({ type: "disconnected" });
|
||||||
|
|
|
||||||
|
|
@ -1,39 +1,26 @@
|
||||||
if !has('vim9script') || v:version < 900
|
|
||||||
finish
|
|
||||||
endif
|
|
||||||
|
|
||||||
vim9script
|
vim9script
|
||||||
|
|
||||||
# collab.vim - collaborative editing adapter for collabd
|
# collab.vim - collaborative editing adapter for collabd
|
||||||
# requires: bun, collabd daemon running
|
# requires: bun, collabd daemon running
|
||||||
|
|
||||||
hlset([{
|
|
||||||
name: 'PeerCursor',
|
|
||||||
ctermbg: 'yellow',
|
|
||||||
ctermfg: 'black',
|
|
||||||
guibg: 'yellow',
|
|
||||||
guifg: 'black'
|
|
||||||
}])
|
|
||||||
|
|
||||||
var bridge_job: job = null_job
|
var bridge_job: job = null_job
|
||||||
var bridge_channel: channel = null_channel
|
var bridge_channel: channel = null_channel
|
||||||
var connected = false
|
var connected = false
|
||||||
var ready = false
|
var ready = false
|
||||||
var room = ""
|
var room = ""
|
||||||
var suppressing = false
|
var suppressing = false
|
||||||
var peer_match_ids: dict<number> = {}
|
|
||||||
|
|
||||||
# path to bridge script (adjust as needed)
|
# path to bridge script (adjust as needed)
|
||||||
const bridge_script = expand('<sfile>:p:h') .. '/bridge.ts'
|
const bridge_script = expand('<sfile>:p:h') .. '/bridge.ts'
|
||||||
|
|
||||||
def Send(msg: dict<any>): void
|
def Send(msg: dict<any>)
|
||||||
if bridge_channel != null_channel
|
if bridge_channel != null_channel
|
||||||
ch_sendraw(bridge_channel, json_encode(msg) .. "\n")
|
ch_sendraw(bridge_channel, json_encode(msg) .. "\n")
|
||||||
endif
|
endif
|
||||||
enddef
|
enddef
|
||||||
|
|
||||||
def OnOutput(ch: channel, msg: string): void
|
def OnOutput(ch: channel, msg: string)
|
||||||
if msg->empty()
|
if empty(msg)
|
||||||
return
|
return
|
||||||
endif
|
endif
|
||||||
var data: dict<any>
|
var data: dict<any>
|
||||||
|
|
@ -47,27 +34,25 @@ def OnOutput(ch: channel, msg: string): void
|
||||||
ready = true
|
ready = true
|
||||||
elseif data.type == 'connected'
|
elseif data.type == 'connected'
|
||||||
connected = true
|
connected = true
|
||||||
echom $'[collab] connected to room: {data.room}'
|
echom '[collab] connected to room: ' .. data.room
|
||||||
elseif data.type == 'disconnected'
|
elseif data.type == 'disconnected'
|
||||||
connected = false
|
connected = false
|
||||||
echom '[collab] disconnected'
|
echom '[collab] disconnected'
|
||||||
elseif data.type == 'content'
|
elseif data.type == 'content'
|
||||||
ApplyRemoteContent(data.text)
|
ApplyRemoteContent(data.text)
|
||||||
elseif data.type == 'peers'
|
elseif data.type == 'peers'
|
||||||
echom $'[collab] peers: {data.count}'
|
echom '[collab] peers: ' .. data.count
|
||||||
elseif data.type == 'cursor'
|
|
||||||
ShowPeerCursor(data.data)
|
|
||||||
elseif data.type == 'error'
|
elseif data.type == 'error'
|
||||||
echoerr '[collab] ' .. data.message
|
echoerr '[collab] ' .. data.message
|
||||||
endif
|
endif
|
||||||
enddef
|
enddef
|
||||||
|
|
||||||
def ApplyRemoteContent(content: string): void
|
def ApplyRemoteContent(content: string)
|
||||||
if suppressing
|
if suppressing
|
||||||
return
|
return
|
||||||
endif
|
endif
|
||||||
suppressing = true
|
suppressing = true
|
||||||
var lines = content->split("\n", true)
|
var lines = split(content, "\n", true)
|
||||||
var view = winsaveview()
|
var view = winsaveview()
|
||||||
silent! :%delete _
|
silent! :%delete _
|
||||||
setline(1, lines)
|
setline(1, lines)
|
||||||
|
|
@ -75,49 +60,16 @@ def ApplyRemoteContent(content: string): void
|
||||||
suppressing = false
|
suppressing = false
|
||||||
enddef
|
enddef
|
||||||
|
|
||||||
def SendBuffer(): void
|
def SendBuffer()
|
||||||
if !connected || suppressing
|
if !connected || suppressing
|
||||||
return
|
return
|
||||||
endif
|
endif
|
||||||
var lines = getline(1, '$')
|
var lines = getline(1, '$')
|
||||||
var content = lines->join("\n")
|
var content = join(lines, "\n")
|
||||||
Send({type: 'content', text: content})
|
Send({type: 'content', text: content})
|
||||||
enddef
|
enddef
|
||||||
|
|
||||||
def SendCursor(): void
|
export def Connect(room_name: string)
|
||||||
if !connected
|
|
||||||
return
|
|
||||||
endif
|
|
||||||
const pos = getpos('.')
|
|
||||||
# pos is [bufnum, line, col, off] - line/col are 1-indexed
|
|
||||||
Send({type: 'cursor', line: pos[1] - 1, col: pos[2] - 1})
|
|
||||||
enddef
|
|
||||||
|
|
||||||
def ShowPeerCursor(data: dict<any>): void
|
|
||||||
const client_id = string(data.clientId)
|
|
||||||
|
|
||||||
# Clear previous highlight for this peer
|
|
||||||
if has_key(peer_match_ids, client_id)
|
|
||||||
silent! matchdelete(peer_match_ids[client_id])
|
|
||||||
endif
|
|
||||||
|
|
||||||
# Highlight the cursor position (line, col are 0-indexed from bridge)
|
|
||||||
const line_nr = data.line + 1
|
|
||||||
const col_nr = data.col + 1
|
|
||||||
|
|
||||||
# Create a 1-char highlight at cursor position
|
|
||||||
const pattern = $'\%{line_nr}l\%{col_nr}c.'
|
|
||||||
peer_match_ids[client_id] = matchadd('PeerCursor', pattern, 10)
|
|
||||||
enddef
|
|
||||||
|
|
||||||
def ClearPeerCursors(): void
|
|
||||||
for id in values(peer_match_ids)
|
|
||||||
silent! matchdelete(id)
|
|
||||||
endfor
|
|
||||||
peer_match_ids = {}
|
|
||||||
enddef
|
|
||||||
|
|
||||||
export def Connect(room_name: string): void
|
|
||||||
if bridge_job != null_job
|
if bridge_job != null_job
|
||||||
Disconnect()
|
Disconnect()
|
||||||
endif
|
endif
|
||||||
|
|
@ -147,40 +99,31 @@ export def Connect(room_name: string): void
|
||||||
Send({type: 'connect', room: room_name})
|
Send({type: 'connect', room: room_name})
|
||||||
|
|
||||||
# set up autocmds to send changes
|
# set up autocmds to send changes
|
||||||
autocmd_add([
|
augroup CollabVim
|
||||||
{
|
autocmd!
|
||||||
group: 'CollabVim',
|
autocmd TextChanged,TextChangedI * call SendBuffer()
|
||||||
event: ['TextChanged', 'TextChangedI'],
|
augroup END
|
||||||
pattern: '*',
|
|
||||||
cmd: 'SendBuffer()'
|
|
||||||
},
|
|
||||||
{
|
|
||||||
group: 'CollabVim',
|
|
||||||
event: ['CursorMoved', 'CursorMovedI'],
|
|
||||||
bufnr: bufnr(),
|
|
||||||
cmd: 'SendCursor()'
|
|
||||||
}
|
|
||||||
])
|
|
||||||
enddef
|
enddef
|
||||||
|
|
||||||
export def Disconnect(): void
|
export def Disconnect()
|
||||||
if bridge_job != null_job
|
if bridge_job != null_job
|
||||||
Send({type: 'disconnect'})
|
Send({type: 'disconnect'})
|
||||||
job_stop(bridge_job)
|
job_stop(bridge_job)
|
||||||
bridge_job = null_job
|
bridge_job = null_job
|
||||||
bridge_channel = null_channel
|
bridge_channel = null_channel
|
||||||
endif
|
endif
|
||||||
ClearPeerCursors()
|
|
||||||
connected = false
|
connected = false
|
||||||
ready = false
|
ready = false
|
||||||
room = ""
|
room = ""
|
||||||
autocmd_delete([{group: 'CollabVim'}])
|
augroup CollabVim
|
||||||
|
autocmd!
|
||||||
|
augroup END
|
||||||
echom '[collab] disconnected'
|
echom '[collab] disconnected'
|
||||||
enddef
|
enddef
|
||||||
|
|
||||||
export def Status(): void
|
export def Status()
|
||||||
if connected
|
if connected
|
||||||
echom $'[collab] connected to room: {room}'
|
echom '[collab] connected to room: ' .. room
|
||||||
else
|
else
|
||||||
echom '[collab] not connected'
|
echom '[collab] not connected'
|
||||||
endif
|
endif
|
||||||
|
|
|
||||||
|
|
@ -1,982 +0,0 @@
|
||||||
# Cursor Sync, Persistence & Adapter Guide
|
|
||||||
|
|
||||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
|
||||||
|
|
||||||
**Goal:** Add cursor/selection awareness, persistent rooms via bun:sqlite, and document how to write editor adapters.
|
|
||||||
|
|
||||||
**Architecture:** Yjs awareness protocol for cursors (ephemeral state, separate from document CRDT). SQLite stores yjs update log per room - replay on daemon restart. Adapter guide documents the vim pattern for other editors.
|
|
||||||
|
|
||||||
**Tech Stack:** yjs awareness, bun:sqlite, vim9script
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 1: Cursor Sync - Protocol & Session
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- Modify: `src/protocol.ts:7,12` (awareness types exist, need payload shape)
|
|
||||||
- Modify: `src/session.ts` (add awareness state tracking)
|
|
||||||
- Create: `src/session.test.ts` (add awareness tests)
|
|
||||||
|
|
||||||
**Step 1: Define awareness payload type in protocol**
|
|
||||||
|
|
||||||
In `src/protocol.ts`, the awareness message exists but payload is just `number[]`. Add a typed structure:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
// After line 14, add:
|
|
||||||
export type AwarenessState = {
|
|
||||||
clientId: number;
|
|
||||||
cursor?: { line: number; col: number };
|
|
||||||
selection?: { startLine: number; startCol: number; endLine: number; endCol: number };
|
|
||||||
name?: string;
|
|
||||||
};
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 2: Write failing test for awareness broadcast**
|
|
||||||
|
|
||||||
Add to `src/session.test.ts`:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
import { describe, test, expect, mock } from "bun:test";
|
|
||||||
|
|
||||||
describe("awareness", () => {
|
|
||||||
test("broadcasts awareness to other clients", () => {
|
|
||||||
const session = new Session();
|
|
||||||
const sent1: unknown[] = [];
|
|
||||||
const sent2: unknown[] = [];
|
|
||||||
|
|
||||||
const client1 = { ws: { send: (m: string) => sent1.push(JSON.parse(m)) } } as Client;
|
|
||||||
const client2 = { ws: { send: (m: string) => sent2.push(JSON.parse(m)) } } as Client;
|
|
||||||
|
|
||||||
session.join(client1);
|
|
||||||
session.join(client2);
|
|
||||||
|
|
||||||
const awareness = { clientId: 1, cursor: { line: 5, col: 10 } };
|
|
||||||
session.broadcastAwareness(client1, awareness);
|
|
||||||
|
|
||||||
// client1 should NOT receive their own awareness
|
|
||||||
expect(sent1.filter(m => m.type === "awareness")).toHaveLength(0);
|
|
||||||
// client2 should receive it
|
|
||||||
expect(sent2.filter(m => m.type === "awareness")).toHaveLength(1);
|
|
||||||
expect(sent2.find(m => m.type === "awareness")?.data).toEqual(awareness);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 3: Run test to verify it fails**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
bun test src/session.test.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected: FAIL - `broadcastAwareness` is not a function
|
|
||||||
|
|
||||||
**Step 4: Implement broadcastAwareness in session.ts**
|
|
||||||
|
|
||||||
Add after `broadcastPeerCount()`:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
broadcastAwareness(sender: Client, state: AwarenessState): void {
|
|
||||||
const message = encode({ type: "awareness", data: state });
|
|
||||||
for (const client of this.clients) {
|
|
||||||
if (client !== sender) {
|
|
||||||
try {
|
|
||||||
client.ws.send(message);
|
|
||||||
} catch {
|
|
||||||
// client disconnected, ignore
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
Update imports at top:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
import { encode, decode, type AwarenessState } from "./protocol";
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 5: Run test to verify it passes**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
bun test src/session.test.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected: PASS
|
|
||||||
|
|
||||||
**Step 6: Commit**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
git add src/protocol.ts src/session.ts src/session.test.ts
|
|
||||||
git commit -m "Add awareness broadcast to session"
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 2: Cursor Sync - Daemon Routing
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- Modify: `src/index.ts:68-73` (add awareness case)
|
|
||||||
- Create: `src/index.test.ts` (integration test for awareness routing)
|
|
||||||
|
|
||||||
**Step 1: Write failing integration test**
|
|
||||||
|
|
||||||
Create `src/awareness.test.ts`:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
import { describe, test, expect, beforeAll, afterAll } from "bun:test";
|
|
||||||
import type { Server } from "bun";
|
|
||||||
|
|
||||||
describe("awareness routing", () => {
|
|
||||||
let server: Server;
|
|
||||||
const PORT = 4042;
|
|
||||||
|
|
||||||
beforeAll(async () => {
|
|
||||||
// Import and start server on test port
|
|
||||||
process.env.PORT = String(PORT);
|
|
||||||
const mod = await import("./index");
|
|
||||||
server = mod.server;
|
|
||||||
});
|
|
||||||
|
|
||||||
afterAll(() => {
|
|
||||||
server?.stop();
|
|
||||||
});
|
|
||||||
|
|
||||||
test("awareness message routes to other peers in same room", async () => {
|
|
||||||
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
|
||||||
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
|
||||||
|
|
||||||
const received: unknown[] = [];
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
new Promise(r => ws1.onopen = r),
|
|
||||||
new Promise(r => ws2.onopen = r),
|
|
||||||
]);
|
|
||||||
|
|
||||||
ws2.onmessage = (e) => {
|
|
||||||
received.push(JSON.parse(e.data));
|
|
||||||
};
|
|
||||||
|
|
||||||
// Both join same room
|
|
||||||
ws1.send(JSON.stringify({ type: "join", room: "test" }));
|
|
||||||
ws2.send(JSON.stringify({ type: "join", room: "test" }));
|
|
||||||
|
|
||||||
await Bun.sleep(50);
|
|
||||||
|
|
||||||
// ws1 sends awareness
|
|
||||||
ws1.send(JSON.stringify({
|
|
||||||
type: "awareness",
|
|
||||||
data: { clientId: 1, cursor: { line: 10, col: 5 } }
|
|
||||||
}));
|
|
||||||
|
|
||||||
await Bun.sleep(50);
|
|
||||||
|
|
||||||
const awareness = received.find(m => m.type === "awareness");
|
|
||||||
expect(awareness).toBeDefined();
|
|
||||||
expect(awareness.data.cursor).toEqual({ line: 10, col: 5 });
|
|
||||||
|
|
||||||
ws1.close();
|
|
||||||
ws2.close();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 2: Run test to verify it fails**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
bun test src/awareness.test.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected: FAIL - awareness not received (no routing in index.ts yet)
|
|
||||||
|
|
||||||
**Step 3: Add awareness routing to index.ts**
|
|
||||||
|
|
||||||
In `src/index.ts`, the message handler switch (around line 43), add after the `update` case:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
case "awareness":
|
|
||||||
if (client.room) {
|
|
||||||
const session = getSession(client.room);
|
|
||||||
if (session && "data" in msg) {
|
|
||||||
session.broadcastAwareness(client, msg.data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
```
|
|
||||||
|
|
||||||
Also update `isClientMessage` in protocol.ts to validate awareness data shape:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
// In isClientMessage(), add validation for awareness:
|
|
||||||
if (msg.type === "awareness") {
|
|
||||||
return typeof msg.data === "object" && msg.data !== null;
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 4: Run test to verify it passes**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
bun test src/awareness.test.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected: PASS
|
|
||||||
|
|
||||||
**Step 5: Commit**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
git add src/index.ts src/protocol.ts src/awareness.test.ts
|
|
||||||
git commit -m "Route awareness messages between peers"
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 3: Cursor Sync - Bridge
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- Modify: `adapters/vim/bridge.ts` (send/receive awareness)
|
|
||||||
- Modify: `adapters/vim/bridge.test.ts` (test awareness flow)
|
|
||||||
|
|
||||||
**Step 1: Write failing test for awareness in bridge**
|
|
||||||
|
|
||||||
Add to `adapters/vim/bridge.test.ts`:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
describe("awareness", () => {
|
|
||||||
test("forwards awareness from daemon to vim", async () => {
|
|
||||||
// Start daemon
|
|
||||||
const server = Bun.serve({
|
|
||||||
port: 4043,
|
|
||||||
fetch(req, server) {
|
|
||||||
if (new URL(req.url).pathname === "/ws") {
|
|
||||||
server.upgrade(req);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
return new Response("not found", { status: 404 });
|
|
||||||
},
|
|
||||||
websocket: {
|
|
||||||
open(ws) {
|
|
||||||
// Simulate sending awareness after join
|
|
||||||
setTimeout(() => {
|
|
||||||
ws.send(JSON.stringify({
|
|
||||||
type: "awareness",
|
|
||||||
data: { clientId: 99, cursor: { line: 3, col: 7 }, name: "peer" }
|
|
||||||
}));
|
|
||||||
}, 100);
|
|
||||||
},
|
|
||||||
message() {},
|
|
||||||
close() {},
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
const output: string[] = [];
|
|
||||||
const bridge = Bun.spawn(["bun", "run", "adapters/vim/bridge.ts"], {
|
|
||||||
env: { ...process.env, DAEMON_URL: "ws://localhost:4043/ws" },
|
|
||||||
stdin: "pipe",
|
|
||||||
stdout: "pipe",
|
|
||||||
});
|
|
||||||
|
|
||||||
const reader = bridge.stdout.getReader();
|
|
||||||
const decoder = new TextDecoder();
|
|
||||||
|
|
||||||
// Collect output
|
|
||||||
(async () => {
|
|
||||||
while (true) {
|
|
||||||
const { done, value } = await reader.read();
|
|
||||||
if (done) break;
|
|
||||||
output.push(decoder.decode(value));
|
|
||||||
}
|
|
||||||
})();
|
|
||||||
|
|
||||||
await Bun.sleep(50);
|
|
||||||
bridge.stdin.write(JSON.stringify({ type: "connect", room: "test" }) + "\n");
|
|
||||||
await Bun.sleep(200);
|
|
||||||
|
|
||||||
const awarenessMsg = output.join("").split("\n")
|
|
||||||
.filter(Boolean)
|
|
||||||
.map(l => JSON.parse(l))
|
|
||||||
.find(m => m.type === "cursor");
|
|
||||||
|
|
||||||
expect(awarenessMsg).toBeDefined();
|
|
||||||
expect(awarenessMsg.data.line).toBe(3);
|
|
||||||
expect(awarenessMsg.data.col).toBe(7);
|
|
||||||
expect(awarenessMsg.data.name).toBe("peer");
|
|
||||||
|
|
||||||
bridge.kill();
|
|
||||||
server.stop();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 2: Run test to verify it fails**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
bun test adapters/vim/bridge.test.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected: FAIL - no cursor message received
|
|
||||||
|
|
||||||
**Step 3: Handle awareness in bridge.ts**
|
|
||||||
|
|
||||||
In `adapters/vim/bridge.ts`, in the `ws.onmessage` handler, add a case for awareness:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
case "awareness":
|
|
||||||
// Forward cursor info to vim
|
|
||||||
if (parsed.data?.cursor) {
|
|
||||||
send({
|
|
||||||
type: "cursor",
|
|
||||||
data: {
|
|
||||||
clientId: parsed.data.clientId,
|
|
||||||
line: parsed.data.cursor.line,
|
|
||||||
col: parsed.data.cursor.col,
|
|
||||||
name: parsed.data.name || `peer-${parsed.data.clientId}`,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 4: Run test to verify it passes**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
bun test adapters/vim/bridge.test.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected: PASS
|
|
||||||
|
|
||||||
**Step 5: Commit**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
git add adapters/vim/bridge.ts adapters/vim/bridge.test.ts
|
|
||||||
git commit -m "Forward awareness/cursor to vim from bridge"
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 4: Cursor Sync - Vim Plugin
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- Modify: `adapters/vim/collab.vim` (receive cursor, display highlight)
|
|
||||||
|
|
||||||
**Step 1: Add cursor handling to vim plugin**
|
|
||||||
|
|
||||||
In `adapters/vim/collab.vim`, in `OnOutput()` function, add a case for cursor messages:
|
|
||||||
|
|
||||||
```vim
|
|
||||||
elseif msg.type ==# 'cursor'
|
|
||||||
call s:ShowPeerCursor(msg.data)
|
|
||||||
```
|
|
||||||
|
|
||||||
Add the display function:
|
|
||||||
|
|
||||||
```vim
|
|
||||||
var peer_match_ids: dict<number> = {}
|
|
||||||
|
|
||||||
def ShowPeerCursor(data: dict<any>)
|
|
||||||
const client_id = string(data.clientId)
|
|
||||||
|
|
||||||
# Clear previous highlight for this peer
|
|
||||||
if has_key(peer_match_ids, client_id)
|
|
||||||
silent! matchdelete(peer_match_ids[client_id])
|
|
||||||
endif
|
|
||||||
|
|
||||||
# Highlight the cursor position (line, col are 0-indexed from bridge)
|
|
||||||
const line_nr = data.line + 1
|
|
||||||
const col_nr = data.col + 1
|
|
||||||
|
|
||||||
# Create a 1-char highlight at cursor position
|
|
||||||
const pattern = '\%' .. line_nr .. 'l\%' .. col_nr .. 'c.'
|
|
||||||
peer_match_ids[client_id] = matchadd('PeerCursor', pattern, 10)
|
|
||||||
enddef
|
|
||||||
|
|
||||||
def ClearPeerCursors()
|
|
||||||
for id in values(peer_match_ids)
|
|
||||||
silent! matchdelete(id)
|
|
||||||
endfor
|
|
||||||
peer_match_ids = {}
|
|
||||||
enddef
|
|
||||||
```
|
|
||||||
|
|
||||||
Add highlight group setup at top of file:
|
|
||||||
|
|
||||||
```vim
|
|
||||||
highlight PeerCursor ctermbg=yellow guibg=yellow ctermfg=black guifg=black
|
|
||||||
```
|
|
||||||
|
|
||||||
Call `ClearPeerCursors()` in `Disconnect()`.
|
|
||||||
|
|
||||||
**Step 2: Manual test**
|
|
||||||
|
|
||||||
No automated test for vim highlight rendering. Test manually:
|
|
||||||
|
|
||||||
1. Open two vim instances
|
|
||||||
2. `:CollabJoin testroom` in both
|
|
||||||
3. Move cursor in one - should see yellow highlight in other
|
|
||||||
|
|
||||||
**Step 3: Commit**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
git add adapters/vim/collab.vim
|
|
||||||
git commit -m "Display peer cursors in vim with yellow highlight"
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 5: Cursor Sync - Send Local Cursor
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- Modify: `adapters/vim/collab.vim` (send cursor on CursorMoved)
|
|
||||||
- Modify: `adapters/vim/bridge.ts` (forward cursor to daemon)
|
|
||||||
|
|
||||||
**Step 1: Send cursor position from vim**
|
|
||||||
|
|
||||||
In `adapters/vim/collab.vim`, add autocmd in `Connect()`:
|
|
||||||
|
|
||||||
```vim
|
|
||||||
autocmd CursorMoved,CursorMovedI <buffer> call s:SendCursor()
|
|
||||||
```
|
|
||||||
|
|
||||||
Add the send function:
|
|
||||||
|
|
||||||
```vim
|
|
||||||
def SendCursor()
|
|
||||||
if !connected
|
|
||||||
return
|
|
||||||
endif
|
|
||||||
const pos = getpos('.')
|
|
||||||
# pos is [bufnum, line, col, off] - line/col are 1-indexed
|
|
||||||
Send({type: 'cursor', line: pos[1] - 1, col: pos[2] - 1})
|
|
||||||
enddef
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 2: Forward cursor from bridge to daemon**
|
|
||||||
|
|
||||||
In `adapters/vim/bridge.ts`, in the stdin message handler, add:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
case "cursor":
|
|
||||||
if (ws && msg.line !== undefined && msg.col !== undefined) {
|
|
||||||
ws.send(JSON.stringify({
|
|
||||||
type: "awareness",
|
|
||||||
data: {
|
|
||||||
clientId: Math.floor(Math.random() * 1000000), // TODO: stable client id
|
|
||||||
cursor: { line: msg.line, col: msg.col },
|
|
||||||
},
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 3: Manual test**
|
|
||||||
|
|
||||||
1. Two vim instances, same room
|
|
||||||
2. Move cursor in one - yellow highlight appears in other
|
|
||||||
3. Move cursor in other - highlight updates
|
|
||||||
|
|
||||||
**Step 4: Commit**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
git add adapters/vim/collab.vim adapters/vim/bridge.ts
|
|
||||||
git commit -m "Send local cursor position to peers"
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 6: Persistence - Schema & Setup
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- Create: `src/db.ts` (database module)
|
|
||||||
- Create: `src/db.test.ts` (database tests)
|
|
||||||
|
|
||||||
**Step 1: Write failing test for db module**
|
|
||||||
|
|
||||||
Create `src/db.test.ts`:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
import { describe, test, expect, beforeEach, afterEach } from "bun:test";
|
|
||||||
import { unlinkSync } from "fs";
|
|
||||||
import { initDb, saveUpdate, getUpdates, close } from "./db";
|
|
||||||
|
|
||||||
describe("persistence", () => {
|
|
||||||
const TEST_DB = ":memory:";
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
initDb(TEST_DB);
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
close();
|
|
||||||
});
|
|
||||||
|
|
||||||
test("saves and retrieves updates for a room", () => {
|
|
||||||
const room = "testroom";
|
|
||||||
const update1 = new Uint8Array([1, 2, 3]);
|
|
||||||
const update2 = new Uint8Array([4, 5, 6]);
|
|
||||||
|
|
||||||
saveUpdate(room, update1);
|
|
||||||
saveUpdate(room, update2);
|
|
||||||
|
|
||||||
const updates = getUpdates(room);
|
|
||||||
expect(updates).toHaveLength(2);
|
|
||||||
expect(updates[0]).toEqual(update1);
|
|
||||||
expect(updates[1]).toEqual(update2);
|
|
||||||
});
|
|
||||||
|
|
||||||
test("returns empty array for unknown room", () => {
|
|
||||||
const updates = getUpdates("nonexistent");
|
|
||||||
expect(updates).toEqual([]);
|
|
||||||
});
|
|
||||||
|
|
||||||
test("rooms are isolated", () => {
|
|
||||||
saveUpdate("room1", new Uint8Array([1]));
|
|
||||||
saveUpdate("room2", new Uint8Array([2]));
|
|
||||||
|
|
||||||
expect(getUpdates("room1")).toHaveLength(1);
|
|
||||||
expect(getUpdates("room2")).toHaveLength(1);
|
|
||||||
expect(getUpdates("room1")[0]).toEqual(new Uint8Array([1]));
|
|
||||||
});
|
|
||||||
});
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 2: Run test to verify it fails**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
bun test src/db.test.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected: FAIL - module not found
|
|
||||||
|
|
||||||
**Step 3: Implement db.ts with bun:sqlite**
|
|
||||||
|
|
||||||
Create `src/db.ts`:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
import { Database } from "bun:sqlite";
|
|
||||||
|
|
||||||
let db: Database | null = null;
|
|
||||||
|
|
||||||
export function initDb(path: string = "collabd.db"): void {
|
|
||||||
db = new Database(path);
|
|
||||||
db.run(`
|
|
||||||
CREATE TABLE IF NOT EXISTS updates (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
room TEXT NOT NULL,
|
|
||||||
data BLOB NOT NULL,
|
|
||||||
created_at INTEGER DEFAULT (unixepoch())
|
|
||||||
)
|
|
||||||
`);
|
|
||||||
db.run(`CREATE INDEX IF NOT EXISTS idx_room ON updates(room)`);
|
|
||||||
}
|
|
||||||
|
|
||||||
export function saveUpdate(room: string, data: Uint8Array): void {
|
|
||||||
if (!db) throw new Error("Database not initialized");
|
|
||||||
db.run("INSERT INTO updates (room, data) VALUES (?, ?)", [room, data]);
|
|
||||||
}
|
|
||||||
|
|
||||||
export function getUpdates(room: string): Uint8Array[] {
|
|
||||||
if (!db) throw new Error("Database not initialized");
|
|
||||||
const rows = db.query("SELECT data FROM updates WHERE room = ? ORDER BY id").all(room) as { data: Uint8Array }[];
|
|
||||||
return rows.map(r => new Uint8Array(r.data));
|
|
||||||
}
|
|
||||||
|
|
||||||
export function close(): void {
|
|
||||||
db?.close();
|
|
||||||
db = null;
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 4: Run test to verify it passes**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
bun test src/db.test.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected: PASS
|
|
||||||
|
|
||||||
**Step 5: Commit**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
git add src/db.ts src/db.test.ts
|
|
||||||
git commit -m "Add bun:sqlite persistence layer"
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 7: Persistence - Hook Into Session
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- Modify: `src/session.ts` (save updates, load on create)
|
|
||||||
- Modify: `src/session.test.ts` (test persistence integration)
|
|
||||||
|
|
||||||
**Step 1: Write failing test for session persistence**
|
|
||||||
|
|
||||||
Add to `src/session.test.ts`:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
import { initDb, getUpdates, close as closeDb } from "./db";
|
|
||||||
|
|
||||||
describe("session persistence", () => {
|
|
||||||
beforeEach(() => {
|
|
||||||
initDb(":memory:");
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
closeDb();
|
|
||||||
});
|
|
||||||
|
|
||||||
test("saves updates to database", () => {
|
|
||||||
const session = getOrCreateSession("persist-test");
|
|
||||||
const client = mockClient();
|
|
||||||
session.join(client);
|
|
||||||
|
|
||||||
// Create a yjs update
|
|
||||||
const doc = new Y.Doc();
|
|
||||||
const text = doc.getText("content");
|
|
||||||
text.insert(0, "hello");
|
|
||||||
const update = Y.encodeStateAsUpdate(doc);
|
|
||||||
|
|
||||||
session.applyUpdate(client, Array.from(update));
|
|
||||||
|
|
||||||
// Check database has the update
|
|
||||||
const saved = getUpdates("persist-test");
|
|
||||||
expect(saved.length).toBeGreaterThan(0);
|
|
||||||
});
|
|
||||||
|
|
||||||
test("loads existing updates on session create", () => {
|
|
||||||
// First session creates content
|
|
||||||
const session1 = getOrCreateSession("reload-test");
|
|
||||||
const client1 = mockClient();
|
|
||||||
session1.join(client1);
|
|
||||||
|
|
||||||
const doc = new Y.Doc();
|
|
||||||
const text = doc.getText("content");
|
|
||||||
text.insert(0, "persisted");
|
|
||||||
const update = Y.encodeStateAsUpdate(doc);
|
|
||||||
session1.applyUpdate(client1, Array.from(update));
|
|
||||||
|
|
||||||
// Clear in-memory sessions
|
|
||||||
removeSession("reload-test");
|
|
||||||
|
|
||||||
// New session should load from db
|
|
||||||
const session2 = getOrCreateSession("reload-test");
|
|
||||||
const content = session2.doc.getText("content").toString();
|
|
||||||
expect(content).toBe("persisted");
|
|
||||||
});
|
|
||||||
});
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 2: Run test to verify it fails**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
bun test src/session.test.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected: FAIL - updates not persisted
|
|
||||||
|
|
||||||
**Step 3: Modify session.ts to persist updates**
|
|
||||||
|
|
||||||
In `src/session.ts`:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
import { saveUpdate, getUpdates } from "./db";
|
|
||||||
|
|
||||||
// In getOrCreateSession():
|
|
||||||
export function getOrCreateSession(room: string): Session {
|
|
||||||
let session = sessions.get(room);
|
|
||||||
if (!session) {
|
|
||||||
session = new Session();
|
|
||||||
// Load persisted updates
|
|
||||||
const updates = getUpdates(room);
|
|
||||||
for (const update of updates) {
|
|
||||||
Y.applyUpdate(session.doc, update);
|
|
||||||
}
|
|
||||||
sessions.set(room, session);
|
|
||||||
}
|
|
||||||
return session;
|
|
||||||
}
|
|
||||||
|
|
||||||
// In Session.applyUpdate(), after Y.applyUpdate():
|
|
||||||
applyUpdate(sender: Client, data: number[]): void {
|
|
||||||
const update = new Uint8Array(data);
|
|
||||||
Y.applyUpdate(this.doc, update);
|
|
||||||
|
|
||||||
// Persist the update (need room name - add to session)
|
|
||||||
if (this.room) {
|
|
||||||
saveUpdate(this.room, update);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ... rest of broadcast logic
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
Add room tracking to Session:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
class Session {
|
|
||||||
doc: Y.Doc;
|
|
||||||
clients: Set<Client>;
|
|
||||||
room?: string; // Add this
|
|
||||||
|
|
||||||
// Update getOrCreateSession to set it:
|
|
||||||
session.room = room;
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 4: Run test to verify it passes**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
bun test src/session.test.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected: PASS
|
|
||||||
|
|
||||||
**Step 5: Commit**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
git add src/session.ts src/session.test.ts
|
|
||||||
git commit -m "Persist yjs updates to sqlite, reload on session create"
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 8: Persistence - Init on Daemon Start
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- Modify: `src/index.ts` (init db on startup)
|
|
||||||
|
|
||||||
**Step 1: Add db initialization**
|
|
||||||
|
|
||||||
In `src/index.ts`, at the top after imports:
|
|
||||||
|
|
||||||
```typescript
|
|
||||||
import { initDb } from "./db";
|
|
||||||
|
|
||||||
// Initialize database
|
|
||||||
const DB_PATH = process.env.COLLABD_DB || "collabd.db";
|
|
||||||
initDb(DB_PATH);
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 2: Manual test**
|
|
||||||
|
|
||||||
1. Start daemon: `just dev`
|
|
||||||
2. Join room, make edits
|
|
||||||
3. Kill daemon (Ctrl+C)
|
|
||||||
4. Restart daemon
|
|
||||||
5. Join same room - content should be there
|
|
||||||
|
|
||||||
**Step 3: Commit**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
git add src/index.ts
|
|
||||||
git commit -m "Initialize sqlite database on daemon startup"
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 9: Write Adapter Guide
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- Create: `docs/ADAPTERS.md`
|
|
||||||
|
|
||||||
**Step 1: Write the adapter guide**
|
|
||||||
|
|
||||||
Create `docs/ADAPTERS.md`:
|
|
||||||
|
|
||||||
```markdown
|
|
||||||
# Writing Editor Adapters for collabd
|
|
||||||
|
|
||||||
This guide explains how to write an adapter that lets any editor participate
|
|
||||||
in collaborative editing sessions.
|
|
||||||
|
|
||||||
## Architecture
|
|
||||||
|
|
||||||
```
|
|
||||||
┌─────────────┐ json/channel ┌─────────────┐ websocket ┌─────────────┐
|
|
||||||
│ Editor │ ◄──────────────────► │ Bridge │ ◄────────────────► │ Daemon │
|
|
||||||
│ (plugin) │ │ (bun) │ │ :4040 │
|
|
||||||
└─────────────┘ └─────────────┘ └─────────────┘
|
|
||||||
```
|
|
||||||
|
|
||||||
The **bridge** is a Bun process that:
|
|
||||||
- Manages the Yjs document (CRDT state)
|
|
||||||
- Translates editor buffer changes to Yjs operations
|
|
||||||
- Translates remote Yjs updates to buffer content
|
|
||||||
- Speaks a simple JSON protocol with the editor plugin
|
|
||||||
|
|
||||||
The **plugin** is editor-specific code that:
|
|
||||||
- Hooks buffer change events
|
|
||||||
- Sends buffer content to bridge
|
|
||||||
- Applies remote content from bridge
|
|
||||||
- Optionally displays peer cursors
|
|
||||||
|
|
||||||
## Why a Bridge?
|
|
||||||
|
|
||||||
Most editors can't embed Yjs directly:
|
|
||||||
- Vim: No npm/node, limited async
|
|
||||||
- Helix: Rust-only plugins
|
|
||||||
- Kakoune: Shell-based scripting
|
|
||||||
|
|
||||||
The bridge isolates CRDT complexity. Plugins stay simple.
|
|
||||||
|
|
||||||
## Protocol: Plugin ↔ Bridge
|
|
||||||
|
|
||||||
Messages are newline-delimited JSON.
|
|
||||||
|
|
||||||
### Plugin → Bridge
|
|
||||||
|
|
||||||
```json
|
|
||||||
{"type": "connect", "room": "myroom"}
|
|
||||||
{"type": "disconnect"}
|
|
||||||
{"type": "content", "text": "full buffer contents"}
|
|
||||||
{"type": "cursor", "line": 5, "col": 10}
|
|
||||||
```
|
|
||||||
|
|
||||||
### Bridge → Plugin
|
|
||||||
|
|
||||||
```json
|
|
||||||
{"type": "ready"}
|
|
||||||
{"type": "connected", "room": "myroom"}
|
|
||||||
{"type": "disconnected"}
|
|
||||||
{"type": "content", "text": "full buffer contents"}
|
|
||||||
{"type": "peers", "count": 2}
|
|
||||||
{"type": "cursor", "data": {"clientId": 123, "line": 5, "col": 10, "name": "peer-123"}}
|
|
||||||
{"type": "error", "message": "connection failed"}
|
|
||||||
```
|
|
||||||
|
|
||||||
## Implementing a Plugin
|
|
||||||
|
|
||||||
### 1. Spawn the Bridge
|
|
||||||
|
|
||||||
```
|
|
||||||
bun run adapters/vim/bridge.ts
|
|
||||||
```
|
|
||||||
|
|
||||||
Or use your own bridge (see next section).
|
|
||||||
|
|
||||||
Communication via stdin/stdout with JSON lines.
|
|
||||||
|
|
||||||
### 2. Wait for Ready
|
|
||||||
|
|
||||||
Bridge sends `{"type": "ready"}` when it starts. Then send connect:
|
|
||||||
|
|
||||||
```json
|
|
||||||
{"type": "connect", "room": "myroom"}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 3. Handle Content Updates
|
|
||||||
|
|
||||||
When you receive `{"type": "content", "text": "..."}`:
|
|
||||||
|
|
||||||
1. Save cursor position
|
|
||||||
2. Replace buffer contents with received text
|
|
||||||
3. Restore cursor position (clamped to valid range)
|
|
||||||
|
|
||||||
**Critical:** Do NOT trigger your change handler when applying remote content,
|
|
||||||
or you'll create an infinite loop.
|
|
||||||
|
|
||||||
### 4. Send Local Changes
|
|
||||||
|
|
||||||
When user edits the buffer:
|
|
||||||
|
|
||||||
1. Get full buffer text
|
|
||||||
2. Send `{"type": "content", "text": "full text"}`
|
|
||||||
|
|
||||||
The bridge handles diffing - just send the whole buffer.
|
|
||||||
|
|
||||||
### 5. Cursor Sync (Optional)
|
|
||||||
|
|
||||||
On cursor move:
|
|
||||||
```json
|
|
||||||
{"type": "cursor", "line": 5, "col": 10}
|
|
||||||
```
|
|
||||||
|
|
||||||
On receiving cursor:
|
|
||||||
```json
|
|
||||||
{"type": "cursor", "data": {"line": 5, "col": 10, "name": "peer"}}
|
|
||||||
```
|
|
||||||
|
|
||||||
Render a highlight/virtual text at that position.
|
|
||||||
|
|
||||||
## Implementing a Custom Bridge
|
|
||||||
|
|
||||||
If you need a bridge in a different language (e.g., Rust for Helix),
|
|
||||||
implement the same protocol. Key responsibilities:
|
|
||||||
|
|
||||||
1. **Connect to daemon** at `ws://localhost:4040/ws`
|
|
||||||
|
|
||||||
2. **Manage Y.Doc** - create on connect, apply updates
|
|
||||||
|
|
||||||
3. **Diff buffer changes** - when receiving content from plugin:
|
|
||||||
```
|
|
||||||
old_text = current Y.Text content
|
|
||||||
new_text = received buffer content
|
|
||||||
diff = compute_diff(old, new)
|
|
||||||
apply diff as Y.Text operations
|
|
||||||
```
|
|
||||||
|
|
||||||
4. **Forward updates** - when Y.Doc changes from remote:
|
|
||||||
```
|
|
||||||
send full text to plugin as {"type": "content", ...}
|
|
||||||
```
|
|
||||||
|
|
||||||
5. **Handle awareness** - forward cursor positions both ways
|
|
||||||
|
|
||||||
### Yjs Libraries by Language
|
|
||||||
|
|
||||||
- **JavaScript/TypeScript:** `yjs` (npm)
|
|
||||||
- **Rust:** `yrs` (crates.io) - Yjs port
|
|
||||||
- **Python:** `pycrdt` (pypi)
|
|
||||||
- **Go:** `yjs-go` (experimental)
|
|
||||||
|
|
||||||
## Reference: Vim Adapter
|
|
||||||
|
|
||||||
See `adapters/vim/` for a complete example:
|
|
||||||
|
|
||||||
- `collab.vim` - 135 lines of Vim9script
|
|
||||||
- `bridge.ts` - 160 lines of TypeScript
|
|
||||||
|
|
||||||
The vim plugin:
|
|
||||||
1. Spawns bridge as job
|
|
||||||
2. Communicates via channels (vim's async IPC)
|
|
||||||
3. Sets TextChanged autocmd to detect edits
|
|
||||||
4. Applies remote content with `:delete` + `setline()`
|
|
||||||
|
|
||||||
## Testing Your Adapter
|
|
||||||
|
|
||||||
1. Start daemon: `just dev`
|
|
||||||
2. Open editor A, join room "test"
|
|
||||||
3. Open editor B (vim works), join room "test"
|
|
||||||
4. Type in one - should appear in other within 100ms
|
|
||||||
5. Type in both simultaneously - should converge
|
|
||||||
|
|
||||||
## Common Pitfalls
|
|
||||||
|
|
||||||
**Feedback loops:** Applying remote content triggers your change handler,
|
|
||||||
which sends it back. Use a flag to suppress during remote apply.
|
|
||||||
|
|
||||||
**Cursor jitter:** Cursor moves during remote apply. Save/restore position.
|
|
||||||
|
|
||||||
**Large files:** Sending full buffer on every keystroke is fine for <1MB.
|
|
||||||
For larger files, consider debouncing or incremental updates.
|
|
||||||
|
|
||||||
**Encoding:** Always UTF-8. Line endings should be `\n`.
|
|
||||||
```
|
|
||||||
|
|
||||||
**Step 2: Commit**
|
|
||||||
|
|
||||||
```bash
|
|
||||||
git add docs/ADAPTERS.md
|
|
||||||
git commit -m "Add adapter implementation guide"
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Summary
|
|
||||||
|
|
||||||
After completing all tasks:
|
|
||||||
|
|
||||||
- **Cursor sync:** Peers see each other's cursor positions with yellow highlights
|
|
||||||
- **Persistence:** Room state survives daemon restarts via SQLite
|
|
||||||
- **Adapter guide:** Clear documentation for adding new editors
|
|
||||||
|
|
||||||
The vim adapter remains simple (~150 lines). The bridge handles CRDT complexity.
|
|
||||||
New adapters follow the same pattern: spawn bridge, send JSON, apply content.
|
|
||||||
|
|
@ -1,68 +0,0 @@
|
||||||
import { afterAll, beforeAll, describe, expect, test } from "bun:test";
|
|
||||||
import type { Server } from "bun";
|
|
||||||
import { close as closeDb, initDb } from "./db";
|
|
||||||
import type { ServerMessage } from "./protocol";
|
|
||||||
|
|
||||||
describe("awareness routing", () => {
|
|
||||||
let server: Server<unknown>;
|
|
||||||
const PORT = 4042;
|
|
||||||
|
|
||||||
beforeAll(async () => {
|
|
||||||
// Initialize database before starting server
|
|
||||||
initDb(":memory:");
|
|
||||||
// Import and start server on test port
|
|
||||||
process.env.PORT = String(PORT);
|
|
||||||
const mod = await import("./index");
|
|
||||||
server = mod.server;
|
|
||||||
});
|
|
||||||
|
|
||||||
afterAll(() => {
|
|
||||||
server?.stop();
|
|
||||||
closeDb();
|
|
||||||
});
|
|
||||||
|
|
||||||
test("awareness message routes to other peers in same room", async () => {
|
|
||||||
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
|
||||||
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
|
|
||||||
|
|
||||||
const received: ServerMessage[] = [];
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
new Promise((r) => {
|
|
||||||
ws1.onopen = r;
|
|
||||||
}),
|
|
||||||
new Promise((r) => {
|
|
||||||
ws2.onopen = r;
|
|
||||||
}),
|
|
||||||
]);
|
|
||||||
|
|
||||||
ws2.onmessage = (e) => {
|
|
||||||
received.push(JSON.parse(e.data));
|
|
||||||
};
|
|
||||||
|
|
||||||
// Both join same room
|
|
||||||
ws1.send(JSON.stringify({ type: "join", room: "test" }));
|
|
||||||
ws2.send(JSON.stringify({ type: "join", room: "test" }));
|
|
||||||
|
|
||||||
await Bun.sleep(50);
|
|
||||||
|
|
||||||
// ws1 sends awareness
|
|
||||||
ws1.send(
|
|
||||||
JSON.stringify({
|
|
||||||
type: "awareness",
|
|
||||||
data: { clientId: 1, cursor: { line: 10, col: 5 } },
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
await Bun.sleep(50);
|
|
||||||
|
|
||||||
const awareness = received.find((m) => m.type === "awareness");
|
|
||||||
expect(awareness).toBeDefined();
|
|
||||||
if (awareness?.type === "awareness") {
|
|
||||||
expect(awareness.data.cursor).toEqual({ line: 10, col: 5 });
|
|
||||||
}
|
|
||||||
|
|
||||||
ws1.close();
|
|
||||||
ws2.close();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
@ -1,51 +0,0 @@
|
||||||
import { afterEach, beforeEach, describe, expect, it } from "bun:test";
|
|
||||||
import { close, getUpdates, initDb, saveUpdate } from "./db";
|
|
||||||
|
|
||||||
const TEST_DB = ":memory:";
|
|
||||||
|
|
||||||
describe("db persistence", () => {
|
|
||||||
beforeEach(() => {
|
|
||||||
initDb(TEST_DB);
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
close();
|
|
||||||
});
|
|
||||||
|
|
||||||
it("saves and retrieves updates for a room", () => {
|
|
||||||
const room = "test-room";
|
|
||||||
const update1 = new Uint8Array([1, 2, 3]);
|
|
||||||
const update2 = new Uint8Array([4, 5, 6]);
|
|
||||||
|
|
||||||
saveUpdate(room, update1);
|
|
||||||
saveUpdate(room, update2);
|
|
||||||
|
|
||||||
const updates = getUpdates(room);
|
|
||||||
expect(updates).toHaveLength(2);
|
|
||||||
expect(updates[0]).toEqual(update1);
|
|
||||||
expect(updates[1]).toEqual(update2);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("returns empty array for unknown room", () => {
|
|
||||||
const updates = getUpdates("nonexistent");
|
|
||||||
expect(updates).toEqual([]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("rooms are isolated", () => {
|
|
||||||
const room1 = "room-one";
|
|
||||||
const room2 = "room-two";
|
|
||||||
const update1 = new Uint8Array([1, 1, 1]);
|
|
||||||
const update2 = new Uint8Array([2, 2, 2]);
|
|
||||||
|
|
||||||
saveUpdate(room1, update1);
|
|
||||||
saveUpdate(room2, update2);
|
|
||||||
|
|
||||||
const room1Updates = getUpdates(room1);
|
|
||||||
const room2Updates = getUpdates(room2);
|
|
||||||
|
|
||||||
expect(room1Updates).toHaveLength(1);
|
|
||||||
expect(room2Updates).toHaveLength(1);
|
|
||||||
expect(room1Updates[0]).toEqual(update1);
|
|
||||||
expect(room2Updates[0]).toEqual(update2);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
38
src/db.ts
38
src/db.ts
|
|
@ -1,38 +0,0 @@
|
||||||
import { Database } from "bun:sqlite";
|
|
||||||
|
|
||||||
let db: Database | null = null;
|
|
||||||
|
|
||||||
export function initDb(path: string): void {
|
|
||||||
db = new Database(path);
|
|
||||||
db.exec(`
|
|
||||||
CREATE TABLE IF NOT EXISTS updates (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
room TEXT NOT NULL,
|
|
||||||
data BLOB NOT NULL,
|
|
||||||
created_at INTEGER DEFAULT (unixepoch())
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_room ON updates(room);
|
|
||||||
`);
|
|
||||||
}
|
|
||||||
|
|
||||||
export function saveUpdate(room: string, data: Uint8Array): void {
|
|
||||||
if (!db) throw new Error("Database not initialized");
|
|
||||||
const stmt = db.prepare("INSERT INTO updates (room, data) VALUES (?, ?)");
|
|
||||||
stmt.run(room, data);
|
|
||||||
}
|
|
||||||
|
|
||||||
export function getUpdates(room: string): Uint8Array[] {
|
|
||||||
if (!db) throw new Error("Database not initialized");
|
|
||||||
const stmt = db.prepare(
|
|
||||||
"SELECT data FROM updates WHERE room = ? ORDER BY id ASC",
|
|
||||||
);
|
|
||||||
const rows = stmt.all(room) as Array<{ data: Uint8Array }>;
|
|
||||||
return rows.map((row) => row.data);
|
|
||||||
}
|
|
||||||
|
|
||||||
export function close(): void {
|
|
||||||
if (db) {
|
|
||||||
db.close();
|
|
||||||
db = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
24
src/index.ts
24
src/index.ts
|
|
@ -1,26 +1,15 @@
|
||||||
import { initDb } from "./db";
|
|
||||||
import { decode } from "./protocol";
|
import { decode } from "./protocol";
|
||||||
import {
|
import { type Client, getOrCreateSession, getSession } from "./session";
|
||||||
type Client,
|
|
||||||
getOrCreateSession,
|
|
||||||
getSession,
|
|
||||||
type WsData,
|
|
||||||
} from "./session";
|
|
||||||
|
|
||||||
const PORT = Number(process.env.PORT) || 4040;
|
const PORT = Number(process.env.PORT) || 4040;
|
||||||
|
|
||||||
// Initialize database
|
|
||||||
const DB_PATH = process.env.COLLABD_DB || "collabd.db";
|
|
||||||
initDb(DB_PATH);
|
|
||||||
console.debug(`database initialized: ${DB_PATH}`);
|
|
||||||
|
|
||||||
function isValidRoomName(name: unknown): name is string {
|
function isValidRoomName(name: unknown): name is string {
|
||||||
if (typeof name !== "string") return false;
|
if (typeof name !== "string") return false;
|
||||||
if (name.length === 0 || name.length > 64) return false;
|
if (name.length === 0 || name.length > 64) return false;
|
||||||
return /^[a-zA-Z0-9_-]+$/.test(name);
|
return /^[a-zA-Z0-9_-]+$/.test(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
export const server = Bun.serve<WsData>({
|
Bun.serve({
|
||||||
port: PORT,
|
port: PORT,
|
||||||
fetch(req, server) {
|
fetch(req, server) {
|
||||||
const url = new URL(req.url);
|
const url = new URL(req.url);
|
||||||
|
|
@ -83,15 +72,6 @@ export const server = Bun.serve<WsData>({
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case "awareness": {
|
|
||||||
if (ws.data.room) {
|
|
||||||
const session = getSession(ws.data.room);
|
|
||||||
if (session && "data" in msg) {
|
|
||||||
session.broadcastAwareness(client, msg.data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
close(ws) {
|
close(ws) {
|
||||||
|
|
|
||||||
|
|
@ -1,23 +1,15 @@
|
||||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
||||||
import { close as closeDb, initDb } from "./db";
|
|
||||||
import type { ServerMessage } from "./protocol";
|
import type { ServerMessage } from "./protocol";
|
||||||
import { decode } from "./protocol";
|
import { decode } from "./protocol";
|
||||||
import {
|
import { getOrCreateSession, getSession, removeSession } from "./session";
|
||||||
getOrCreateSession,
|
|
||||||
getSession,
|
|
||||||
removeSession,
|
|
||||||
type WsData,
|
|
||||||
} from "./session";
|
|
||||||
|
|
||||||
describe("WebSocket concurrent edits integration", () => {
|
describe("WebSocket concurrent edits integration", () => {
|
||||||
let server: ReturnType<typeof Bun.serve>;
|
let server: ReturnType<typeof Bun.serve>;
|
||||||
const PORT = 4041; // use different port for tests
|
const PORT = 4041; // use different port for tests
|
||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
// initialize database for each test
|
|
||||||
initDb(":memory:");
|
|
||||||
// start server for each test
|
// start server for each test
|
||||||
server = Bun.serve<WsData>({
|
server = Bun.serve({
|
||||||
port: PORT,
|
port: PORT,
|
||||||
fetch(req, server) {
|
fetch(req, server) {
|
||||||
const url = new URL(req.url);
|
const url = new URL(req.url);
|
||||||
|
|
@ -82,7 +74,6 @@ describe("WebSocket concurrent edits integration", () => {
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
server.stop();
|
server.stop();
|
||||||
closeDb();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
test("two clients concurrent edits converge to same state", async () => {
|
test("two clients concurrent edits converge to same state", async () => {
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ describe("protocol", () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
test("decode parses valid json", () => {
|
test("decode parses valid json", () => {
|
||||||
const msg = { type: "join" as const, room: "test" };
|
const msg = { type: "join", room: "test" };
|
||||||
const decoded = decode(JSON.stringify(msg));
|
const decoded = decode(JSON.stringify(msg));
|
||||||
expect(decoded).toEqual(msg);
|
expect(decoded).toEqual(msg);
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -4,24 +4,12 @@ export type ClientMessage =
|
||||||
| { type: "join"; room: string }
|
| { type: "join"; room: string }
|
||||||
| { type: "leave" }
|
| { type: "leave" }
|
||||||
| { type: "update"; data: number[] } // yjs update as byte array
|
| { type: "update"; data: number[] } // yjs update as byte array
|
||||||
| { type: "awareness"; data: AwarenessState };
|
| { type: "awareness"; data: number[] };
|
||||||
|
|
||||||
export type AwarenessState = {
|
|
||||||
clientId: number;
|
|
||||||
cursor?: { line: number; col: number };
|
|
||||||
selection?: {
|
|
||||||
startLine: number;
|
|
||||||
startCol: number;
|
|
||||||
endLine: number;
|
|
||||||
endCol: number;
|
|
||||||
};
|
|
||||||
name?: string;
|
|
||||||
};
|
|
||||||
|
|
||||||
export type ServerMessage =
|
export type ServerMessage =
|
||||||
| { type: "sync"; data: number[] } // full yjs state
|
| { type: "sync"; data: number[] } // full yjs state
|
||||||
| { type: "update"; data: number[] }
|
| { type: "update"; data: number[] }
|
||||||
| { type: "awareness"; data: AwarenessState }
|
| { type: "awareness"; data: number[] }
|
||||||
| { type: "peers"; count: number }
|
| { type: "peers"; count: number }
|
||||||
| { type: "error"; message: string };
|
| { type: "error"; message: string };
|
||||||
|
|
||||||
|
|
@ -37,31 +25,9 @@ export function encode(msg: ServerMessage): string {
|
||||||
return JSON.stringify(msg);
|
return JSON.stringify(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
function isClientMessage(obj: unknown): obj is ClientMessage {
|
|
||||||
if (typeof obj !== "object" || obj === null) return false;
|
|
||||||
const msg = obj as Record<string, unknown>;
|
|
||||||
|
|
||||||
switch (msg.type) {
|
|
||||||
case "join":
|
|
||||||
return typeof msg.room === "string";
|
|
||||||
case "leave":
|
|
||||||
return true;
|
|
||||||
case "update":
|
|
||||||
return (
|
|
||||||
Array.isArray(msg.data) && msg.data.every((n) => typeof n === "number")
|
|
||||||
);
|
|
||||||
case "awareness":
|
|
||||||
return typeof msg.data === "object" && msg.data !== null;
|
|
||||||
default:
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export function decode(raw: string): ClientMessage | null {
|
export function decode(raw: string): ClientMessage | null {
|
||||||
try {
|
try {
|
||||||
const parsed = JSON.parse(raw);
|
return JSON.parse(raw) as ClientMessage;
|
||||||
if (!isClientMessage(parsed)) return null;
|
|
||||||
return parsed;
|
|
||||||
} catch {
|
} catch {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,21 +1,6 @@
|
||||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
import { describe, expect, test } from "bun:test";
|
||||||
import * as Y from "yjs";
|
import * as Y from "yjs";
|
||||||
import { close as closeDb, initDb } from "./db";
|
import { getOrCreateSession, Session } from "./session";
|
||||||
import type { ServerMessage } from "./protocol";
|
|
||||||
import {
|
|
||||||
type Client,
|
|
||||||
getOrCreateSession,
|
|
||||||
Session,
|
|
||||||
type WsData,
|
|
||||||
} from "./session";
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
initDb(":memory:");
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
closeDb();
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("Session", () => {
|
describe("Session", () => {
|
||||||
test("creates yjs doc on init", () => {
|
test("creates yjs doc on init", () => {
|
||||||
|
|
@ -28,8 +13,8 @@ describe("Session", () => {
|
||||||
const session = new Session("test-room");
|
const session = new Session("test-room");
|
||||||
const mockWs = {
|
const mockWs = {
|
||||||
send: () => {},
|
send: () => {},
|
||||||
data: { room: null as string | null, client: null },
|
data: { room: null as string | null },
|
||||||
} as unknown as import("bun").ServerWebSocket<WsData>;
|
} as unknown as import("bun").ServerWebSocket<{ room: string | null }>;
|
||||||
const client = { ws: mockWs };
|
const client = { ws: mockWs };
|
||||||
|
|
||||||
expect(session.clients.size).toBe(0);
|
expect(session.clients.size).toBe(0);
|
||||||
|
|
@ -51,8 +36,8 @@ describe("Session", () => {
|
||||||
|
|
||||||
const mockWs = {
|
const mockWs = {
|
||||||
send: () => {},
|
send: () => {},
|
||||||
data: { room: null as string | null, client: null },
|
data: { room: null as string | null },
|
||||||
} as unknown as import("bun").ServerWebSocket<WsData>;
|
} as unknown as import("bun").ServerWebSocket<{ room: string | null }>;
|
||||||
session.applyUpdate(update, { ws: mockWs });
|
session.applyUpdate(update, { ws: mockWs });
|
||||||
|
|
||||||
expect(text.toString()).toBe("hello");
|
expect(text.toString()).toBe("hello");
|
||||||
|
|
@ -72,33 +57,3 @@ describe("getOrCreateSession", () => {
|
||||||
expect(s1).not.toBe(s2);
|
expect(s1).not.toBe(s2);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("awareness", () => {
|
|
||||||
test("broadcasts awareness to other clients", () => {
|
|
||||||
const session = new Session("test-room");
|
|
||||||
const sent1: ServerMessage[] = [];
|
|
||||||
const sent2: ServerMessage[] = [];
|
|
||||||
|
|
||||||
const client1 = {
|
|
||||||
ws: { send: (m: string) => sent1.push(JSON.parse(m)) },
|
|
||||||
} as Client;
|
|
||||||
const client2 = {
|
|
||||||
ws: { send: (m: string) => sent2.push(JSON.parse(m)) },
|
|
||||||
} as Client;
|
|
||||||
|
|
||||||
session.join(client1);
|
|
||||||
session.join(client2);
|
|
||||||
|
|
||||||
const awareness = { clientId: 1, cursor: { line: 5, col: 10 } };
|
|
||||||
session.broadcastAwareness(client1, awareness);
|
|
||||||
|
|
||||||
// client1 should NOT receive their own awareness
|
|
||||||
expect(sent1.filter((m) => m.type === "awareness")).toHaveLength(0);
|
|
||||||
// client2 should receive it
|
|
||||||
expect(sent2.filter((m) => m.type === "awareness")).toHaveLength(1);
|
|
||||||
const awarenessMsg = sent2.find((m) => m.type === "awareness");
|
|
||||||
if (awarenessMsg?.type === "awareness") {
|
|
||||||
expect(awarenessMsg.data).toEqual(awareness);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
|
||||||
|
|
@ -1,22 +1,16 @@
|
||||||
import type { ServerWebSocket } from "bun";
|
import type { ServerWebSocket } from "bun";
|
||||||
import * as Y from "yjs";
|
import * as Y from "yjs";
|
||||||
import { getUpdates, saveUpdate } from "./db";
|
import { encode } from "./protocol";
|
||||||
import { type AwarenessState, encode } from "./protocol";
|
|
||||||
|
|
||||||
export interface WsData {
|
|
||||||
room: string | null;
|
|
||||||
client: Client | null;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface Client {
|
export interface Client {
|
||||||
ws: ServerWebSocket<WsData>;
|
ws: ServerWebSocket<{ room: string | null }>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Session {
|
export class Session {
|
||||||
doc: Y.Doc;
|
doc: Y.Doc;
|
||||||
clients: Set<Client> = new Set();
|
clients: Set<Client> = new Set();
|
||||||
|
|
||||||
constructor(public readonly name: string) {
|
constructor(public name: string) {
|
||||||
this.doc = new Y.Doc();
|
this.doc = new Y.Doc();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -50,13 +44,6 @@ export class Session {
|
||||||
applyUpdate(update: Uint8Array, from: Client) {
|
applyUpdate(update: Uint8Array, from: Client) {
|
||||||
try {
|
try {
|
||||||
Y.applyUpdate(this.doc, update);
|
Y.applyUpdate(this.doc, update);
|
||||||
// Persist the update (log but don't fail on db errors)
|
|
||||||
try {
|
|
||||||
saveUpdate(this.name, update);
|
|
||||||
} catch (err) {
|
|
||||||
console.error(`failed to persist update in room ${this.name}:`, err);
|
|
||||||
// Continue - memory state is still updated
|
|
||||||
}
|
|
||||||
// broadcast to others
|
// broadcast to others
|
||||||
for (const client of this.clients) {
|
for (const client of this.clients) {
|
||||||
if (client !== from) {
|
if (client !== from) {
|
||||||
|
|
@ -92,19 +79,6 @@ export class Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
broadcastAwareness(sender: Client, state: AwarenessState): void {
|
|
||||||
const message = encode({ type: "awareness", data: state });
|
|
||||||
for (const client of this.clients) {
|
|
||||||
if (client !== sender) {
|
|
||||||
try {
|
|
||||||
client.ws.send(message);
|
|
||||||
} catch {
|
|
||||||
// client disconnected, ignore
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// room name -> session
|
// room name -> session
|
||||||
|
|
@ -114,18 +88,6 @@ export function getOrCreateSession(name: string): Session {
|
||||||
let session = sessions.get(name);
|
let session = sessions.get(name);
|
||||||
if (!session) {
|
if (!session) {
|
||||||
session = new Session(name);
|
session = new Session(name);
|
||||||
// Load persisted updates
|
|
||||||
const updates = getUpdates(name);
|
|
||||||
for (const update of updates) {
|
|
||||||
Y.applyUpdate(session.doc, update);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Double-check pattern - another request may have created session while we loaded
|
|
||||||
const existing = sessions.get(name);
|
|
||||||
if (existing) {
|
|
||||||
return existing;
|
|
||||||
}
|
|
||||||
|
|
||||||
sessions.set(name, session);
|
sessions.set(name, session);
|
||||||
console.debug(`session created: ${name}`);
|
console.debug(`session created: ${name}`);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue