-
-
Notifications
You must be signed in to change notification settings - Fork 7.7k
/
Copy pathsse-stream.ts
125 lines (112 loc) · 3.49 KB
/
sse-stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import { MessageEvent } from '@nestjs/common/interfaces';
import { isObject } from '@nestjs/common/utils/shared.utils';
import { IncomingMessage, OutgoingHttpHeaders } from 'http';
import { Transform } from 'stream';
function toDataString(data: string | object): string {
if (isObject(data)) {
return toDataString(JSON.stringify(data));
}
return data
.split(/\r\n|\r|\n/)
.map(line => `data: ${line}\n`)
.join('');
}
export type AdditionalHeaders = Record<
string,
string[] | string | number | undefined
>;
interface ReadHeaders {
getHeaders?(): AdditionalHeaders;
}
interface WriteHeaders {
writableEnded?: boolean;
writeHead?(
statusCode: number,
reasonPhrase?: string,
headers?: OutgoingHttpHeaders,
): void;
writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): void;
flushHeaders?(): void;
}
export type WritableHeaderStream = NodeJS.WritableStream & WriteHeaders;
export type HeaderStream = WritableHeaderStream & ReadHeaders;
/**
* Adapted from https://raw.githubusercontent.com/EventSource/node-ssestream
* Transforms "messages" to W3C event stream content.
* See https://html.spec.whatwg.org/multipage/server-sent-events.html
* A message is an object with one or more of the following properties:
* - data (String or object, which gets turned into JSON)
* - type
* - id
* - retry
*
* If constructed with a HTTP Request, it will optimise the socket for streaming.
* If this stream is piped to an HTTP Response, it will set appropriate headers.
*/
export class SseStream extends Transform {
private lastEventId: number = null;
constructor(req?: IncomingMessage) {
super({ objectMode: true });
if (req && req.socket) {
req.socket.setKeepAlive(true);
req.socket.setNoDelay(true);
req.socket.setTimeout(0);
}
}
pipe<T extends WritableHeaderStream>(
destination: T,
options?: {
additionalHeaders?: AdditionalHeaders;
end?: boolean;
},
): T {
if (destination.writeHead) {
destination.writeHead(200, {
...options?.additionalHeaders,
// See /~https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
// Disable cache, even for old browsers and proxies
'Cache-Control':
'private, no-cache, no-store, must-revalidate, max-age=0, no-transform',
Pragma: 'no-cache',
Expire: '0',
// NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
'X-Accel-Buffering': 'no',
});
destination.flushHeaders();
}
destination.write('\n');
return super.pipe(destination, options);
}
_transform(
message: MessageEvent,
encoding: string,
callback: (error?: Error | null, data?: any) => void,
) {
let data = message.type ? `event: ${message.type}\n` : '';
data += message.id ? `id: ${message.id}\n` : '';
data += message.retry ? `retry: ${message.retry}\n` : '';
data += message.data ? toDataString(message.data) : '';
data += '\n';
this.push(data);
callback();
}
/**
* Calls `.write` but handles the drain if needed
*/
writeMessage(
message: MessageEvent,
cb: (error: Error | null | undefined) => void,
) {
if (!message.id) {
this.lastEventId++;
message.id = this.lastEventId.toString();
}
if (!this.write(message, 'utf-8', cb)) {
this.once('drain', cb);
} else {
process.nextTick(cb);
}
}
}