sse für live collab eingebaut

This commit is contained in:
Bastian Wagner
2026-06-10 14:35:18 +02:00
parent 93a91c7bf6
commit 67b5fb8532
9 changed files with 507 additions and 17 deletions

View File

@@ -1,5 +1,6 @@
import { DatePipe } from '@angular/common';
import { Component, OnInit, computed, inject, signal } from '@angular/core';
import { Component, DestroyRef, OnInit, computed, inject, signal } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { NonNullableFormBuilder, ReactiveFormsModule, Validators } from '@angular/forms';
import { ActivatedRoute, Router, RouterLink } from '@angular/router';
import { finalize } from 'rxjs';
@@ -13,7 +14,8 @@ import { MatProgressSpinnerModule } from '@angular/material/progress-spinner';
import { MatSnackBar, MatSnackBarModule } from '@angular/material/snack-bar';
import { getAuthErrorMessage } from '../../auth/error-message';
import { OnboardingService } from '../../onboarding/onboarding.service';
import { UserList, UserListItem } from '../lists.models';
import { ListRealtimeEvent, UserList, UserListItem } from '../lists.models';
import { ListsRealtimeService } from '../lists-realtime.service';
import { ListsService } from '../lists.service';
@Component({
@@ -35,8 +37,10 @@ import { ListsService } from '../lists.service';
styleUrl: './list-detail.component.scss',
})
export class ListDetailComponent implements OnInit {
private readonly destroyRef = inject(DestroyRef);
private readonly formBuilder = inject(NonNullableFormBuilder);
private readonly listsService = inject(ListsService);
private readonly listsRealtimeService = inject(ListsRealtimeService);
private readonly route = inject(ActivatedRoute);
private readonly router = inject(Router);
private readonly snackBar = inject(MatSnackBar);
@@ -77,6 +81,7 @@ export class ListDetailComponent implements OnInit {
if (listId) {
this.onboarding.listOpened(listId);
this.subscribeToRealtime(listId);
}
this.loadList();
@@ -233,12 +238,42 @@ export class ListDetailComponent implements OnInit {
await this.router.navigateByUrl('/lists');
}
private setList(list: UserList): void {
private subscribeToRealtime(listId: string): void {
this.listsRealtimeService
.events()
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe({
next: (event) => this.applyRealtimeEvent(listId, event),
});
}
private applyRealtimeEvent(listId: string, event: ListRealtimeEvent): void {
if (event.type === 'list.snapshot' && event.data.id === listId) {
this.errorMessage.set(null);
this.loading.set(false);
// Remote snapshots should update visible item state immediately, but they
// must not overwrite a title/description form while the user is editing it.
this.setList(event.data, !this.showEditor());
return;
}
if (event.type === 'list.deleted' && event.data.listId === listId) {
this.list.set(null);
this.loading.set(false);
this.editing.set(false);
this.errorMessage.set('Diese Liste wurde geloescht.');
}
}
private setList(list: UserList, resetForm = true): void {
this.list.set(list);
this.listForm.reset({
name: list.name,
description: list.description ?? '',
});
if (resetForm) {
this.listForm.reset({
name: list.name,
description: list.description ?? '',
});
}
}
private listId(): string | null {

View File

@@ -0,0 +1,217 @@
import { Injectable, inject } from '@angular/core';
import { Router } from '@angular/router';
import { Observable, Subject, firstValueFrom, share } from 'rxjs';
import { AuthService } from '../auth/auth.service';
import { ListRealtimeEvent } from './lists.models';
const EVENTS_URL = '/api/lists/events';
const INITIAL_RECONNECT_DELAY_MS = 1_000;
const MAX_RECONNECT_DELAY_MS = 10_000;
@Injectable({ providedIn: 'root' })
export class ListsRealtimeService {
private readonly auth = inject(AuthService);
private readonly router = inject(Router);
private events$?: Observable<ListRealtimeEvent>;
/**
* Shared live list stream for the current browser session. Native EventSource
* cannot send Authorization headers, so this uses fetch and parses SSE frames.
*/
events(): Observable<ListRealtimeEvent> {
this.events$ ??= new Observable<ListRealtimeEvent>((observer) => {
let stopped = false;
let reconnectDelay = INITIAL_RECONNECT_DELAY_MS;
let reconnectTimeout: ReturnType<typeof setTimeout> | undefined;
let abortController: AbortController | undefined;
const scheduleReconnect = () => {
if (stopped) {
return;
}
reconnectTimeout = setTimeout(() => {
reconnectDelay = Math.min(
reconnectDelay * 2,
MAX_RECONNECT_DELAY_MS,
);
void connect();
}, reconnectDelay);
};
const connect = async () => {
abortController = new AbortController();
try {
const response = await this.openStream(abortController.signal);
reconnectDelay = INITIAL_RECONNECT_DELAY_MS;
await this.readEvents(response, observer, abortController.signal);
if (!stopped && !abortController.signal.aborted) {
scheduleReconnect();
}
} catch (error: unknown) {
if (stopped || abortController.signal.aborted) {
return;
}
if (this.isUnauthorizedResponse(error)) {
const refreshed = await this.tryRefreshSession();
if (!refreshed) {
observer.error(error);
return;
}
}
scheduleReconnect();
}
};
void connect();
return () => {
stopped = true;
abortController?.abort();
if (reconnectTimeout) {
clearTimeout(reconnectTimeout);
}
};
}).pipe(
share({
connector: () => new Subject<ListRealtimeEvent>(),
resetOnComplete: true,
resetOnError: true,
resetOnRefCountZero: true,
}),
);
return this.events$;
}
private async openStream(signal: AbortSignal): Promise<Response> {
const token = this.auth.accessToken();
if (!token) {
throw new Response(null, { status: 401 });
}
const response = await fetch(EVENTS_URL, {
headers: {
Accept: 'text/event-stream',
Authorization: `Bearer ${token}`,
},
signal,
});
if (response.status === 401) {
throw response;
}
if (!response.ok || !response.body) {
throw response;
}
return response;
}
private async readEvents(
response: Response,
observer: { next: (event: ListRealtimeEvent) => void },
signal: AbortSignal,
): Promise<void> {
const reader = response.body?.getReader();
if (!reader) {
throw new Error('Realtime response body is missing.');
}
const decoder = new TextDecoder();
let buffer = '';
while (!signal.aborted) {
const { done, value } = await reader.read();
if (done) {
break;
}
buffer += decoder.decode(value, { stream: true });
buffer = this.flushEventBuffer(buffer, observer);
}
}
private flushEventBuffer(
buffer: string,
observer: { next: (event: ListRealtimeEvent) => void },
): string {
const normalizedBuffer = buffer.replace(/\r\n/g, '\n');
const blocks = normalizedBuffer.split('\n\n');
const remainingBuffer = blocks.pop() ?? '';
blocks
.map((block) => this.parseEventBlock(block))
.filter((event): event is ListRealtimeEvent => event !== null)
.forEach((event) => observer.next(event));
return remainingBuffer;
}
private parseEventBlock(block: string): ListRealtimeEvent | null {
let eventType = 'message';
const dataLines: string[] = [];
block.split('\n').forEach((line) => {
if (line.startsWith(':')) {
return;
}
if (line.startsWith('event:')) {
eventType = line.slice('event:'.length).trim();
return;
}
if (line.startsWith('data:')) {
dataLines.push(line.slice('data:'.length).trimStart());
}
});
if (dataLines.length === 0) {
return null;
}
const data = JSON.parse(dataLines.join('\n')) as unknown;
if (
eventType === 'list.snapshot' ||
eventType === 'list.deleted' ||
eventType === 'heartbeat'
) {
return {
type: eventType,
data,
} as ListRealtimeEvent;
}
return null;
}
// Treat one 401 as an expired access token. If refresh fails, the normal auth
// logout flow takes over and the stream stops reconnecting with stale tokens.
private isUnauthorizedResponse(error: unknown): boolean {
return error instanceof Response && error.status === 401;
}
private async tryRefreshSession(): Promise<boolean> {
try {
await firstValueFrom(this.auth.refreshSession());
return true;
} catch {
this.auth.logout();
await this.router.navigateByUrl('/login');
return false;
}
}
}

View File

@@ -1,5 +1,6 @@
import { DatePipe } from '@angular/common';
import { Component, OnInit, computed, inject, signal } from '@angular/core';
import { Component, DestroyRef, OnInit, computed, inject, signal } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { RouterLink } from '@angular/router';
import { MatButtonModule } from '@angular/material/button';
import { MatButtonToggleModule } from '@angular/material/button-toggle';
@@ -12,7 +13,8 @@ import { MatSelectModule } from '@angular/material/select';
import { getAuthErrorMessage } from '../auth/error-message';
import { OnboardingService } from '../onboarding/onboarding.service';
import { ListTemplateKind } from '../templates/templates.models';
import { UserList } from './lists.models';
import { ListRealtimeEvent, UserList } from './lists.models';
import { ListsRealtimeService } from './lists-realtime.service';
import { ListsService } from './lists.service';
type ListStatusFilter = 'all' | 'open' | 'completed';
@@ -42,7 +44,9 @@ type ListKindFilter = ListTemplateKind | 'all';
styleUrls: ['../workspace-page.scss', './lists.component.scss'],
})
export class ListsComponent implements OnInit {
private readonly destroyRef = inject(DestroyRef);
private readonly listsService = inject(ListsService);
private readonly listsRealtimeService = inject(ListsRealtimeService);
protected readonly onboarding = inject(OnboardingService);
protected readonly lists = signal<UserList[]>([]);
@@ -114,6 +118,7 @@ export class ListsComponent implements OnInit {
ngOnInit(): void {
this.loadLists();
this.subscribeToRealtime();
}
protected loadLists(): void {
@@ -162,6 +167,40 @@ export class ListsComponent implements OnInit {
this.sortOption.set('updated-desc');
}
private subscribeToRealtime(): void {
this.listsRealtimeService
.events()
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe({
next: (event) => this.applyRealtimeEvent(event),
});
}
private applyRealtimeEvent(event: ListRealtimeEvent): void {
if (event.type === 'list.snapshot') {
// The overview keeps its local collection hot so progress and timestamps
// update when another tab changes a list.
this.lists.update((lists) => {
const existingIndex = lists.findIndex((list) => list.id === event.data.id);
if (existingIndex === -1) {
return [...lists, event.data];
}
return lists.map((list, index) =>
index === existingIndex ? event.data : list,
);
});
return;
}
if (event.type === 'list.deleted') {
this.lists.update((lists) =>
lists.filter((list) => list.id !== event.data.listId),
);
}
}
private compareLists(a: UserList, b: UserList): number {
switch (this.sortOption()) {
case 'created-desc':

View File

@@ -54,3 +54,19 @@ export interface UpdateListItemRequest {
required?: boolean;
checked?: boolean;
}
// Wire contract for /api/lists/events. Keep this in sync with the API
// ListRealtimeEvent type; consumers should ignore unknown event types.
export type ListRealtimeEvent =
| {
type: 'list.snapshot';
data: UserList;
}
| {
type: 'list.deleted';
data: { listId: string };
}
| {
type: 'heartbeat';
data: { at: string };
};