From 67b5fb8532ce600ed27753f0907a992127066400 Mon Sep 17 00:00:00 2001 From: Bastian Wagner Date: Wed, 10 Jun 2026 14:35:18 +0200 Subject: [PATCH] =?UTF-8?q?sse=20f=C3=BCr=20live=20collab=20eingebaut?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/lists/list-realtime.service.ts | 97 ++++++++ listify-api/src/lists/lists.controller.ts | 10 + listify-api/src/lists/lists.module.ts | 5 +- listify-api/src/lists/lists.service.spec.ts | 52 +++++ listify-api/src/lists/lists.service.ts | 35 ++- .../list-detail/list-detail.component.ts | 49 +++- .../src/app/lists/lists-realtime.service.ts | 217 ++++++++++++++++++ .../src/app/lists/lists.component.ts | 43 +++- listify-client/src/app/lists/lists.models.ts | 16 ++ 9 files changed, 507 insertions(+), 17 deletions(-) create mode 100644 listify-api/src/lists/list-realtime.service.ts create mode 100644 listify-client/src/app/lists/lists-realtime.service.ts diff --git a/listify-api/src/lists/list-realtime.service.ts b/listify-api/src/lists/list-realtime.service.ts new file mode 100644 index 0000000..d14e45a --- /dev/null +++ b/listify-api/src/lists/list-realtime.service.ts @@ -0,0 +1,97 @@ +import { Injectable, MessageEvent } from '@nestjs/common'; +import { Observable, Observer } from 'rxjs'; +import { UserList } from '../list-templates/list-template.types'; + +export type ListRealtimeEvent = + | { + type: 'list.snapshot'; + data: UserList; + } + | { + type: 'list.deleted'; + data: { listId: string }; + } + | { + type: 'heartbeat'; + data: { at: string }; + }; + +@Injectable() +export class ListRealtimeService { + // In-memory SSE fanout for one API process. If the API is scaled horizontally, + // replace this map with a shared pub/sub backend while keeping the event shape. + private readonly channels = new Map>>(); + + /** + * Opens an owner-scoped stream. The controller authenticates the request before + * calling this method, so subscribers only receive their own list events. + */ + eventsFor(ownerId: string): Observable { + return new Observable((observer) => { + this.addObserver(ownerId, observer); + + // Keep proxies and browsers from treating an otherwise quiet stream as idle. + const heartbeatInterval = setInterval(() => { + observer.next({ + type: 'heartbeat', + data: { at: new Date().toISOString() }, + }); + }, 25_000); + + return () => { + clearInterval(heartbeatInterval); + this.removeObserver(ownerId, observer); + }; + }); + } + + publishSnapshot(ownerId: string, list: UserList): void { + this.publish(ownerId, { + type: 'list.snapshot', + data: list, + }); + } + + publishDeleted(ownerId: string, listId: string): void { + this.publish(ownerId, { + type: 'list.deleted', + data: { listId }, + }); + } + + private publish(ownerId: string, event: ListRealtimeEvent): void { + const observers = this.channels.get(ownerId); + + if (!observers) { + return; + } + + observers.forEach((observer) => observer.next(event)); + } + + private addObserver( + ownerId: string, + observer: Observer, + ): void { + const observers = this.channels.get(ownerId) ?? new Set(); + observers.add(observer); + this.channels.set(ownerId, observers); + } + + private removeObserver( + ownerId: string, + observer: Observer, + ): void { + const observers = this.channels.get(ownerId); + + if (!observers) { + return; + } + + observers.delete(observer); + + if (observers.size === 0) { + this.channels.delete(ownerId); + } + } +} diff --git a/listify-api/src/lists/lists.controller.ts b/listify-api/src/lists/lists.controller.ts index 5258e45..d004429 100644 --- a/listify-api/src/lists/lists.controller.ts +++ b/listify-api/src/lists/lists.controller.ts @@ -7,22 +7,27 @@ import { Patch, Post, Req, + Sse, UnauthorizedException, UseGuards, } from '@nestjs/common'; +import { Observable } from 'rxjs'; import { JwtAuthGuard } from '../auth/jwt-auth.guard'; import { AuthService } from '../auth/auth.service'; import { CreateListDto } from './dto/create-list.dto'; import { AddListItemDto, UpdateListItemDto } from './dto/list-item.dto'; import { UpdateListDto } from './dto/update-list.dto'; +import { ListRealtimeService } from './list-realtime.service'; import { ListsService } from './lists.service'; import type { AuthenticatedRequest } from '../auth/auth.types'; +import type { MessageEvent } from '@nestjs/common'; @Controller('lists') @UseGuards(JwtAuthGuard) export class ListsController { constructor( private readonly authService: AuthService, + private readonly listRealtimeService: ListRealtimeService, private readonly listsService: ListsService, ) {} @@ -39,6 +44,11 @@ export class ListsController { return this.listsService.listLists(this.requireUserId(request)); } + @Sse('events') + listEvents(@Req() request: AuthenticatedRequest): Observable { + return this.listRealtimeService.eventsFor(this.requireUserId(request)); + } + @Get(':listId') getList( @Req() request: AuthenticatedRequest, diff --git a/listify-api/src/lists/lists.module.ts b/listify-api/src/lists/lists.module.ts index 3f96e2f..8faa946 100644 --- a/listify-api/src/lists/lists.module.ts +++ b/listify-api/src/lists/lists.module.ts @@ -3,6 +3,7 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { AuditModule } from '../audit/audit.module'; import { AuthModule } from '../auth/auth.module'; import { ListsController } from './lists.controller'; +import { ListRealtimeService } from './list-realtime.service'; import { ListsService } from './lists.service'; import { UserListEntity } from './user-list.entity'; import { UserListItemEntity } from './user-list-item.entity'; @@ -14,7 +15,7 @@ import { UserListItemEntity } from './user-list-item.entity'; TypeOrmModule.forFeature([UserListEntity, UserListItemEntity]), ], controllers: [ListsController], - providers: [ListsService], - exports: [ListsService], + providers: [ListRealtimeService, ListsService], + exports: [ListRealtimeService, ListsService], }) export class ListsModule {} diff --git a/listify-api/src/lists/lists.service.spec.ts b/listify-api/src/lists/lists.service.spec.ts index 94b2ac1..aec234b 100644 --- a/listify-api/src/lists/lists.service.spec.ts +++ b/listify-api/src/lists/lists.service.spec.ts @@ -1,6 +1,7 @@ import { ForbiddenException, NotFoundException } from '@nestjs/common'; import { ListTemplate } from '../list-templates/list-template.types'; import { InMemoryRepository } from '../testing/in-memory-repository'; +import { ListRealtimeService } from './list-realtime.service'; import { ListsService } from './lists.service'; import { UserListEntity } from './user-list.entity'; import { UserListItemEntity } from './user-list-item.entity'; @@ -48,6 +49,57 @@ describe('ListsService', () => { await expect(service.listLists('user-1')).resolves.toHaveLength(0); }); + it('publishes realtime snapshots and deletions for the owning user', async () => { + const realtimeService = { + publishDeleted: jest.fn(), + publishSnapshot: jest.fn(), + } satisfies Partial; + service = new ListsService( + new InMemoryRepository() as never, + new InMemoryRepository() as never, + undefined, + realtimeService as never, + ); + + const list = await service.createList('user-1', { + name: 'Live Liste', + kind: 'todo', + }); + const withItem = await service.addItem('user-1', list.id, { + title: 'Eintrag', + }); + + await service.updateItem( + 'user-1', + list.id, + withItem.items[0].id, + { checked: true }, + 'Test User', + ); + await service.deleteList('user-1', list.id); + + expect(realtimeService.publishSnapshot).toHaveBeenCalledWith( + 'user-1', + expect.objectContaining({ id: list.id, name: 'Live Liste' }), + ); + expect(realtimeService.publishSnapshot).toHaveBeenCalledWith( + 'user-1', + expect.objectContaining({ + id: list.id, + items: [ + expect.objectContaining({ + id: withItem.items[0].id, + checked: true, + }), + ], + }), + ); + expect(realtimeService.publishDeleted).toHaveBeenCalledWith( + 'user-1', + list.id, + ); + }); + it('adds, updates, checks and deletes list items', async () => { const list = await service.createList('user-1', { name: 'Einkauf', diff --git a/listify-api/src/lists/lists.service.ts b/listify-api/src/lists/lists.service.ts index 55a0a9c..6c1d5b1 100644 --- a/listify-api/src/lists/lists.service.ts +++ b/listify-api/src/lists/lists.service.ts @@ -18,6 +18,7 @@ import { import { CreateListFromTemplateDto } from '../list-templates/dto/create-list-from-template.dto'; import { AddListItemDto, UpdateListItemDto } from './dto/list-item.dto'; import { CreateListDto } from './dto/create-list.dto'; +import { ListRealtimeService } from './list-realtime.service'; import { UpdateListDto } from './dto/update-list.dto'; import { UserListEntity } from './user-list.entity'; import { UserListItemEntity } from './user-list-item.entity'; @@ -31,6 +32,8 @@ export class ListsService { private readonly listItemsRepository: Repository, @Optional() private readonly auditLogService?: AuditLogService, + @Optional() + private readonly listRealtimeService?: ListRealtimeService, ) {} async createList(ownerId: string, createDto: CreateListDto): Promise { @@ -56,7 +59,10 @@ export class ListsService { }, }); - return this.toUserList(savedList); + const userList = this.toUserList(savedList); + this.listRealtimeService?.publishSnapshot(ownerId, userList); + + return userList; } async createListFromTemplate( @@ -108,7 +114,10 @@ export class ListsService { }, }); - return this.toUserList(savedList); + const userList = this.toUserList(savedList); + this.listRealtimeService?.publishSnapshot(ownerId, userList); + + return userList; } async listLists(ownerId: string): Promise { @@ -160,7 +169,10 @@ export class ListsService { }, }); - return this.toUserList(savedList); + const userList = this.toUserList(savedList); + this.listRealtimeService?.publishSnapshot(ownerId, userList); + + return userList; } async deleteList(ownerId: string, listId: string): Promise<{ message: string }> { @@ -180,6 +192,8 @@ export class ListsService { metadata, }); + this.listRealtimeService?.publishDeleted(ownerId, listId); + return { message: 'List deleted.' }; } @@ -209,7 +223,10 @@ export class ListsService { }, }); - return this.getList(ownerId, listId); + const updatedList = await this.getList(ownerId, listId); + this.listRealtimeService?.publishSnapshot(ownerId, updatedList); + + return updatedList; } async updateItem( @@ -279,7 +296,10 @@ export class ListsService { }, }); - return this.getList(ownerId, listId); + const updatedList = await this.getList(ownerId, listId); + this.listRealtimeService?.publishSnapshot(ownerId, updatedList); + + return updatedList; } async deleteItem( @@ -315,7 +335,10 @@ export class ListsService { }, }); - return this.getList(ownerId, listId); + const updatedList = await this.getList(ownerId, listId); + this.listRealtimeService?.publishSnapshot(ownerId, updatedList); + + return updatedList; } private async findOwnedList( diff --git a/listify-client/src/app/lists/list-detail/list-detail.component.ts b/listify-client/src/app/lists/list-detail/list-detail.component.ts index 1b1ec32..b058c55 100644 --- a/listify-client/src/app/lists/list-detail/list-detail.component.ts +++ b/listify-client/src/app/lists/list-detail/list-detail.component.ts @@ -1,5 +1,6 @@ import { DatePipe } from '@angular/common'; -import { Component, OnInit, computed, inject, signal } from '@angular/core'; +import { Component, DestroyRef, OnInit, computed, inject, signal } from '@angular/core'; +import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; import { NonNullableFormBuilder, ReactiveFormsModule, Validators } from '@angular/forms'; import { ActivatedRoute, Router, RouterLink } from '@angular/router'; import { finalize } from 'rxjs'; @@ -13,7 +14,8 @@ import { MatProgressSpinnerModule } from '@angular/material/progress-spinner'; import { MatSnackBar, MatSnackBarModule } from '@angular/material/snack-bar'; import { getAuthErrorMessage } from '../../auth/error-message'; import { OnboardingService } from '../../onboarding/onboarding.service'; -import { UserList, UserListItem } from '../lists.models'; +import { ListRealtimeEvent, UserList, UserListItem } from '../lists.models'; +import { ListsRealtimeService } from '../lists-realtime.service'; import { ListsService } from '../lists.service'; @Component({ @@ -35,8 +37,10 @@ import { ListsService } from '../lists.service'; styleUrl: './list-detail.component.scss', }) export class ListDetailComponent implements OnInit { + private readonly destroyRef = inject(DestroyRef); private readonly formBuilder = inject(NonNullableFormBuilder); private readonly listsService = inject(ListsService); + private readonly listsRealtimeService = inject(ListsRealtimeService); private readonly route = inject(ActivatedRoute); private readonly router = inject(Router); private readonly snackBar = inject(MatSnackBar); @@ -77,6 +81,7 @@ export class ListDetailComponent implements OnInit { if (listId) { this.onboarding.listOpened(listId); + this.subscribeToRealtime(listId); } this.loadList(); @@ -233,12 +238,42 @@ export class ListDetailComponent implements OnInit { await this.router.navigateByUrl('/lists'); } - private setList(list: UserList): void { + private subscribeToRealtime(listId: string): void { + this.listsRealtimeService + .events() + .pipe(takeUntilDestroyed(this.destroyRef)) + .subscribe({ + next: (event) => this.applyRealtimeEvent(listId, event), + }); + } + + private applyRealtimeEvent(listId: string, event: ListRealtimeEvent): void { + if (event.type === 'list.snapshot' && event.data.id === listId) { + this.errorMessage.set(null); + this.loading.set(false); + // Remote snapshots should update visible item state immediately, but they + // must not overwrite a title/description form while the user is editing it. + this.setList(event.data, !this.showEditor()); + return; + } + + if (event.type === 'list.deleted' && event.data.listId === listId) { + this.list.set(null); + this.loading.set(false); + this.editing.set(false); + this.errorMessage.set('Diese Liste wurde geloescht.'); + } + } + + private setList(list: UserList, resetForm = true): void { this.list.set(list); - this.listForm.reset({ - name: list.name, - description: list.description ?? '', - }); + + if (resetForm) { + this.listForm.reset({ + name: list.name, + description: list.description ?? '', + }); + } } private listId(): string | null { diff --git a/listify-client/src/app/lists/lists-realtime.service.ts b/listify-client/src/app/lists/lists-realtime.service.ts new file mode 100644 index 0000000..9f91682 --- /dev/null +++ b/listify-client/src/app/lists/lists-realtime.service.ts @@ -0,0 +1,217 @@ +import { Injectable, inject } from '@angular/core'; +import { Router } from '@angular/router'; +import { Observable, Subject, firstValueFrom, share } from 'rxjs'; +import { AuthService } from '../auth/auth.service'; +import { ListRealtimeEvent } from './lists.models'; + +const EVENTS_URL = '/api/lists/events'; +const INITIAL_RECONNECT_DELAY_MS = 1_000; +const MAX_RECONNECT_DELAY_MS = 10_000; + +@Injectable({ providedIn: 'root' }) +export class ListsRealtimeService { + private readonly auth = inject(AuthService); + private readonly router = inject(Router); + private events$?: Observable; + + /** + * Shared live list stream for the current browser session. Native EventSource + * cannot send Authorization headers, so this uses fetch and parses SSE frames. + */ + events(): Observable { + this.events$ ??= new Observable((observer) => { + let stopped = false; + let reconnectDelay = INITIAL_RECONNECT_DELAY_MS; + let reconnectTimeout: ReturnType | undefined; + let abortController: AbortController | undefined; + + const scheduleReconnect = () => { + if (stopped) { + return; + } + + reconnectTimeout = setTimeout(() => { + reconnectDelay = Math.min( + reconnectDelay * 2, + MAX_RECONNECT_DELAY_MS, + ); + void connect(); + }, reconnectDelay); + }; + + const connect = async () => { + abortController = new AbortController(); + + try { + const response = await this.openStream(abortController.signal); + + reconnectDelay = INITIAL_RECONNECT_DELAY_MS; + await this.readEvents(response, observer, abortController.signal); + + if (!stopped && !abortController.signal.aborted) { + scheduleReconnect(); + } + } catch (error: unknown) { + if (stopped || abortController.signal.aborted) { + return; + } + + if (this.isUnauthorizedResponse(error)) { + const refreshed = await this.tryRefreshSession(); + + if (!refreshed) { + observer.error(error); + return; + } + } + + scheduleReconnect(); + } + }; + + void connect(); + + return () => { + stopped = true; + abortController?.abort(); + + if (reconnectTimeout) { + clearTimeout(reconnectTimeout); + } + }; + }).pipe( + share({ + connector: () => new Subject(), + resetOnComplete: true, + resetOnError: true, + resetOnRefCountZero: true, + }), + ); + + return this.events$; + } + + private async openStream(signal: AbortSignal): Promise { + const token = this.auth.accessToken(); + + if (!token) { + throw new Response(null, { status: 401 }); + } + + const response = await fetch(EVENTS_URL, { + headers: { + Accept: 'text/event-stream', + Authorization: `Bearer ${token}`, + }, + signal, + }); + + if (response.status === 401) { + throw response; + } + + if (!response.ok || !response.body) { + throw response; + } + + return response; + } + + private async readEvents( + response: Response, + observer: { next: (event: ListRealtimeEvent) => void }, + signal: AbortSignal, + ): Promise { + const reader = response.body?.getReader(); + + if (!reader) { + throw new Error('Realtime response body is missing.'); + } + + const decoder = new TextDecoder(); + let buffer = ''; + + while (!signal.aborted) { + const { done, value } = await reader.read(); + + if (done) { + break; + } + + buffer += decoder.decode(value, { stream: true }); + buffer = this.flushEventBuffer(buffer, observer); + } + } + + private flushEventBuffer( + buffer: string, + observer: { next: (event: ListRealtimeEvent) => void }, + ): string { + const normalizedBuffer = buffer.replace(/\r\n/g, '\n'); + const blocks = normalizedBuffer.split('\n\n'); + const remainingBuffer = blocks.pop() ?? ''; + + blocks + .map((block) => this.parseEventBlock(block)) + .filter((event): event is ListRealtimeEvent => event !== null) + .forEach((event) => observer.next(event)); + + return remainingBuffer; + } + + private parseEventBlock(block: string): ListRealtimeEvent | null { + let eventType = 'message'; + const dataLines: string[] = []; + + block.split('\n').forEach((line) => { + if (line.startsWith(':')) { + return; + } + + if (line.startsWith('event:')) { + eventType = line.slice('event:'.length).trim(); + return; + } + + if (line.startsWith('data:')) { + dataLines.push(line.slice('data:'.length).trimStart()); + } + }); + + if (dataLines.length === 0) { + return null; + } + + const data = JSON.parse(dataLines.join('\n')) as unknown; + + if ( + eventType === 'list.snapshot' || + eventType === 'list.deleted' || + eventType === 'heartbeat' + ) { + return { + type: eventType, + data, + } as ListRealtimeEvent; + } + + return null; + } + + // Treat one 401 as an expired access token. If refresh fails, the normal auth + // logout flow takes over and the stream stops reconnecting with stale tokens. + private isUnauthorizedResponse(error: unknown): boolean { + return error instanceof Response && error.status === 401; + } + + private async tryRefreshSession(): Promise { + try { + await firstValueFrom(this.auth.refreshSession()); + return true; + } catch { + this.auth.logout(); + await this.router.navigateByUrl('/login'); + return false; + } + } +} diff --git a/listify-client/src/app/lists/lists.component.ts b/listify-client/src/app/lists/lists.component.ts index c3eeeb9..25e905c 100644 --- a/listify-client/src/app/lists/lists.component.ts +++ b/listify-client/src/app/lists/lists.component.ts @@ -1,5 +1,6 @@ import { DatePipe } from '@angular/common'; -import { Component, OnInit, computed, inject, signal } from '@angular/core'; +import { Component, DestroyRef, OnInit, computed, inject, signal } from '@angular/core'; +import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; import { RouterLink } from '@angular/router'; import { MatButtonModule } from '@angular/material/button'; import { MatButtonToggleModule } from '@angular/material/button-toggle'; @@ -12,7 +13,8 @@ import { MatSelectModule } from '@angular/material/select'; import { getAuthErrorMessage } from '../auth/error-message'; import { OnboardingService } from '../onboarding/onboarding.service'; import { ListTemplateKind } from '../templates/templates.models'; -import { UserList } from './lists.models'; +import { ListRealtimeEvent, UserList } from './lists.models'; +import { ListsRealtimeService } from './lists-realtime.service'; import { ListsService } from './lists.service'; type ListStatusFilter = 'all' | 'open' | 'completed'; @@ -42,7 +44,9 @@ type ListKindFilter = ListTemplateKind | 'all'; styleUrls: ['../workspace-page.scss', './lists.component.scss'], }) export class ListsComponent implements OnInit { + private readonly destroyRef = inject(DestroyRef); private readonly listsService = inject(ListsService); + private readonly listsRealtimeService = inject(ListsRealtimeService); protected readonly onboarding = inject(OnboardingService); protected readonly lists = signal([]); @@ -114,6 +118,7 @@ export class ListsComponent implements OnInit { ngOnInit(): void { this.loadLists(); + this.subscribeToRealtime(); } protected loadLists(): void { @@ -162,6 +167,40 @@ export class ListsComponent implements OnInit { this.sortOption.set('updated-desc'); } + private subscribeToRealtime(): void { + this.listsRealtimeService + .events() + .pipe(takeUntilDestroyed(this.destroyRef)) + .subscribe({ + next: (event) => this.applyRealtimeEvent(event), + }); + } + + private applyRealtimeEvent(event: ListRealtimeEvent): void { + if (event.type === 'list.snapshot') { + // The overview keeps its local collection hot so progress and timestamps + // update when another tab changes a list. + this.lists.update((lists) => { + const existingIndex = lists.findIndex((list) => list.id === event.data.id); + + if (existingIndex === -1) { + return [...lists, event.data]; + } + + return lists.map((list, index) => + index === existingIndex ? event.data : list, + ); + }); + return; + } + + if (event.type === 'list.deleted') { + this.lists.update((lists) => + lists.filter((list) => list.id !== event.data.listId), + ); + } + } + private compareLists(a: UserList, b: UserList): number { switch (this.sortOption()) { case 'created-desc': diff --git a/listify-client/src/app/lists/lists.models.ts b/listify-client/src/app/lists/lists.models.ts index 0e193ed..bb913a0 100644 --- a/listify-client/src/app/lists/lists.models.ts +++ b/listify-client/src/app/lists/lists.models.ts @@ -54,3 +54,19 @@ export interface UpdateListItemRequest { required?: boolean; checked?: boolean; } + +// Wire contract for /api/lists/events. Keep this in sync with the API +// ListRealtimeEvent type; consumers should ignore unknown event types. +export type ListRealtimeEvent = + | { + type: 'list.snapshot'; + data: UserList; + } + | { + type: 'list.deleted'; + data: { listId: string }; + } + | { + type: 'heartbeat'; + data: { at: string }; + };