Compare commits

..

26 commits

Author SHA1 Message Date
5d4b144604
Use modern autocmd_add() and autocmd_delete() APIs 2026-01-27 21:43:40 -05:00
30349936d7
Use modern hlset() API for highlight definition 2026-01-27 21:43:27 -05:00
fe368edd74
Convert to method chaining syntax 2026-01-27 21:43:15 -05:00
6138168af0
Convert string concatenation to interpolation 2026-01-27 21:42:59 -05:00
a7c84ed9d5
Add return type annotations to all void functions 2026-01-27 21:42:35 -05:00
936428a029
Add version guard for Vim 9.0 requirement 2026-01-27 21:41:36 -05:00
883c4f9b28
Update docs for persistence support 2026-01-27 21:34:54 -05:00
9d9234173c
Add env var support, error handling, and race protection 2026-01-27 21:34:54 -05:00
ef4b4b06e2
Init database on daemon startup 2026-01-27 21:34:54 -05:00
2082a908fa
Hook persistence into session lifecycle 2026-01-27 21:34:54 -05:00
d904b9d2b6
Fix lint and type errors 2026-01-27 21:34:54 -05:00
415e49327e
Add bun:sqlite persistence layer 2026-01-27 21:34:54 -05:00
a1864e5880
Generate bridge clientId once 2026-01-27 21:13:43 -05:00
ead57baf6f
Fix linter warnings 2026-01-27 21:13:43 -05:00
5275c99976
Send local cursor position to peers 2026-01-27 21:13:26 -05:00
80da4f9f5b
Display peer cursors in vim with yellow highlight 2026-01-27 21:13:26 -05:00
e467881a8c
Forward awareness/cursor to vim from bridge 2026-01-27 21:13:26 -05:00
56aa8dc9bd
Route awareness messages between peers 2026-01-27 21:13:26 -05:00
93bb462ffa
Add awareness broadcast to session 2026-01-27 21:13:07 -05:00
cd4b1def5b
Add a new plan 2026-01-27 21:12:41 -05:00
853fcba641
Add type-safe message decoding 2026-01-27 21:12:41 -05:00
0f506faa5f
Apply biome formatting fixes
Reorder imports to match biome style guide.
2026-01-27 21:05:17 -05:00
4b4b434bf8
Update session test mocks to use WsData type
Change mock websocket objects in session tests to include both room
and client fields, matching the WsData interface definition.
2026-01-27 21:04:54 -05:00
00c9a140c6
Fix type assertion in protocol decode test
Use 'as const' on the type field to ensure the literal type "join" is
preserved, matching the ClientMessage union type requirements.
2026-01-27 21:04:17 -05:00
d654a1bcdf
Add proper typing to integration test websocket mocks
Import and apply WsData type to the test server's websocket handlers
to ensure mock websocket objects match production typing.
2026-01-27 21:03:52 -05:00
0235b2c3e6
Fix websocket data type for room and client storage
Add WsData interface to properly type the websocket data object that
stores room name and client reference. This fixes type errors where
ws.data was previously untyped and causing compilation failures.
2026-01-27 21:03:19 -05:00
15 changed files with 1504 additions and 43 deletions

2
.gitignore vendored
View file

@ -32,3 +32,5 @@ report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json
# Finder (MacOS) folder config
.DS_Store
*.db

View file

@ -15,7 +15,6 @@ working:
not yet:
- cursor/selection sync (awareness protocol stubbed but unused)
- other editor adapters (helix, kakoune, zed)
- persistence (rooms are ephemeral)
- reconnection handling
## stack
@ -28,9 +27,10 @@ not yet:
## running
```bash
just dev # daemon on :4040
bun test # unit tests
just check # biome lint
just dev # daemon on :4040
COLLABD_DB=path.db just dev # custom db path (default: collabd.db)
bun test # unit tests
just check # biome lint
```
## vim adapter usage
@ -66,6 +66,7 @@ server -> client:
- src/index.ts - websocket server, room routing
- src/session.ts - yjs doc per room, peer management
- src/protocol.ts - message types
- src/db.ts - sqlite persistence, save/load crdt updates
- adapters/vim/bridge.ts - bun process vim spawns
- adapters/vim/collab.vim - vim9script plugin
@ -95,7 +96,6 @@ known gaps (not production ready):
- room names guessable via brute force
- no encryption (deploy behind wss, not ws)
- no audit logging
- no persistence (data lost on daemon restart)
before production:
1. auth layer (jwt tokens or unix socket for local-only)

View file

