colabbd/src/session.ts

85 lines
2.1 KiB
TypeScript

import type { ServerWebSocket } from "bun";
import * as Y from "yjs";
import { encode } from "./protocol";
export interface Client {
ws: ServerWebSocket<{ room: string | null }>;
}
export class Session {
doc: Y.Doc;
clients: Set<Client> = new Set();
constructor(public name: string) {
this.doc = new Y.Doc();
}
join(client: Client) {
this.clients.add(client);
// send full state to new client
const state = Y.encodeStateAsUpdate(this.doc);
client.ws.send(encode({ type: "sync", data: Array.from(state) }));
this.broadcastPeerCount();
}
leave(client: Client) {
this.clients.delete(client);
this.broadcastPeerCount();
}
isEmpty(): boolean {
return this.clients.size === 0;
}
applyUpdate(update: Uint8Array, from: Client) {
try {
Y.applyUpdate(this.doc, update);
// broadcast to others
for (const client of this.clients) {
if (client !== from) {
client.ws.send(encode({ type: "update", data: Array.from(update) }));
}
}
} catch (err) {
console.error(`failed to apply update in session ${this.name}:`, err);
// optionally notify the client that sent the bad update
try {
from.ws.send(encode({ type: "error", message: "invalid update" }));
} catch {
// ignore send errors
}
}
}
broadcastPeerCount() {
const msg = encode({ type: "peers", count: this.clients.size });
for (const client of this.clients) {
client.ws.send(msg);
}
}
}
// room name -> session
const sessions = new Map<string, Session>();
export function getOrCreateSession(name: string): Session {
let session = sessions.get(name);
if (!session) {
session = new Session(name);
sessions.set(name, session);
console.debug(`session created: ${name}`);
}
return session;
}
export function getSession(name: string): Session | undefined {
return sessions.get(name);
}
export function removeSession(name: string): void {
const session = sessions.get(name);
if (session?.isEmpty()) {
sessions.delete(name);
console.debug(`session removed: ${name}`);
}
}