Example: Streaming PostgreSQL Query
This example streams a large dataset from PostgreSQL directly to an HTTP response as NDJSON (newline-delimited JSON), keeping heap usage constant regardless of result set size.
The endpoint
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// src/controllers/export.controller.ts
import { Injectable } from '../core/container.js';
import { Controller, Get } from '../core/decorators.js';
import type { StreetContext } from '../core/context.js';
import { PgPool } from '../database/pool.js';
import { NotFoundException } from '../http/exceptions.js';
@Injectable()
@Controller('/api/export')
export class ExportController {
constructor(private readonly pool: PgPool) {}
/**
* Stream all users as NDJSON (one JSON object per line).
* Memory usage: ~2-5 MB regardless of how many users exist.
*/
@Get('/users')
async streamUsers(ctx: StreetContext): Promise<void> {
const conn = await this.pool.acquire();
ctx.setHeader('Content-Type', 'application/x-ndjson');
ctx.setHeader('Transfer-Encoding', 'chunked');
ctx.setHeader('X-Accel-Buffering', 'no'); // Disable nginx buffering
ctx.res.writeHead(200);
const stream = conn.queryStream(
`SELECT id, email, name, roles, created_at
FROM users
ORDER BY created_at ASC`
);
let rowCount = 0;
stream.on('data', (row: Record<string, string | null>) => {
rowCount++;
const line = JSON.stringify({
id: row['id'],
email: row['email'],
name: row['name'],
roles: JSON.parse(row['roles'] ?? '["user"]') as string[],
createdAt: row['created_at'],
}) + '\n';
const canContinue = ctx.res.write(line);
// Apply backpressure: HTTP buffer is full → pause DB stream
if (!canContinue) {
stream.pause();
}
});
// Resume DB stream when HTTP buffer drains
ctx.res.on('drain', () => stream.resume());
stream.on('end', () => {
ctx.res.end();
this.pool.release(conn);
console.log(`[export] Streamed ${rowCount} users`);
});
stream.on('error', (err) => {
console.error('[export] Stream error:', err);
ctx.res.destroy();
this.pool.release(conn);
});
}
/**
* Stream as CSV with proper escaping.
*/
@Get('/users.csv')
async streamUsersCsv(ctx: StreetContext): Promise<void> {
const conn = await this.pool.acquire();
ctx.setHeader('Content-Type', 'text/csv; charset=utf-8');
ctx.setHeader('Content-Disposition', 'attachment; filename="users.csv"');
ctx.setHeader('X-Accel-Buffering', 'no');
ctx.res.writeHead(200);
// Write CSV header
ctx.res.write('id,email,name,roles,created_at\n');
const stream = conn.queryStream(
'SELECT id, email, name, roles, created_at FROM users ORDER BY created_at ASC'
);
stream.on('data', (row: Record<string, string | null>) => {
const line = [
row['id'],
row['email'],
row['name'],
row['roles'],
row['created_at'],
]
.map((v) => `"${(v ?? '').replace(/"/g, '""')}"`)
.join(',') + '\n';
const canContinue = ctx.res.write(line);
if (!canContinue) stream.pause();
});
ctx.res.on('drain', () => stream.resume());
stream.on('end', () => { ctx.res.end(); this.pool.release(conn); });
stream.on('error', () => { ctx.res.destroy(); this.pool.release(conn); });
}
}
Consuming the stream
1
2
3
4
5
6
7
8
9
10
11
# Fetch NDJSON and process each line
curl -N http://localhost:3000/api/export/users | \
while IFS= read -r line; do
echo "$line" | jq '.email'
done
# Save to file
curl http://localhost:3000/api/export/users.csv -o users.csv
# Count rows streamed
curl -s http://localhost:3000/api/export/users | wc -l
Memory profile comparison
| Approach | 100K rows | 1M rows |
|---|---|---|
Buffered conn.query() |
~45 MB spike | OOM or 450 MB |
Streamed conn.queryStream() |
~3 MB steady | ~3 MB steady |
Example: WebSocket Chat Server
A minimal chat server with room support, NDJSON message logging, and bounded connection management.
Server setup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/main.ts (additions)
import { StreetWebSocketServer } from './websocket/server.js';
import { setupChatHandlers } from './websocket/chat.js';
import { createServer } from 'node:http';
const httpServer = createServer();
const wsServer = new StreetWebSocketServer({
heartbeatIntervalMs: 30_000,
maxConnections: 5_000,
});
container.register(StreetWebSocketServer, wsServer);
// After app.listen():
wsServer.attach(httpServer, setupChatHandlers(wsServer));
httpServer.listen(3001); // WS on separate port, or same port with path routing
Chat handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// src/websocket/chat.ts
import { StreetWebSocketServer, type StreetSocket } from './server.js';
import type { IncomingMessage } from 'node:http';
interface ChatUser {
id: string;
name: string;
room: string;
}
// In-memory room state (per-worker — not shared across cluster)
const MAX_ROOMS = 500;
const MAX_MEMBERS_PER_ROOM = 200;
const userMap = new Map<StreetSocket, ChatUser>();
export function setupChatHandlers(
wsServer: StreetWebSocketServer
): (socket: StreetSocket, req: IncomingMessage) => void {
return (socket: StreetSocket, req: IncomingMessage) => {
const remoteIp = req.socket.remoteAddress ?? 'unknown';
console.log(`[ws] New connection from ${remoteIp}`);
// ── Join a room ──────────────────────────────────────────────
socket.on('chat:join', (rawData) => {
const data = rawData as { name?: string; room?: string };
const name = String(data.name ?? 'Anonymous').slice(0, 50);
const room = String(data.room ?? 'general').slice(0, 50);
// Check room capacity
const members = [...userMap.values()].filter((u) => u.room === room);
if (members.length >= MAX_MEMBERS_PER_ROOM) {
socket.emit('chat:error', { message: 'Room is full' });
return;
}
// Check total rooms
const rooms = new Set([...userMap.values()].map((u) => u.room));
if (rooms.size >= MAX_ROOMS && !rooms.has(room)) {
socket.emit('chat:error', { message: 'Too many rooms' });
return;
}
const user: ChatUser = { id: generateId(), name, room };
userMap.set(socket, user);
socket.emit('chat:joined', { userId: user.id, room, name });
// Notify room members
broadcastToRoom(wsServer, room, 'chat:user_joined', {
userId: user.id,
name,
memberCount: members.length + 1,
}, socket);
console.log(`[ws] ${name} joined room "${room}" (${members.length + 1} members)`);
});
// ── Send a message ───────────────────────────────────────────
socket.on('chat:message', (rawData) => {
const user = userMap.get(socket);
if (!user) {
socket.emit('chat:error', { message: 'Join a room first' });
return;
}
const data = rawData as { text?: string };
const text = String(data.text ?? '').slice(0, 2000); // Bounded message
if (!text.trim()) return;
broadcastToRoom(wsServer, user.room, 'chat:message', {
from: user.name,
userId: user.id,
text,
ts: Date.now(),
});
});
// ── Leave ────────────────────────────────────────────────────
socket.on('chat:leave', () => {
handleLeave(wsServer, socket);
});
// ── Disconnect cleanup ───────────────────────────────────────
// The 'close' event on the underlying WS fires when the connection drops
// StreetSocket clears its listeners automatically — but we still need
// to remove the user from our application-level map:
const ws = (socket as unknown as { ws: { on: Function } }).ws;
ws.on('close', () => {
handleLeave(wsServer, socket);
});
};
}
function handleLeave(wsServer: StreetWebSocketServer, socket: StreetSocket): void {
const user = userMap.get(socket);
if (!user) return;
userMap.delete(socket);
broadcastToRoom(wsServer, user.room, 'chat:user_left', {
userId: user.id,
name: user.name,
});
console.log(`[ws] ${user.name} left room "${user.room}"`);
}
function broadcastToRoom(
wsServer: StreetWebSocketServer,
room: string,
event: string,
payload: unknown,
exclude?: StreetSocket
): void {
for (const [sock, user] of userMap.entries()) {
if (user.room === room && sock !== exclude) {
sock.emit(event, payload);
}
}
}
function generateId(): string {
return Math.random().toString(36).slice(2, 10);
}
Client-side JavaScript
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<script>
const ws = new WebSocket('ws://localhost:3001');
ws.onopen = () => {
ws.send(JSON.stringify({ type: 'chat:join', payload: { name: 'Alice', room: 'lobby' }, ts: Date.now() }));
};
ws.onmessage = (e) => {
const { type, payload } = JSON.parse(e.data);
if (type === 'chat:message') {
console.log(`${payload.from}: ${payload.text}`);
} else if (type === 'chat:joined') {
console.log('Joined!', payload);
}
};
function sendMessage(text) {
ws.send(JSON.stringify({ type: 'chat:message', payload: { text }, ts: Date.now() }));
}
</script>
Example: File Upload Service
A complete file upload API with type validation, size limits, metadata storage, and secure serving.
Database
1
2
3
4
5
6
7
8
9
10
11
12
13
-- migrations/004_create_file_uploads.sql
CREATE TABLE IF NOT EXISTS file_uploads (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
original_name VARCHAR(255) NOT NULL,
stored_name VARCHAR(255) NOT NULL UNIQUE,
mime_type VARCHAR(100) NOT NULL,
size_bytes BIGINT NOT NULL,
path TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS file_uploads_user_id_idx ON file_uploads (user_id);
Upload controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// src/controllers/upload.controller.ts
import { createReadStream, stat } from 'node:fs';
import { unlink } from 'node:fs/promises';
import { promisify } from 'node:util';
import { extname, join } from 'node:path';
import { Injectable } from '../core/container.js';
import { Controller, Post, Get, Delete } from '../core/decorators.js';
import type { StreetContext } from '../core/context.js';
import { PgPool } from '../database/pool.js';
import { BadRequestException, NotFoundException, ForbiddenException } from '../http/exceptions.js';
import { authMiddleware } from '../http/auth.middleware.js';
import { JwtService } from '../security/jwt.js';
import { AppConfig } from '../config/index.js';
const statAsync = promisify(stat);
const ALLOWED_MIME = new Set([
'image/jpeg', 'image/png', 'image/webp', 'image/gif',
'application/pdf', 'text/plain', 'text/csv',
]);
const MAX_SIZE = 10 * 1024 * 1024; // 10 MB per file
@Injectable()
@Controller('/api/files')
export class UploadController {
private readonly auth: ReturnType<typeof authMiddleware>;
constructor(
private readonly pool: PgPool,
private readonly config: AppConfig,
) {
this.auth = authMiddleware(new JwtService(this.config.jwtSecret));
}
@Post('/upload', /* this.auth — injected in constructor, used below */)
async upload(ctx: StreetContext): Promise<void> {
if (!ctx.user) throw new ForbiddenException();
if (ctx.files.length === 0) throw new BadRequestException('No file provided');
const results = [];
for (const file of ctx.files) {
// Validate MIME type
if (!ALLOWED_MIME.has(file.mimeType)) {
await unlink(file.path).catch(() => undefined);
throw new BadRequestException(`MIME type not allowed: ${file.mimeType}`);
}
// Validate size
if (file.size > MAX_SIZE) {
await unlink(file.path).catch(() => undefined);
throw new BadRequestException(`File too large: max ${MAX_SIZE / 1024 / 1024} MB`);
}
// Store metadata in database
const safeOriginal = file.originalName.replace(/'/g, "''");
const safeStored = file.path.split('/').pop()!.replace(/'/g, "''");
const safeMime = file.mimeType.replace(/'/g, "''");
const safePath = file.path.replace(/'/g, "''");
const result = await this.pool.query(
`INSERT INTO file_uploads (user_id, original_name, stored_name, mime_type, size_bytes, path)
VALUES ('${ctx.user.id}', '${safeOriginal}', '${safeStored}', '${safeMime}', ${file.size}, '${safePath}')
RETURNING id, original_name, mime_type, size_bytes, created_at`
);
const row = result.rows[0]!;
results.push({
id: row['id'],
originalName: row['original_name'],
mimeType: row['mime_type'],
sizeBytes: parseInt(row['size_bytes'] ?? '0', 10),
url: `/api/files/${row['id']}`,
createdAt: row['created_at'],
});
}
ctx.json({ uploaded: results.length, files: results }, 201);
}
@Get('/:id')
async serve(ctx: StreetContext): Promise<void> {
const fileId = ctx.params['id']!.replace(/'/g, "''");
const result = await this.pool.query(
`SELECT * FROM file_uploads WHERE id = '${fileId}' LIMIT 1`
);
if (result.rows.length === 0) throw new NotFoundException('File not found');
const row = result.rows[0] as Record<string, string | null>;
const filePath = row['path']!;
// Verify file exists on disk
try { await statAsync(filePath); } catch {
throw new NotFoundException('File not found on disk');
}
ctx.setHeader('Content-Type', row['mime_type'] ?? 'application/octet-stream');
ctx.setHeader('Content-Length', row['size_bytes'] ?? '0');
ctx.setHeader('Content-Disposition', `inline; filename="${row['original_name']}"`);
ctx.setHeader('Cache-Control', 'private, max-age=86400');
ctx.res.writeHead(200);
const readStream = createReadStream(filePath);
readStream.pipe(ctx.res);
readStream.on('error', () => ctx.res.destroy());
}
@Get('/my/files')
async listMyFiles(ctx: StreetContext): Promise<void> {
if (!ctx.user) throw new ForbiddenException();
const result = await this.pool.query(
`SELECT id, original_name, mime_type, size_bytes, created_at
FROM file_uploads
WHERE user_id = '${ctx.user.id}'
ORDER BY created_at DESC
LIMIT 100`
);
ctx.json({
files: result.rows.map((r) => ({
id: r['id'],
originalName: r['original_name'],
mimeType: r['mime_type'],
sizeBytes: parseInt(r['size_bytes'] ?? '0', 10),
url: `/api/files/${r['id']}`,
createdAt: r['created_at'],
})),
});
}
@Delete('/:id')
async remove(ctx: StreetContext): Promise<void> {
if (!ctx.user) throw new ForbiddenException();
const fileId = ctx.params['id']!.replace(/'/g, "''");
const result = await this.pool.query(
`DELETE FROM file_uploads WHERE id = '${fileId}' AND user_id = '${ctx.user.id}'
RETURNING path`
);
if (result.rows.length === 0) throw new NotFoundException('File not found or not yours');
const filePath = result.rows[0]?.['path'];
if (filePath) await unlink(filePath).catch(() => undefined);
ctx.send(204);
}
}
Example requests
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Upload a single file
curl -X POST http://localhost:3000/api/files/upload \
-H 'Authorization: Bearer ...' \
-F 'file=@/path/to/photo.jpg'
# Response:
# {
# "uploaded": 1,
# "files": [{
# "id": "uuid-...",
# "originalName": "photo.jpg",
# "mimeType": "image/jpeg",
# "sizeBytes": 245678,
# "url": "/api/files/uuid-...",
# "createdAt": "2024-01-15T10:23:45.123Z"
# }]
# }
# Upload multiple files
curl -X POST http://localhost:3000/api/files/upload \
-H 'Authorization: Bearer ...' \
-F 'files=@/path/to/doc1.pdf' \
-F 'files=@/path/to/doc2.pdf'
# Download a file
curl http://localhost:3000/api/files/uuid-... \
-H 'Authorization: Bearer ...' \
-o downloaded.jpg
# List my files
curl http://localhost:3000/api/files/my/files \
-H 'Authorization: Bearer ...'
# Delete a file
curl -X DELETE http://localhost:3000/api/files/uuid-... \
-H 'Authorization: Bearer ...'
# HTTP 204
Security notes
- Path traversal prevention — stored filenames are random hex strings (
randomBytes(16).toString('hex')) with the sanitized original name appended. They can never contain../. - MIME validation — allowlist-based. Only explicitly permitted types are accepted.
- Size limits — enforced at both the body parser level (
maxBodyBytes) and per-file in the handler. - Ownership check —
DELETEverifiesuser_idmatches before removing. Users cannot delete others’ files. - No execution — uploaded files are never executed. Never serve them from the same domain where
<script>execution would be trusted.