@ -6,11 +6,15 @@ describe("Bridge lifecycle", () => {
const DAEMON_PORT = 4042;
beforeEach(async () => {
const { initDb } = await import("../../src/db");
const { decode } = await import("../../src/protocol");
const { getOrCreateSession, getSession, removeSession } = await import(
"../../src/session"
);
// Initialize database
initDb(":memory:");
// start daemon for bridge to connect to
daemon = Bun.serve({
port: DAEMON_PORT,
@ -75,8 +79,10 @@ describe("Bridge lifecycle", () => {
});
});
afterEach(() => {
afterEach(async () => {
daemon.stop();
const { close: closeDb } = await import("../../src/db");
closeDb();
});
test("bridge starts and signals ready", async () => {
@ -192,3 +198,79 @@ describe("Bridge lifecycle", () => {
expect(exitCode).toBe(0);
});
});
describe("awareness", () => {
test("forwards awareness from daemon to vim", async () => {
// Start daemon
const server = Bun.serve({
port: 4043,
fetch(req, server) {
if (new URL(req.url).pathname === "/ws") {
server.upgrade(req);
return;
}
return new Response("not found", { status: 404 });
},
websocket: {
open(ws) {
// Simulate sending awareness after join
setTimeout(() => {
ws.send(
JSON.stringify({
type: "awareness",
data: {
clientId: 99,
cursor: { line: 3, col: 7 },
name: "peer",
},
}),
);
}, 100);
},
message() {},
close() {},
},
});
const output: string[] = [];
const bridge = spawn({
cmd: ["bun", "adapters/vim/bridge.ts"],
env: { ...process.env, COLLABD_URL: "ws://localhost:4043/ws" },
stdin: "pipe",
stdout: "pipe",
});
const reader = bridge.stdout.getReader();
const decoder = new TextDecoder();
// Collect output
(async () => {
while (true) {
const { done, value } = await reader.read();
if (done) break;
output.push(decoder.decode(value));
}
})();
await new Promise((r) => setTimeout(r, 50));
bridge.stdin.write(
`${JSON.stringify({ type: "connect", room: "test" })}\n`,
);
await new Promise((r) => setTimeout(r, 200));
const awarenessMsg = output
.join("")
.split("\n")
.filter(Boolean)
.map((l) => JSON.parse(l))
.find((m) => m.type === "cursor");
expect(awarenessMsg).toBeDefined();
expect(awarenessMsg.data.line).toBe(3);
expect(awarenessMsg.data.col).toBe(7);
expect(awarenessMsg.data.name).toBe("peer");
bridge.kill();
server.stop();
});
});

View file

@ -10,6 +10,7 @@ let ws: WebSocket | null = null;
let doc: Y.Doc | null = null;
let text: Y.Text | null = null;
let suppressLocal = 0;
let clientId: number | null = null;
function send(msg: object) {
console.log(JSON.stringify(msg));
@ -32,6 +33,7 @@ function connect(roomName: string) {
ws = new WebSocket(DAEMON_URL);
ws.onopen = () => {
clientId = Math.floor(Math.random() * 1000000);
ws?.send(JSON.stringify({ type: "join", room: roomName }));
send({ type: "connected", room: roomName });
};
@ -58,6 +60,21 @@ function connect(roomName: string) {
send({ type: "peers", count: msg.count });
break;
}
case "awareness": {
// Forward cursor info to vim
if (msg.data?.cursor) {
send({
type: "cursor",
data: {
clientId: msg.data.clientId,
line: msg.data.cursor.line,
col: msg.data.cursor.col,
name: msg.data.name || `peer-${msg.data.clientId}`,
},
});
}
break;
}
}
};
@ -145,6 +162,24 @@ for await (const chunk of Bun.stdin.stream()) {
case "content":
setContent(msg.text);
break;
case "cursor":
if (
ws &&
clientId !== null &&
msg.line !== undefined &&
msg.col !== undefined
) {
ws.send(
JSON.stringify({
type: "awareness",
data: {
clientId,
cursor: { line: msg.line, col: msg.col },
},
}),
);
}
break;
case "disconnect":
ws?.close();
send({ type: "disconnected" });

View file

@ -1,26 +1,39 @@
if !has('vim9script') || v:version < 900
finish
endif
vim9script
# collab.vim - collaborative editing adapter for collabd
# requires: bun, collabd daemon running
hlset([{
name: 'PeerCursor',
ctermbg: 'yellow',
ctermfg: 'black',
guibg: 'yellow',
guifg: 'black'
}])
var bridge_job: job = null_job
var bridge_channel: channel = null_channel
var connected = false
var ready = false
var room = ""
var suppressing = false
var peer_match_ids: dict<number> = {}
# path to bridge script (adjust as needed)
const bridge_script = expand('<sfile>:p:h') .. '/bridge.ts'
def Send(msg: dict<any>)
def Send(msg: dict<any>): void
if bridge_channel != null_channel
ch_sendraw(bridge_channel, json_encode(msg) .. "\n")
endif
enddef
def OnOutput(ch: channel, msg: string)
if empty(msg)
def OnOutput(ch: channel, msg: string): void
if msg->empty()
return
endif
var data: dict<any>
@ -34,25 +47,27 @@ def OnOutput(ch: channel, msg: string)
ready = true
elseif data.type == 'connected'
connected = true
echom '[collab] connected to room: ' .. data.room
echom $'[collab] connected to room: {data.room}'
elseif data.type == 'disconnected'
connected = false
echom '[collab] disconnected'
elseif data.type == 'content'
ApplyRemoteContent(data.text)
elseif data.type == 'peers'
echom '[collab] peers: ' .. data.count
echom $'[collab] peers: {data.count}'
elseif data.type == 'cursor'
ShowPeerCursor(data.data)
elseif data.type == 'error'
echoerr '[collab] ' .. data.message
endif
enddef
def ApplyRemoteContent(content: string)
def ApplyRemoteContent(content: string): void
if suppressing
return
endif
suppressing = true
var lines = split(content, "\n", true)
var lines = content->split("\n", true)
var view = winsaveview()
silent! :%delete _
setline(1, lines)
@ -60,16 +75,49 @@ def ApplyRemoteContent(content: string)
suppressing = false
enddef
def SendBuffer()
def SendBuffer(): void
if !connected || suppressing
return
endif
var lines = getline(1, '$')
var content = join(lines, "\n")
var content = lines->join("\n")
Send({type: 'content', text: content})
enddef
export def Connect(room_name: string)
def SendCursor(): void
if !connected
return
endif
const pos = getpos('.')
# pos is [bufnum, line, col, off] - line/col are 1-indexed
Send({type: 'cursor', line: pos[1] - 1, col: pos[2] - 1})
enddef
def ShowPeerCursor(data: dict<any>): void
const client_id = string(data.clientId)
# Clear previous highlight for this peer
if has_key(peer_match_ids, client_id)
silent! matchdelete(peer_match_ids[client_id])
endif
# Highlight the cursor position (line, col are 0-indexed from bridge)
const line_nr = data.line + 1
const col_nr = data.col + 1
# Create a 1-char highlight at cursor position
const pattern = $'\%{line_nr}l\%{col_nr}c.'
peer_match_ids[client_id] = matchadd('PeerCursor', pattern, 10)
enddef
def ClearPeerCursors(): void
for id in values(peer_match_ids)
silent! matchdelete(id)
endfor
peer_match_ids = {}
enddef
export def Connect(room_name: string): void
if bridge_job != null_job
Disconnect()
endif
@ -99,31 +147,40 @@ export def Connect(room_name: string)
Send({type: 'connect', room: room_name})
# set up autocmds to send changes
augroup CollabVim
autocmd!
autocmd TextChanged,TextChangedI * call SendBuffer()
augroup END
autocmd_add([
{
group: 'CollabVim',
event: ['TextChanged', 'TextChangedI'],
pattern: '*',
cmd: 'SendBuffer()'
},
{
group: 'CollabVim',
event: ['CursorMoved', 'CursorMovedI'],
bufnr: bufnr(),
cmd: 'SendCursor()'
}
])
enddef
export def Disconnect()
export def Disconnect(): void
if bridge_job != null_job
Send({type: 'disconnect'})
job_stop(bridge_job)
bridge_job = null_job
bridge_channel = null_channel
endif
ClearPeerCursors()
connected = false
ready = false
room = ""
augroup CollabVim
autocmd!
augroup END
autocmd_delete([{group: 'CollabVim'}])
echom '[collab] disconnected'
enddef
export def Status()
export def Status(): void
if connected
echom '[collab] connected to room: ' .. room
echom $'[collab] connected to room: {room}'
else
echom '[collab] not connected'
endif

View file

@ -0,0 +1,982 @@
# Cursor Sync, Persistence & Adapter Guide
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Add cursor/selection awareness, persistent rooms via bun:sqlite, and document how to write editor adapters.
**Architecture:** Yjs awareness protocol for cursors (ephemeral state, separate from document CRDT). SQLite stores yjs update log per room - replay on daemon restart. Adapter guide documents the vim pattern for other editors.
**Tech Stack:** yjs awareness, bun:sqlite, vim9script
---
## Task 1: Cursor Sync - Protocol & Session
**Files:**
- Modify: `src/protocol.ts:7,12` (awareness types exist, need payload shape)
- Modify: `src/session.ts` (add awareness state tracking)
- Create: `src/session.test.ts` (add awareness tests)
**Step 1: Define awareness payload type in protocol**
In `src/protocol.ts`, the awareness message exists but payload is just `number[]`. Add a typed structure:
```typescript
// After line 14, add:
export type AwarenessState = {
clientId: number;
cursor?: { line: number; col: number };
selection?: { startLine: number; startCol: number; endLine: number; endCol: number };
name?: string;
};
```
**Step 2: Write failing test for awareness broadcast**
Add to `src/session.test.ts`:
```typescript
import { describe, test, expect, mock } from "bun:test";
describe("awareness", () => {
test("broadcasts awareness to other clients", () => {
const session = new Session();
const sent1: unknown[] = [];
const sent2: unknown[] = [];
const client1 = { ws: { send: (m: string) => sent1.push(JSON.parse(m)) } } as Client;
const client2 = { ws: { send: (m: string) => sent2.push(JSON.parse(m)) } } as Client;
session.join(client1);
session.join(client2);
const awareness = { clientId: 1, cursor: { line: 5, col: 10 } };
session.broadcastAwareness(client1, awareness);
// client1 should NOT receive their own awareness
expect(sent1.filter(m => m.type === "awareness")).toHaveLength(0);
// client2 should receive it
expect(sent2.filter(m => m.type === "awareness")).toHaveLength(1);
expect(sent2.find(m => m.type === "awareness")?.data).toEqual(awareness);
});
});
```
**Step 3: Run test to verify it fails**
```bash
bun test src/session.test.ts
```
Expected: FAIL - `broadcastAwareness` is not a function
**Step 4: Implement broadcastAwareness in session.ts**
Add after `broadcastPeerCount()`:
```typescript
broadcastAwareness(sender: Client, state: AwarenessState): void {
const message = encode({ type: "awareness", data: state });
for (const client of this.clients) {
if (client !== sender) {
try {
client.ws.send(message);
} catch {
// client disconnected, ignore
}
}
}
}
```
Update imports at top:
```typescript
import { encode, decode, type AwarenessState } from "./protocol";
```
**Step 5: Run test to verify it passes**
```bash
bun test src/session.test.ts
```
Expected: PASS
**Step 6: Commit**
```bash
git add src/protocol.ts src/session.ts src/session.test.ts
git commit -m "Add awareness broadcast to session"
```
---
## Task 2: Cursor Sync - Daemon Routing
**Files:**
- Modify: `src/index.ts:68-73` (add awareness case)
- Create: `src/index.test.ts` (integration test for awareness routing)
**Step 1: Write failing integration test**
Create `src/awareness.test.ts`:
```typescript
import { describe, test, expect, beforeAll, afterAll } from "bun:test";
import type { Server } from "bun";
describe("awareness routing", () => {
let server: Server;
const PORT = 4042;
beforeAll(async () => {
// Import and start server on test port
process.env.PORT = String(PORT);
const mod = await import("./index");
server = mod.server;
});
afterAll(() => {
server?.stop();
});
test("awareness message routes to other peers in same room", async () => {
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
const received: unknown[] = [];
await Promise.all([
new Promise(r => ws1.onopen = r),
new Promise(r => ws2.onopen = r),
]);
ws2.onmessage = (e) => {
received.push(JSON.parse(e.data));
};
// Both join same room
ws1.send(JSON.stringify({ type: "join", room: "test" }));
ws2.send(JSON.stringify({ type: "join", room: "test" }));
await Bun.sleep(50);
// ws1 sends awareness
ws1.send(JSON.stringify({
type: "awareness",
data: { clientId: 1, cursor: { line: 10, col: 5 } }
}));
await Bun.sleep(50);
const awareness = received.find(m => m.type === "awareness");
expect(awareness).toBeDefined();
expect(awareness.data.cursor).toEqual({ line: 10, col: 5 });
ws1.close();
ws2.close();
});
});
```
**Step 2: Run test to verify it fails**
```bash
bun test src/awareness.test.ts
```
Expected: FAIL - awareness not received (no routing in index.ts yet)
**Step 3: Add awareness routing to index.ts**
In `src/index.ts`, the message handler switch (around line 43), add after the `update` case:
```typescript
case "awareness":
if (client.room) {
const session = getSession(client.room);
if (session && "data" in msg) {
session.broadcastAwareness(client, msg.data);
}
}
break;
```
Also update `isClientMessage` in protocol.ts to validate awareness data shape:
```typescript
// In isClientMessage(), add validation for awareness:
if (msg.type === "awareness") {
return typeof msg.data === "object" && msg.data !== null;
}
```
**Step 4: Run test to verify it passes**
```bash
bun test src/awareness.test.ts
```
Expected: PASS
**Step 5: Commit**
```bash
git add src/index.ts src/protocol.ts src/awareness.test.ts
git commit -m "Route awareness messages between peers"
```
---
## Task 3: Cursor Sync - Bridge
**Files:**
- Modify: `adapters/vim/bridge.ts` (send/receive awareness)
- Modify: `adapters/vim/bridge.test.ts` (test awareness flow)
**Step 1: Write failing test for awareness in bridge**
Add to `adapters/vim/bridge.test.ts`:
```typescript
describe("awareness", () => {
test("forwards awareness from daemon to vim", async () => {
// Start daemon
const server = Bun.serve({
port: 4043,
fetch(req, server) {
if (new URL(req.url).pathname === "/ws") {
server.upgrade(req);
return;
}
return new Response("not found", { status: 404 });
},
websocket: {
open(ws) {
// Simulate sending awareness after join
setTimeout(() => {
ws.send(JSON.stringify({
type: "awareness",
data: { clientId: 99, cursor: { line: 3, col: 7 }, name: "peer" }
}));
}, 100);
},
message() {},
close() {},
},
});
const output: string[] = [];
const bridge = Bun.spawn(["bun", "run", "adapters/vim/bridge.ts"], {
env: { ...process.env, DAEMON_URL: "ws://localhost:4043/ws" },
stdin: "pipe",
stdout: "pipe",
});
const reader = bridge.stdout.getReader();
const decoder = new TextDecoder();
// Collect output
(async () => {
while (true) {
const { done, value } = await reader.read();
if (done) break;
output.push(decoder.decode(value));
}
})();
await Bun.sleep(50);
bridge.stdin.write(JSON.stringify({ type: "connect", room: "test" }) + "\n");
await Bun.sleep(200);
const awarenessMsg = output.join("").split("\n")
.filter(Boolean)
.map(l => JSON.parse(l))
.find(m => m.type === "cursor");
expect(awarenessMsg).toBeDefined();
expect(awarenessMsg.data.line).toBe(3);
expect(awarenessMsg.data.col).toBe(7);
expect(awarenessMsg.data.name).toBe("peer");
bridge.kill();
server.stop();
});
});
```
**Step 2: Run test to verify it fails**
```bash
bun test adapters/vim/bridge.test.ts
```
Expected: FAIL - no cursor message received
**Step 3: Handle awareness in bridge.ts**
In `adapters/vim/bridge.ts`, in the `ws.onmessage` handler, add a case for awareness:
```typescript
case "awareness":
// Forward cursor info to vim
if (parsed.data?.cursor) {
send({
type: "cursor",
data: {
clientId: parsed.data.clientId,
line: parsed.data.cursor.line,
col: parsed.data.cursor.col,
name: parsed.data.name || `peer-${parsed.data.clientId}`,
},
});
}
break;
```
**Step 4: Run test to verify it passes**
```bash
bun test adapters/vim/bridge.test.ts
```
Expected: PASS
**Step 5: Commit**
```bash
git add adapters/vim/bridge.ts adapters/vim/bridge.test.ts
git commit -m "Forward awareness/cursor to vim from bridge"
```
---
## Task 4: Cursor Sync - Vim Plugin
**Files:**
- Modify: `adapters/vim/collab.vim` (receive cursor, display highlight)
**Step 1: Add cursor handling to vim plugin**
In `adapters/vim/collab.vim`, in `OnOutput()` function, add a case for cursor messages:
```vim
elseif msg.type ==# 'cursor'
call s:ShowPeerCursor(msg.data)
```
Add the display function:
```vim
var peer_match_ids: dict<number> = {}
def ShowPeerCursor(data: dict<any>)
const client_id = string(data.clientId)
# Clear previous highlight for this peer
if has_key(peer_match_ids, client_id)
silent! matchdelete(peer_match_ids[client_id])
endif
# Highlight the cursor position (line, col are 0-indexed from bridge)
const line_nr = data.line + 1
const col_nr = data.col + 1
# Create a 1-char highlight at cursor position
const pattern = '\%' .. line_nr .. 'l\%' .. col_nr .. 'c.'
peer_match_ids[client_id] = matchadd('PeerCursor', pattern, 10)
enddef
def ClearPeerCursors()
for id in values(peer_match_ids)
silent! matchdelete(id)
endfor
peer_match_ids = {}
enddef
```
Add highlight group setup at top of file:
```vim
highlight PeerCursor ctermbg=yellow guibg=yellow ctermfg=black guifg=black
```
Call `ClearPeerCursors()` in `Disconnect()`.
**Step 2: Manual test**
No automated test for vim highlight rendering. Test manually:
1. Open two vim instances
2. `:CollabJoin testroom` in both
3. Move cursor in one - should see yellow highlight in other
**Step 3: Commit**
```bash
git add adapters/vim/collab.vim
git commit -m "Display peer cursors in vim with yellow highlight"
```
---
## Task 5: Cursor Sync - Send Local Cursor
**Files:**
- Modify: `adapters/vim/collab.vim` (send cursor on CursorMoved)
- Modify: `adapters/vim/bridge.ts` (forward cursor to daemon)
**Step 1: Send cursor position from vim**
In `adapters/vim/collab.vim`, add autocmd in `Connect()`:
```vim
autocmd CursorMoved,CursorMovedI <buffer> call s:SendCursor()
```
Add the send function:
```vim
def SendCursor()
if !connected
return
endif
const pos = getpos('.')
# pos is [bufnum, line, col, off] - line/col are 1-indexed
Send({type: 'cursor', line: pos[1] - 1, col: pos[2] - 1})
enddef
```
**Step 2: Forward cursor from bridge to daemon**
In `adapters/vim/bridge.ts`, in the stdin message handler, add:
```typescript
case "cursor":
if (ws && msg.line !== undefined && msg.col !== undefined) {
ws.send(JSON.stringify({
type: "awareness",
data: {
clientId: Math.floor(Math.random() * 1000000), // TODO: stable client id
cursor: { line: msg.line, col: msg.col },
},
}));
}
break;
```
**Step 3: Manual test**
1. Two vim instances, same room
2. Move cursor in one - yellow highlight appears in other
3. Move cursor in other - highlight updates
**Step 4: Commit**
```bash
git add adapters/vim/collab.vim adapters/vim/bridge.ts
git commit -m "Send local cursor position to peers"
```
---
## Task 6: Persistence - Schema & Setup
**Files:**
- Create: `src/db.ts` (database module)
- Create: `src/db.test.ts` (database tests)
**Step 1: Write failing test for db module**
Create `src/db.test.ts`:
```typescript
import { describe, test, expect, beforeEach, afterEach } from "bun:test";
import { unlinkSync } from "fs";
import { initDb, saveUpdate, getUpdates, close } from "./db";
describe("persistence", () => {
const TEST_DB = ":memory:";
beforeEach(() => {
initDb(TEST_DB);
});
afterEach(() => {
close();
});
test("saves and retrieves updates for a room", () => {
const room = "testroom";
const update1 = new Uint8Array([1, 2, 3]);
const update2 = new Uint8Array([4, 5, 6]);
saveUpdate(room, update1);
saveUpdate(room, update2);
const updates = getUpdates(room);
expect(updates).toHaveLength(2);
expect(updates[0]).toEqual(update1);
expect(updates[1]).toEqual(update2);
});
test("returns empty array for unknown room", () => {
const updates = getUpdates("nonexistent");
expect(updates).toEqual([]);
});
test("rooms are isolated", () => {
saveUpdate("room1", new Uint8Array([1]));
saveUpdate("room2", new Uint8Array([2]));
expect(getUpdates("room1")).toHaveLength(1);
expect(getUpdates("room2")).toHaveLength(1);
expect(getUpdates("room1")[0]).toEqual(new Uint8Array([1]));
});
});
```
**Step 2: Run test to verify it fails**
```bash
bun test src/db.test.ts
```
Expected: FAIL - module not found
**Step 3: Implement db.ts with bun:sqlite**
Create `src/db.ts`:
```typescript
import { Database } from "bun:sqlite";
let db: Database | null = null;
export function initDb(path: string = "collabd.db"): void {
db = new Database(path);
db.run(`
CREATE TABLE IF NOT EXISTS updates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
room TEXT NOT NULL,
data BLOB NOT NULL,
created_at INTEGER DEFAULT (unixepoch())
)
`);
db.run(`CREATE INDEX IF NOT EXISTS idx_room ON updates(room)`);
}
export function saveUpdate(room: string, data: Uint8Array): void {
if (!db) throw new Error("Database not initialized");
db.run("INSERT INTO updates (room, data) VALUES (?, ?)", [room, data]);
}
export function getUpdates(room: string): Uint8Array[] {
if (!db) throw new Error("Database not initialized");
const rows = db.query("SELECT data FROM updates WHERE room = ? ORDER BY id").all(room) as { data: Uint8Array }[];
return rows.map(r => new Uint8Array(r.data));
}
export function close(): void {
db?.close();
db = null;
}
```
**Step 4: Run test to verify it passes**
```bash
bun test src/db.test.ts
```
Expected: PASS
**Step 5: Commit**
```bash
git add src/db.ts src/db.test.ts
git commit -m "Add bun:sqlite persistence layer"
```
---
## Task 7: Persistence - Hook Into Session
**Files:**
- Modify: `src/session.ts` (save updates, load on create)
- Modify: `src/session.test.ts` (test persistence integration)
**Step 1: Write failing test for session persistence**
Add to `src/session.test.ts`:
```typescript
import { initDb, getUpdates, close as closeDb } from "./db";
describe("session persistence", () => {
beforeEach(() => {
initDb(":memory:");
});
afterEach(() => {
closeDb();
});
test("saves updates to database", () => {
const session = getOrCreateSession("persist-test");
const client = mockClient();
session.join(client);
// Create a yjs update
const doc = new Y.Doc();
const text = doc.getText("content");
text.insert(0, "hello");
const update = Y.encodeStateAsUpdate(doc);
session.applyUpdate(client, Array.from(update));
// Check database has the update
const saved = getUpdates("persist-test");
expect(saved.length).toBeGreaterThan(0);
});
test("loads existing updates on session create", () => {
// First session creates content
const session1 = getOrCreateSession("reload-test");
const client1 = mockClient();
session1.join(client1);
const doc = new Y.Doc();
const text = doc.getText("content");
text.insert(0, "persisted");
const update = Y.encodeStateAsUpdate(doc);
session1.applyUpdate(client1, Array.from(update));
// Clear in-memory sessions
removeSession("reload-test");
// New session should load from db
const session2 = getOrCreateSession("reload-test");
const content = session2.doc.getText("content").toString();
expect(content).toBe("persisted");
});
});
```
**Step 2: Run test to verify it fails**
```bash
bun test src/session.test.ts
```
Expected: FAIL - updates not persisted
**Step 3: Modify session.ts to persist updates**
In `src/session.ts`:
```typescript
import { saveUpdate, getUpdates } from "./db";
// In getOrCreateSession():
export function getOrCreateSession(room: string): Session {
let session = sessions.get(room);
if (!session) {
session = new Session();
// Load persisted updates
const updates = getUpdates(room);
for (const update of updates) {
Y.applyUpdate(session.doc, update);
}
sessions.set(room, session);
}
return session;
}
// In Session.applyUpdate(), after Y.applyUpdate():
applyUpdate(sender: Client, data: number[]): void {
const update = new Uint8Array(data);
Y.applyUpdate(this.doc, update);
// Persist the update (need room name - add to session)
if (this.room) {
saveUpdate(this.room, update);
}
// ... rest of broadcast logic
}
```
Add room tracking to Session:
```typescript
class Session {
doc: Y.Doc;
clients: Set<Client>;
room?: string; // Add this
// Update getOrCreateSession to set it:
session.room = room;
}
```
**Step 4: Run test to verify it passes**
```bash
bun test src/session.test.ts
```
Expected: PASS
**Step 5: Commit**
```bash
git add src/session.ts src/session.test.ts
git commit -m "Persist yjs updates to sqlite, reload on session create"
```
---
## Task 8: Persistence - Init on Daemon Start
**Files:**
- Modify: `src/index.ts` (init db on startup)
**Step 1: Add db initialization**
In `src/index.ts`, at the top after imports:
```typescript
import { initDb } from "./db";
// Initialize database
const DB_PATH = process.env.COLLABD_DB || "collabd.db";
initDb(DB_PATH);
```
**Step 2: Manual test**
1. Start daemon: `just dev`
2. Join room, make edits
3. Kill daemon (Ctrl+C)
4. Restart daemon
5. Join same room - content should be there
**Step 3: Commit**
```bash
git add src/index.ts
git commit -m "Initialize sqlite database on daemon startup"
```
---
## Task 9: Write Adapter Guide
**Files:**
- Create: `docs/ADAPTERS.md`
**Step 1: Write the adapter guide**
Create `docs/ADAPTERS.md`:
```markdown
# Writing Editor Adapters for collabd
This guide explains how to write an adapter that lets any editor participate
in collaborative editing sessions.
## Architecture
```
┌─────────────┐ json/channel ┌─────────────┐ websocket ┌─────────────┐
│ Editor │ ◄──────────────────► │ Bridge │ ◄────────────────► │ Daemon │
│ (plugin) │ │ (bun) │ │ :4040 │
└─────────────┘ └─────────────┘ └─────────────┘
```
The **bridge** is a Bun process that:
- Manages the Yjs document (CRDT state)
- Translates editor buffer changes to Yjs operations
- Translates remote Yjs updates to buffer content
- Speaks a simple JSON protocol with the editor plugin
The **plugin** is editor-specific code that:
- Hooks buffer change events
- Sends buffer content to bridge
- Applies remote content from bridge
- Optionally displays peer cursors
## Why a Bridge?
Most editors can't embed Yjs directly:
- Vim: No npm/node, limited async
- Helix: Rust-only plugins
- Kakoune: Shell-based scripting
The bridge isolates CRDT complexity. Plugins stay simple.
## Protocol: Plugin ↔ Bridge
Messages are newline-delimited JSON.
### Plugin → Bridge
```json
{"type": "connect", "room": "myroom"}
{"type": "disconnect"}
{"type": "content", "text": "full buffer contents"}
{"type": "cursor", "line": 5, "col": 10}
```
### Bridge → Plugin
```json
{"type": "ready"}
{"type": "connected", "room": "myroom"}
{"type": "disconnected"}
{"type": "content", "text": "full buffer contents"}
{"type": "peers", "count": 2}
{"type": "cursor", "data": {"clientId": 123, "line": 5, "col": 10, "name": "peer-123"}}
{"type": "error", "message": "connection failed"}
```
## Implementing a Plugin
### 1. Spawn the Bridge
```
bun run adapters/vim/bridge.ts
```
Or use your own bridge (see next section).
Communication via stdin/stdout with JSON lines.
### 2. Wait for Ready
Bridge sends `{"type": "ready"}` when it starts. Then send connect:
```json
{"type": "connect", "room": "myroom"}
```
### 3. Handle Content Updates
When you receive `{"type": "content", "text": "..."}`:
1. Save cursor position
2. Replace buffer contents with received text
3. Restore cursor position (clamped to valid range)
**Critical:** Do NOT trigger your change handler when applying remote content,
or you'll create an infinite loop.
### 4. Send Local Changes
When user edits the buffer:
1. Get full buffer text
2. Send `{"type": "content", "text": "full text"}`
The bridge handles diffing - just send the whole buffer.
### 5. Cursor Sync (Optional)
On cursor move:
```json
{"type": "cursor", "line": 5, "col": 10}
```
On receiving cursor:
```json
{"type": "cursor", "data": {"line": 5, "col": 10, "name": "peer"}}
```
Render a highlight/virtual text at that position.
## Implementing a Custom Bridge
If you need a bridge in a different language (e.g., Rust for Helix),
implement the same protocol. Key responsibilities:
1. **Connect to daemon** at `ws://localhost:4040/ws`
2. **Manage Y.Doc** - create on connect, apply updates
3. **Diff buffer changes** - when receiving content from plugin:
```
old_text = current Y.Text content
new_text = received buffer content
diff = compute_diff(old, new)
apply diff as Y.Text operations
```
4. **Forward updates** - when Y.Doc changes from remote:
```
send full text to plugin as {"type": "content", ...}
```
5. **Handle awareness** - forward cursor positions both ways
### Yjs Libraries by Language
- **JavaScript/TypeScript:** `yjs` (npm)
- **Rust:** `yrs` (crates.io) - Yjs port
- **Python:** `pycrdt` (pypi)
- **Go:** `yjs-go` (experimental)
## Reference: Vim Adapter
See `adapters/vim/` for a complete example:
- `collab.vim` - 135 lines of Vim9script
- `bridge.ts` - 160 lines of TypeScript
The vim plugin:
1. Spawns bridge as job
2. Communicates via channels (vim's async IPC)
3. Sets TextChanged autocmd to detect edits
4. Applies remote content with `:delete` + `setline()`
## Testing Your Adapter
1. Start daemon: `just dev`
2. Open editor A, join room "test"
3. Open editor B (vim works), join room "test"
4. Type in one - should appear in other within 100ms
5. Type in both simultaneously - should converge
## Common Pitfalls
**Feedback loops:** Applying remote content triggers your change handler,
which sends it back. Use a flag to suppress during remote apply.
**Cursor jitter:** Cursor moves during remote apply. Save/restore position.
**Large files:** Sending full buffer on every keystroke is fine for <1MB.
For larger files, consider debouncing or incremental updates.
**Encoding:** Always UTF-8. Line endings should be `\n`.
```
**Step 2: Commit**
```bash
git add docs/ADAPTERS.md
git commit -m "Add adapter implementation guide"
```
---
## Summary
After completing all tasks:
- **Cursor sync:** Peers see each other's cursor positions with yellow highlights
- **Persistence:** Room state survives daemon restarts via SQLite
- **Adapter guide:** Clear documentation for adding new editors
The vim adapter remains simple (~150 lines). The bridge handles CRDT complexity.
New adapters follow the same pattern: spawn bridge, send JSON, apply content.

68
src/awareness.test.ts Normal file
View file

@ -0,0 +1,68 @@
import { afterAll, beforeAll, describe, expect, test } from "bun:test";
import type { Server } from "bun";
import { close as closeDb, initDb } from "./db";
import type { ServerMessage } from "./protocol";
describe("awareness routing", () => {
let server: Server<unknown>;
const PORT = 4042;
beforeAll(async () => {
// Initialize database before starting server
initDb(":memory:");
// Import and start server on test port
process.env.PORT = String(PORT);
const mod = await import("./index");
server = mod.server;
});
afterAll(() => {
server?.stop();
closeDb();
});
test("awareness message routes to other peers in same room", async () => {
const ws1 = new WebSocket(`ws://localhost:${PORT}/ws`);
const ws2 = new WebSocket(`ws://localhost:${PORT}/ws`);
const received: ServerMessage[] = [];
await Promise.all([
new Promise((r) => {
ws1.onopen = r;
}),
new Promise((r) => {
ws2.onopen = r;
}),
]);
ws2.onmessage = (e) => {
received.push(JSON.parse(e.data));
};
// Both join same room
ws1.send(JSON.stringify({ type: "join", room: "test" }));
ws2.send(JSON.stringify({ type: "join", room: "test" }));
await Bun.sleep(50);
// ws1 sends awareness
ws1.send(
JSON.stringify({
type: "awareness",
data: { clientId: 1, cursor: { line: 10, col: 5 } },
}),
);
await Bun.sleep(50);
const awareness = received.find((m) => m.type === "awareness");
expect(awareness).toBeDefined();
if (awareness?.type === "awareness") {
expect(awareness.data.cursor).toEqual({ line: 10, col: 5 });
}
ws1.close();
ws2.close();
});
});

51
src/db.test.ts Normal file
View file

@ -0,0 +1,51 @@
import { afterEach, beforeEach, describe, expect, it } from "bun:test";
import { close, getUpdates, initDb, saveUpdate } from "./db";
const TEST_DB = ":memory:";
describe("db persistence", () => {
beforeEach(() => {
initDb(TEST_DB);
});
afterEach(() => {
close();
});
it("saves and retrieves updates for a room", () => {
const room = "test-room";
const update1 = new Uint8Array([1, 2, 3]);
const update2 = new Uint8Array([4, 5, 6]);
saveUpdate(room, update1);
saveUpdate(room, update2);
const updates = getUpdates(room);
expect(updates).toHaveLength(2);
expect(updates[0]).toEqual(update1);
expect(updates[1]).toEqual(update2);
});
it("returns empty array for unknown room", () => {
const updates = getUpdates("nonexistent");
expect(updates).toEqual([]);
});
it("rooms are isolated", () => {
const room1 = "room-one";
const room2 = "room-two";
const update1 = new Uint8Array([1, 1, 1]);
const update2 = new Uint8Array([2, 2, 2]);
saveUpdate(room1, update1);
saveUpdate(room2, update2);
const room1Updates = getUpdates(room1);
const room2Updates = getUpdates(room2);
expect(room1Updates).toHaveLength(1);
expect(room2Updates).toHaveLength(1);
expect(room1Updates[0]).toEqual(update1);
expect(room2Updates[0]).toEqual(update2);
});
});

38
src/db.ts Normal file
View file

@ -0,0 +1,38 @@
import { Database } from "bun:sqlite";
let db: Database | null = null;
export function initDb(path: string): void {
db = new Database(path);
db.exec(`
CREATE TABLE IF NOT EXISTS updates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
room TEXT NOT NULL,
data BLOB NOT NULL,
created_at INTEGER DEFAULT (unixepoch())
);
CREATE INDEX IF NOT EXISTS idx_room ON updates(room);
`);
}
export function saveUpdate(room: string, data: Uint8Array): void {
if (!db) throw new Error("Database not initialized");
const stmt = db.prepare("INSERT INTO updates (room, data) VALUES (?, ?)");
stmt.run(room, data);
}
export function getUpdates(room: string): Uint8Array[] {
if (!db) throw new Error("Database not initialized");
const stmt = db.prepare(
"SELECT data FROM updates WHERE room = ? ORDER BY id ASC",
);
const rows = stmt.all(room) as Array<{ data: Uint8Array }>;
return rows.map((row) => row.data);
}
export function close(): void {
if (db) {
db.close();
db = null;
}
}

View file

@ -1,15 +1,26 @@
import { initDb } from "./db";
import { decode } from "./protocol";
import { type Client, getOrCreateSession, getSession } from "./session";
import {
type Client,
getOrCreateSession,
getSession,
type WsData,
} from "./session";
const PORT = Number(process.env.PORT) || 4040;
// Initialize database
const DB_PATH = process.env.COLLABD_DB || "collabd.db";
initDb(DB_PATH);
console.debug(`database initialized: ${DB_PATH}`);
function isValidRoomName(name: unknown): name is string {
if (typeof name !== "string") return false;
if (name.length === 0 || name.length > 64) return false;
return /^[a-zA-Z0-9_-]+$/.test(name);
}
Bun.serve({
export const server = Bun.serve<WsData>({
port: PORT,
fetch(req, server) {
const url = new URL(req.url);
@ -72,6 +83,15 @@ Bun.serve({
}
break;
}
case "awareness": {
if (ws.data.room) {
const session = getSession(ws.data.room);
if (session && "data" in msg) {
session.broadcastAwareness(client, msg.data);
}
}
break;
}
}
},
close(ws) {

View file

@ -1,15 +1,23 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { close as closeDb, initDb } from "./db";
import type { ServerMessage } from "./protocol";
import { decode } from "./protocol";
import { getOrCreateSession, getSession, removeSession } from "./session";
import {
getOrCreateSession,
getSession,
removeSession,
type WsData,
} from "./session";
describe("WebSocket concurrent edits integration", () => {
let server: ReturnType<typeof Bun.serve>;
const PORT = 4041; // use different port for tests
beforeEach(() => {
// initialize database for each test
initDb(":memory:");
// start server for each test
server = Bun.serve({
server = Bun.serve<WsData>({
port: PORT,
fetch(req, server) {
const url = new URL(req.url);
@ -74,6 +82,7 @@ describe("WebSocket concurrent edits integration", () => {
afterEach(() => {
server.stop();
closeDb();
});
test("two clients concurrent edits converge to same state", async () => {

View file

@ -9,7 +9,7 @@ describe("protocol", () => {
});
test("decode parses valid json", () => {
const msg = { type: "join", room: "test" };
const msg = { type: "join" as const, room: "test" };
const decoded = decode(JSON.stringify(msg));
expect(decoded).toEqual(msg);
});

View file

@ -4,12 +4,24 @@ export type ClientMessage =
| { type: "join"; room: string }
| { type: "leave" }
| { type: "update"; data: number[] } // yjs update as byte array
| { type: "awareness"; data: number[] };
| { type: "awareness"; data: AwarenessState };
export type AwarenessState = {
clientId: number;
cursor?: { line: number; col: number };
selection?: {
startLine: number;
startCol: number;
endLine: number;
endCol: number;
};
name?: string;
};
export type ServerMessage =
| { type: "sync"; data: number[] } // full yjs state
| { type: "update"; data: number[] }
| { type: "awareness"; data: number[] }
| { type: "awareness"; data: AwarenessState }
| { type: "peers"; count: number }
| { type: "error"; message: string };
@ -25,9 +37,31 @@ export function encode(msg: ServerMessage): string {
return JSON.stringify(msg);
}
function isClientMessage(obj: unknown): obj is ClientMessage {
if (typeof obj !== "object" || obj === null) return false;
const msg = obj as Record<string, unknown>;
switch (msg.type) {
case "join":
return typeof msg.room === "string";
case "leave":
return true;
case "update":
return (
Array.isArray(msg.data) && msg.data.every((n) => typeof n === "number")
);
case "awareness":
return typeof msg.data === "object" && msg.data !== null;
default:
return false;
}
}
export function decode(raw: string): ClientMessage | null {
try {
return JSON.parse(raw) as ClientMessage;
const parsed = JSON.parse(raw);
if (!isClientMessage(parsed)) return null;
return parsed;
} catch {
return null;
}

View file

@ -1,6 +1,21 @@
import { describe, expect, test } from "bun:test";
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import * as Y from "yjs";
import { getOrCreateSession, Session } from "./session";
import { close as closeDb, initDb } from "./db";
import type { ServerMessage } from "./protocol";
import {
type Client,
getOrCreateSession,
Session,
type WsData,
} from "./session";
beforeEach(() => {
initDb(":memory:");
});
afterEach(() => {
closeDb();
});
describe("Session", () => {
test("creates yjs doc on init", () => {
@ -13,8 +28,8 @@ describe("Session", () => {
const session = new Session("test-room");
const mockWs = {
send: () => {},
data: { room: null as string | null },
} as unknown as import("bun").ServerWebSocket<{ room: string | null }>;
data: { room: null as string | null, client: null },
} as unknown as import("bun").ServerWebSocket<WsData>;
const client = { ws: mockWs };
expect(session.clients.size).toBe(0);
@ -36,8 +51,8 @@ describe("Session", () => {
const mockWs = {
send: () => {},
data: { room: null as string | null },
} as unknown as import("bun").ServerWebSocket<{ room: string | null }>;
data: { room: null as string | null, client: null },
} as unknown as import("bun").ServerWebSocket<WsData>;
session.applyUpdate(update, { ws: mockWs });
expect(text.toString()).toBe("hello");
@ -57,3 +72,33 @@ describe("getOrCreateSession", () => {
expect(s1).not.toBe(s2);
});
});
describe("awareness", () => {
test("broadcasts awareness to other clients", () => {
const session = new Session("test-room");
const sent1: ServerMessage[] = [];
const sent2: ServerMessage[] = [];
const client1 = {
ws: { send: (m: string) => sent1.push(JSON.parse(m)) },
} as Client;
const client2 = {
ws: { send: (m: string) => sent2.push(JSON.parse(m)) },
} as Client;
session.join(client1);
session.join(client2);
const awareness = { clientId: 1, cursor: { line: 5, col: 10 } };
session.broadcastAwareness(client1, awareness);
// client1 should NOT receive their own awareness
expect(sent1.filter((m) => m.type === "awareness")).toHaveLength(0);
// client2 should receive it
expect(sent2.filter((m) => m.type === "awareness")).toHaveLength(1);
const awarenessMsg = sent2.find((m) => m.type === "awareness");
if (awarenessMsg?.type === "awareness") {
expect(awarenessMsg.data).toEqual(awareness);
}
});
});

View file

@ -1,16 +1,22 @@
import type { ServerWebSocket } from "bun";
import * as Y from "yjs";
import { encode } from "./protocol";
import { getUpdates, saveUpdate } from "./db";
import { type AwarenessState, encode } from "./protocol";
export interface WsData {
room: string | null;
client: Client | null;
}
export interface Client {
ws: ServerWebSocket<{ room: string | null }>;
ws: ServerWebSocket<WsData>;
}
export class Session {
doc: Y.Doc;
clients: Set<Client> = new Set();
constructor(public name: string) {
constructor(public readonly name: string) {
this.doc = new Y.Doc();
}
@ -44,6 +50,13 @@ export class Session {
applyUpdate(update: Uint8Array, from: Client) {
try {
Y.applyUpdate(this.doc, update);
// Persist the update (log but don't fail on db errors)
try {
saveUpdate(this.name, update);
} catch (err) {
console.error(`failed to persist update in room ${this.name}:`, err);
// Continue - memory state is still updated
}
// broadcast to others
for (const client of this.clients) {
if (client !== from) {
@ -79,6 +92,19 @@ export class Session {
}
}
}
broadcastAwareness(sender: Client, state: AwarenessState): void {
const message = encode({ type: "awareness", data: state });
for (const client of this.clients) {
if (client !== sender) {
try {
client.ws.send(message);
} catch {
// client disconnected, ignore
}
}
}
}
}
// room name -> session
@ -88,6 +114,18 @@ export function getOrCreateSession(name: string): Session {
let session = sessions.get(name);
if (!session) {
session = new Session(name);
// Load persisted updates
const updates = getUpdates(name);
for (const update of updates) {
Y.applyUpdate(session.doc, update);
}
// Double-check pattern - another request may have created session while we loaded
const existing = sessions.get(name);
if (existing) {
return existing;
}
sessions.set(name, session);
console.debug(`session created: ${name}`);
}