src/session/session-events.service.ts
Service for managing session events and SSE streams. Provides real-time session status updates via Server-Sent Events.
Properties |
|
Methods |
| getSessionEvents | |||||||||||||||
getSessionEvents(sessionId: string, _includeFullSession: unknown)
|
|||||||||||||||
|
Defined in src/session/session-events.service.ts:43
|
|||||||||||||||
|
Get an observable stream of events for a specific session. Used by the SSE endpoint to stream updates to clients.
Parameters :
Returns :
Observable<MessageEvent>
|
| handleSessionStatusChanged | ||||||
handleSessionStatusChanged(event: SessionStatusChangedEvent)
|
||||||
Decorators :
@OnEvent(SESSION_STATUS_CHANGED)
|
||||||
|
Defined in src/session/session-events.service.ts:68
|
||||||
|
Internal event handler for session status changes. Forwards events to the Subject for SSE subscribers.
Parameters :
Returns :
void
|
| Private Readonly eventSubject |
Type : unknown
|
Default value : new Subject<SessionStatusChangedEvent>()
|
|
Defined in src/session/session-events.service.ts:34
|
| Private Readonly logger |
Type : unknown
|
Default value : new Logger(SessionEventsService.name)
|
|
Defined in src/session/session-events.service.ts:33
|
import { Injectable, Logger } from "@nestjs/common";
import { OnEvent } from "@nestjs/event-emitter";
import { filter, map, Observable, Subject } from "rxjs";
import { Session, SessionStatus } from "./entities/session.entity";
/**
* Event payload emitted when a session status changes.
*/
export interface SessionStatusChangedEvent {
sessionId: string;
status: SessionStatus;
updatedAt: Date;
session?: Session; // Full session data (only included for authenticated requests)
}
/**
* SSE message format sent to clients.
*/
export interface SessionEventMessage {
id: string;
status: SessionStatus;
updatedAt: string;
}
export const SESSION_STATUS_CHANGED = "session.status.changed";
/**
* Service for managing session events and SSE streams.
* Provides real-time session status updates via Server-Sent Events.
*/
@Injectable()
export class SessionEventsService {
private readonly logger = new Logger(SessionEventsService.name);
private readonly eventSubject = new Subject<SessionStatusChangedEvent>();
/**
* Get an observable stream of events for a specific session.
* Used by the SSE endpoint to stream updates to clients.
*
* @param sessionId - The session ID to subscribe to
* @param includeFullSession - Whether to include full session data (requires auth)
*/
getSessionEvents(
sessionId: string,
_includeFullSession = false,
): Observable<MessageEvent> {
return this.eventSubject.pipe(
filter((event) => event.sessionId === sessionId),
map((event) => {
const data: SessionEventMessage = {
id: event.sessionId,
status: event.status,
updatedAt: event.updatedAt.toISOString(),
};
return new MessageEvent("message", {
data: JSON.stringify(data),
});
}),
);
}
/**
* Internal event handler for session status changes.
* Forwards events to the Subject for SSE subscribers.
*/
@OnEvent(SESSION_STATUS_CHANGED)
handleSessionStatusChanged(event: SessionStatusChangedEvent): void {
this.logger.debug(
`Session ${event.sessionId} status changed to ${event.status}`,
);
// Forward to Subject for SSE subscribers
this.eventSubject.next(event);
}
}