Hook persistence into session lifecycle

This commit is contained in:
Jared Miller 2026-01-27 21:22:52 -05:00
parent d904b9d2b6
commit 2082a908fa
Signed by: shmup
GPG key ID: 22B5C6D66A38B06C
5 changed files with 33 additions and 2 deletions

View file

@ -6,11 +6,15 @@ describe("Bridge lifecycle", () => {
const DAEMON_PORT = 4042; const DAEMON_PORT = 4042;
beforeEach(async () => { beforeEach(async () => {
const { initDb } = await import("../../src/db");
const { decode } = await import("../../src/protocol"); const { decode } = await import("../../src/protocol");
const { getOrCreateSession, getSession, removeSession } = await import( const { getOrCreateSession, getSession, removeSession } = await import(
"../../src/session" "../../src/session"
); );
// Initialize database
initDb(":memory:");
// start daemon for bridge to connect to // start daemon for bridge to connect to
daemon = Bun.serve({ daemon = Bun.serve({
port: DAEMON_PORT, port: DAEMON_PORT,
@ -75,8 +79,10 @@ describe("Bridge lifecycle", () => {
}); });
}); });
afterEach(() => { afterEach(async () => {
daemon.stop(); daemon.stop();
const { close: closeDb } = await import("../../src/db");
closeDb();
}); });
test("bridge starts and signals ready", async () => { test("bridge starts and signals ready", async () => {

View file

@ -1,5 +1,6 @@
import { afterAll, beforeAll, describe, expect, test } from "bun:test"; import { afterAll, beforeAll, describe, expect, test } from "bun:test";
import type { Server } from "bun"; import type { Server } from "bun";
import { close as closeDb, initDb } from "./db";
import type { ServerMessage } from "./protocol"; import type { ServerMessage } from "./protocol";
describe("awareness routing", () => { describe("awareness routing", () => {
@ -7,6 +8,8 @@ describe("awareness routing", () => {
const PORT = 4042; const PORT = 4042;
beforeAll(async () => { beforeAll(async () => {
// Initialize database before starting server
initDb(":memory:");
// Import and start server on test port // Import and start server on test port
process.env.PORT = String(PORT); process.env.PORT = String(PORT);
const mod = await import("./index"); const mod = await import("./index");
@ -15,6 +18,7 @@ describe("awareness routing", () => {
afterAll(() => { afterAll(() => {
server?.stop(); server?.stop();
closeDb();
}); });
test("awareness message routes to other peers in same room", async () => { test("awareness message routes to other peers in same room", async () => {

View file

@ -1,4 +1,5 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { close as closeDb, initDb } from "./db";
import type { ServerMessage } from "./protocol"; import type { ServerMessage } from "./protocol";
import { decode } from "./protocol"; import { decode } from "./protocol";
import { import {
@ -13,6 +14,8 @@ describe("WebSocket concurrent edits integration", () => {
const PORT = 4041; // use different port for tests const PORT = 4041; // use different port for tests
beforeEach(() => { beforeEach(() => {
// initialize database for each test
initDb(":memory:");
// start server for each test // start server for each test
server = Bun.serve<WsData>({ server = Bun.serve<WsData>({
port: PORT, port: PORT,
@ -79,6 +82,7 @@ describe("WebSocket concurrent edits integration", () => {
afterEach(() => { afterEach(() => {
server.stop(); server.stop();
closeDb();
}); });
test("two clients concurrent edits converge to same state", async () => { test("two clients concurrent edits converge to same state", async () => {

View file

@ -1,5 +1,6 @@
import { describe, expect, test } from "bun:test"; import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import * as Y from "yjs"; import * as Y from "yjs";
import { close as closeDb, initDb } from "./db";
import type { ServerMessage } from "./protocol"; import type { ServerMessage } from "./protocol";
import { import {
type Client, type Client,
@ -8,6 +9,14 @@ import {
type WsData, type WsData,
} from "./session"; } from "./session";
beforeEach(() => {
initDb(":memory:");
});
afterEach(() => {
closeDb();
});
describe("Session", () => { describe("Session", () => {
test("creates yjs doc on init", () => { test("creates yjs doc on init", () => {
const session = new Session("test-room"); const session = new Session("test-room");

View file

@ -1,6 +1,7 @@
import type { ServerWebSocket } from "bun"; import type { ServerWebSocket } from "bun";
import * as Y from "yjs"; import * as Y from "yjs";
import { type AwarenessState, encode } from "./protocol"; import { type AwarenessState, encode } from "./protocol";
import { saveUpdate, getUpdates } from "./db";
export interface WsData { export interface WsData {
room: string | null; room: string | null;
@ -49,6 +50,8 @@ export class Session {
applyUpdate(update: Uint8Array, from: Client) { applyUpdate(update: Uint8Array, from: Client) {
try { try {
Y.applyUpdate(this.doc, update); Y.applyUpdate(this.doc, update);
// Persist the update
saveUpdate(this.name, update);
// broadcast to others // broadcast to others
for (const client of this.clients) { for (const client of this.clients) {
if (client !== from) { if (client !== from) {
@ -106,6 +109,11 @@ export function getOrCreateSession(name: string): Session {
let session = sessions.get(name); let session = sessions.get(name);
if (!session) { if (!session) {
session = new Session(name); session = new Session(name);
// Load persisted updates
const updates = getUpdates(name);
for (const update of updates) {
Y.applyUpdate(session.doc, update);
}
sessions.set(name, session); sessions.set(name, session);
console.debug(`session created: ${name}`); console.debug(`session created: ${name}`);
} }