Street includes a bounded WebSocket server built on the ws library. It enforces a maximum connection count, sends periodic heartbeats to detect dead connections, and exposes a typed event emitter API.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import 'reflect-metadata';
import {
streetApp, StreetWebSocketServer, StreetSocket,
container, TelemetryTracker,
} from '@streetjs/core';
const app = streetApp({ port: 3000 });
const wss = new StreetWebSocketServer({
heartbeatIntervalMs: 30_000, // ping every 30s
maxConnections: 10_000, // reject with 1013 when exceeded
});
container.register(StreetWebSocketServer, wss);
// Handle new connections
wss.on('connection', (socket: StreetSocket) => {
console.log('Client connected');
socket.on('message', (data: unknown) => {
console.log('Received:', data);
socket.emit('echo', data);
});
socket.on('close', () => {
console.log('Client disconnected');
});
});
await app.listen();
Each connected client is represented by a StreetSocket instance:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Send a typed event to this client
socket.emit('eventName', payload);
// Listen for events from this client
socket.on('eventName', (data: unknown) => { /* ... */ });
// Remove a listener
socket.off('eventName', handler);
// Check if socket is still open
if (!socket.closed) {
socket.emit('update', { ts: Date.now() });
}
// Close the connection
socket.close(1000, 'Normal closure');
Send a message to all connected clients:
1
2
3
4
5
6
7
8
9
// Broadcast to all clients
wss.broadcast('announcement', { text: 'Server restarting in 60s' });
// Broadcast to a subset (manual filter)
for (const socket of wss.clients) {
if (socket.userId === targetUserId) {
socket.emit('notification', { message: 'You have a new message' });
}
}
A complete real-time chat implementation:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// src/gateways/chat.gateway.ts
import { StreetSocket } from '@streetjs/core';
import type { IncomingMessage } from 'node:http';
interface ChatMessage {
type: 'join' | 'message' | 'leave';
user: string;
text: string;
timestamp: number;
}
const connections = new Map<string, { socket: StreetSocket; user: string }>();
export function chatConnectionHandler(
socket: StreetSocket,
_req: IncomingMessage
): void {
const clientId = crypto.randomUUID();
let userName = `Anonymous-${clientId.slice(0, 6)}`;
socket.on('message', (data: unknown) => {
const msg = data as ChatMessage;
switch (msg.type) {
case 'join':
userName = msg.user || userName;
connections.set(clientId, { socket, user: userName });
broadcast({ type: 'join', user: userName, text: `${userName} joined`, timestamp: Date.now() });
break;
case 'message':
broadcast({ type: 'message', user: userName, text: msg.text, timestamp: Date.now() });
break;
}
});
socket.on('close', () => {
connections.delete(clientId);
broadcast({ type: 'leave', user: userName, text: `${userName} left`, timestamp: Date.now() });
});
}
function broadcast(msg: ChatMessage): void {
const data = JSON.stringify(msg);
for (const [id, conn] of connections) {
try {
conn.socket.emit('chat', data);
} catch {
connections.delete(id);
}
}
}
Register in main.ts:
1
2
3
import { chatConnectionHandler } from './gateways/chat.gateway.js';
wss.on('connection', chatConnectionHandler);
Validate a JWT token from the query string on connection:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import { JwtService, container } from '@streetjs/core';
import type { IncomingMessage } from 'node:http';
wss.on('connection', (socket: StreetSocket, req: IncomingMessage) => {
const url = new URL(req.url ?? '/', `http://localhost`);
const token = url.searchParams.get('token');
if (!token) {
socket.close(4001, 'Unauthorized');
return;
}
const jwt = container.resolve(JwtService);
try {
const payload = jwt.verify(token) as { userId: string };
(socket as StreetSocket & { userId: string }).userId = payload.userId;
} catch {
socket.close(4001, 'Invalid token');
return;
}
// Connection is authenticated — proceed
socket.emit('connected', { userId: (socket as any).userId });
});
1
2
3
4
5
6
7
// Current connection count
console.log(wss.connectionCount);
// All active sockets
for (const socket of wss.clients) {
console.log(socket.closed ? 'dead' : 'alive');
}
1
2
3
4
5
process.once('SIGTERM', async () => {
await wss.close(); // closes all connections, stops accepting new ones
await pool.close();
process.exit(0);
});
For one-way server-to-client streaming, use SSE instead of WebSockets:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import { createSse } from '@streetjs/core';
import type { StreetContext } from '@streetjs/core';
@Controller('/api/events')
class EventController {
@Get('/stream')
async stream(ctx: StreetContext): Promise<void> {
const sse = createSse(ctx.res, 15_000); // 15s heartbeat
const interval = setInterval(() => {
if (sse.closed) { clearInterval(interval); return; }
sse.send({ time: new Date().toISOString() }, 'tick');
}, 1000);
ctx.req.once('close', () => {
clearInterval(interval);
sse.close();
});
}
}
Client-side:
1
2
3
4
const es = new EventSource('/api/events/stream');
es.addEventListener('tick', (e) => {
console.log(JSON.parse(e.data));
});