Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement websockets #1795

Merged
merged 82 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
bb688ac
initial handshake
KhafraDev Nov 28, 2022
e6e2686
minor fixes
KhafraDev Nov 29, 2022
9aea277
feat: working initial handshake!
KhafraDev Dec 1, 2022
f1ae69a
feat(ws): initial WebSocket class implementation
KhafraDev Dec 1, 2022
bec82fd
fix: allow http: and ws: urls
KhafraDev Dec 1, 2022
7e6725e
fix(ws): use websocket spec
KhafraDev Dec 1, 2022
5030683
fix(ws): use websocket spec
KhafraDev Dec 1, 2022
a928c58
feat: implement url getter
KhafraDev Dec 1, 2022
ecff3d7
feat: implement some of `WebSocket.close` and ready state
KhafraDev Dec 1, 2022
6ec2701
fix: body is null for websockets & pass socket to response
KhafraDev Dec 1, 2022
39c7b43
fix: store the fetch controller & response on ws
KhafraDev Dec 1, 2022
fb84cb8
fix: remove invalid tests
KhafraDev Dec 1, 2022
85a4046
feat: implement readyState getter
KhafraDev Dec 2, 2022
190cabe
feat: implement `protocol` and `extensions` getters
KhafraDev Dec 2, 2022
454a5c3
feat: implement event listeners
KhafraDev Dec 2, 2022
5ae04ee
feat: implement binaryType attribute
KhafraDev Dec 2, 2022
c9ff83a
fix: add argument length checks
KhafraDev Dec 2, 2022
484e732
feat: basic unfragmented message parsing
KhafraDev Dec 2, 2022
8619653
fix: always remove previous listener
KhafraDev Dec 3, 2022
c70bd8b
feat: add in idlharness WPT
KhafraDev Dec 3, 2022
34758e6
implement sending a message for WS and add a websocketFrame class
jodevsa Dec 4, 2022
6a6b1b3
Merge pull request #30 from jodevsa/ws
KhafraDev Dec 4, 2022
a87e53b
feat: allow sending ArrayBuffer/Views & Blob
KhafraDev Dec 4, 2022
d983115
fix: remove duplicate `upgrade` and `connection` headers
KhafraDev Dec 4, 2022
feee358
feat: add in WebSocket.close() and handle closing frames
KhafraDev Dec 4, 2022
5e61a84
refactor WebsocketFrame and support receiving frames in multiple chunks
jodevsa Dec 4, 2022
0e07468
fixes
jodevsa Dec 4, 2022
e288ea5
Merge pull request #31 from jodevsa/ws
KhafraDev Dec 5, 2022
cbad426
move WebsocketFrame to its own file
KhafraDev Dec 5, 2022
d2462a1
feat: export WebSocket & add types
KhafraDev Dec 5, 2022
1340dd4
fix: tsd
KhafraDev Dec 5, 2022
8d98382
feat(wpt): use WebSocketServer & run test
KhafraDev Dec 5, 2022
3cae2ec
fix: properly set/read close code & close reason
KhafraDev Dec 5, 2022
73c8389
fix: flakiness in websocket test runner
KhafraDev Dec 6, 2022
8b7bc44
fix: receive message with arraybuffer binary type
KhafraDev Dec 6, 2022
eb09dc6
feat: split WebsocketFrame into 2 classes (sent & received)
KhafraDev Dec 6, 2022
174c030
fix: parse fragmented frames more efficiently & close frame
KhafraDev Dec 6, 2022
7ee907b
fix: add types for MessageEvent and CloseEvent
KhafraDev Dec 6, 2022
6985bc3
fix: subprotocol validation & add wpts
KhafraDev Dec 6, 2022
eae7f76
fix: protocol validation & protocol webidl & add wpts
KhafraDev Dec 7, 2022
682d880
fix: correct bufferedAmount calc. & message event w/ blob
KhafraDev Dec 7, 2022
052a804
fix: don't truncate typedarrays
KhafraDev Dec 7, 2022
5f2fdab
feat: add remaining wpts
KhafraDev Dec 7, 2022
6caaa47
fix: allow sending payloads > 65k bytes
KhafraDev Dec 7, 2022
2d5b9ae
fix: mask data > 125 bytes properly
KhafraDev Dec 7, 2022
0880cef
revert changes to core
KhafraDev Dec 7, 2022
02805e8
fix: decrement bufferedAmount after write
KhafraDev Dec 8, 2022
58114ab
fix: handle ping and pong frames
KhafraDev Dec 8, 2022
a099471
fix: simplify receiving frame logic
KhafraDev Dec 8, 2022
346bdb8
fix: disable extensions & validate frames
KhafraDev Dec 9, 2022
5113fb2
fix: send close frame upon receiving
KhafraDev Dec 9, 2022
dee913c
lint
KhafraDev Dec 9, 2022
b3c4314
fix: validate status code & utf-8
KhafraDev Dec 9, 2022
7741277
fix: add hooks
KhafraDev Dec 9, 2022
b1339fb
fix: check if frame is unfragmented correctly
KhafraDev Dec 9, 2022
61c921c
fix: send ping app data in pong frames
KhafraDev Dec 10, 2022
c7249d3
export websocket on node >= 18 & add diagnostic_channels
KhafraDev Dec 10, 2022
aadf25a
mark test as flaky
KhafraDev Dec 10, 2022
30b1e23
fix: couple bug fixes
KhafraDev Dec 10, 2022
6a7134b
fix: fragmented frame end detection
KhafraDev Dec 10, 2022
b6411a6
fix: use TextDecoder for utf-8 validation
KhafraDev Dec 11, 2022
4fbfbf4
fix: handle incomplete chunks
KhafraDev Dec 11, 2022
e43fa5b
revert: handle incomplete chunks
KhafraDev Dec 11, 2022
553a0f2
mark WebSockets as experimental
KhafraDev Dec 11, 2022
e0289f7
fix: sending 65k bytes is still flaky on linux
KhafraDev Dec 11, 2022
dcc3801
fix: apply suggestions
KhafraDev Dec 11, 2022
797acc3
fix: apply some suggestions
KhafraDev Dec 12, 2022
dfc57ab
add basic docs
KhafraDev Dec 12, 2022
a0094a4
feat: use streaming parser for frames
KhafraDev Dec 13, 2022
883d9d9
feat: validate some frames & remove WebsocketFrame class
KhafraDev Dec 13, 2022
71848d9
fix: parse close frame & move failWebsocketConnection
KhafraDev Dec 14, 2022
b468029
fix: read close reason and read entire close body
KhafraDev Dec 14, 2022
8839538
fix: echo close frame if one hasn't been sent
KhafraDev Dec 14, 2022
78b77a8
fix: emit message event on message receive
KhafraDev Dec 14, 2022
6cfc42e
fix: minor fixes
KhafraDev Dec 14, 2022
7f1b5d4
fix: ci
KhafraDev Dec 14, 2022
24ae13b
fix: set was clean exit after server receives close frame
KhafraDev Dec 14, 2022
5086ce8
fix: check if received close frame for clean close
KhafraDev Dec 14, 2022
6441dc3
fix: set sent close after writing frame
KhafraDev Dec 14, 2022
476a192
feat: implement error messages
KhafraDev Dec 15, 2022
28c7ff6
fix: add error event handler to socket
KhafraDev Dec 15, 2022
73128fd
fix: address reviews
KhafraDev Dec 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/websocket/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ function receiveData (ws) {
// - type indicates that the data is Binary and binary type is "blob"
// a new Blob object, created in the relevant Realm of the
// WebSocket object, that represents data as its raw data
dataForEvent = new Blob(frame.payloadData)
dataForEvent = new Blob([frame.payloadData])
} else if (frame.opcode === opcodes.BINARY && ws[kBinaryType] === 'arraybuffer') {
// - type indicates that the data is Binary and binary type is
// "arraybuffer"
Expand Down
102 changes: 83 additions & 19 deletions lib/websocket/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,23 @@ class WebSocket extends EventTarget {
// https://datatracker.ietf.org/doc/html/rfc6455#section-6.1
// https://datatracker.ietf.org/doc/html/rfc6455#section-5.2

if (!isEstablished(this) || isClosing(this)) {
return
}

/** @type {import('stream').Duplex} */
const { socket } = this[kResponse]

const frame = new WebsocketFrame()
// TODO: support fragmentation later.
frame.FIN = true
frame.isMasked = true
frame.maskKey = Buffer.allocUnsafe(4)

for (let i = 0; i < 4; i++) {
frame.maskKey[i] = Math.floor(Math.random() * 256)
}

// If data is a string
if (typeof data === 'string') {
// If the WebSocket connection is established and the WebSocket
Expand All @@ -218,25 +235,72 @@ class WebSocket extends EventTarget {
// string argument that does not throw an exception must increase
// the bufferedAmount attribute by the number of bytes needed to
// express the argument as UTF-8.
if (isEstablished(this) && !isClosing(this)) {
const socket = this[kResponse].socket
const frame = new WebsocketFrame()
// 1. set FIN to true. TODO: support fragmentation later.
frame.FIN = true
// 2. enable masking
frame.isMasked = true
// 3. set mask key
frame.maskKey = Buffer.alloc(4)
for (let i = 0; i < 4; i++) {
frame.maskKey[i] = Math.floor(Math.random() * 256)
}

// TODO: support BINARY data
frame.opcode = opcodes.TEXT
frame.payloadData = Buffer.from(data)

socket.write(frame.toBuffer())
}

frame.opcode = opcodes.TEXT
frame.payloadData = Buffer.from(data)

const buffer = frame.toBuffer()
this.#bufferedAmount += buffer.byteLength
socket.write(buffer)
} else if (types.isArrayBuffer(data)) {
// If the WebSocket connection is established, and the WebSocket
// closing handshake has not yet started, then the user agent must
// send a WebSocket Message comprised of data using a binary frame
// opcode; if the data cannot be sent, e.g. because it would need
// to be buffered but the buffer is full, the user agent must flag
// the WebSocket as full and then close the WebSocket connection.
// The data to be sent is the data stored in the buffer described
// by the ArrayBuffer object. Any invocation of this method with an
// ArrayBuffer argument that does not throw an exception must
// increase the bufferedAmount attribute by the length of the
// ArrayBuffer in bytes.

frame.opcode = opcodes.BINARY
frame.payloadData = Buffer.from(data)

const buffer = frame.toBuffer()
this.#bufferedAmount += buffer.byteLength
socket.write(buffer)
} else if (ArrayBuffer.isView(data)) {
// If the WebSocket connection is established, and the WebSocket
// closing handshake has not yet started, then the user agent must
// send a WebSocket Message comprised of data using a binary frame
// opcode; if the data cannot be sent, e.g. because it would need to
// be buffered but the buffer is full, the user agent must flag the
// WebSocket as full and then close the WebSocket connection. The
// data to be sent is the data stored in the section of the buffer
// described by the ArrayBuffer object that data references. Any
// invocation of this method with this kind of argument that does
// not throw an exception must increase the bufferedAmount attribute
// by the length of data’s buffer in bytes.

frame.opcode = opcodes.BINARY
frame.payloadData = Buffer.from(data)

const buffer = frame.toBuffer()
this.#bufferedAmount += buffer.byteLength
socket.write(buffer)
} else if (isBlobLike(data)) {
// If the WebSocket connection is established, and the WebSocket
// closing handshake has not yet started, then the user agent must
// send a WebSocket Message comprised of data using a binary frame
// opcode; if the data cannot be sent, e.g. because it would need to
// be buffered but the buffer is full, the user agent must flag the
// WebSocket as full and then close the WebSocket connection. The data
// to be sent is the raw data represented by the Blob object. Any
// invocation of this method with a Blob argument that does not throw
// an exception must increase the bufferedAmount attribute by the size
// of the Blob object’s raw data, in bytes.

frame.opcode = opcodes.BINARY

data.arrayBuffer().then((ab) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have concurrent writes this might change the order of messages:

websocket.send(blob);
websocket.send(string);

The string message is sent before the blob.

Copy link
Member Author

@KhafraDev KhafraDev Dec 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know of any good way of fixing this without needing a Promise for every send.. Preferably node would allow writing a blob to a writable/duplex stream.

But then there would be issues masking the body/creating a frame. I guess the only other solution is adding a queue, but that's better for another time.

Copy link
Member

@lpinca lpinca Dec 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the reason why there is no Blob support in ws. This and because the user can already do

websocket.send(await blob.arrayBuffer());

I agree with this 1

Non-goals

  • Support Blob chunks. The old WebSocket API defaults to receiving messages as Blobs; however, creating and reading Blobs is more costly than creating and reading ArrayBuffers. In practice, even though it requires explicitly setting binaryType, 97% of messages are received as ArrayBuffers. On the send side, sending Blobs adds considerable complexity to the implementation because the contents are not available synchronously. Since less than 4% of sent messages are Blobs it is better to avoid this complexity where we can.

Footnotes

  1. /~https://github.com/ricea/websocketstream-explainer/tree/832f88d7f8eaecbe51da318d802951a3f8aadb2f#non-goals

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, when I was writing out my comment is kinda just clicked - "oh... this is why ws doesn't support blobs", lol. I still think undici should implement it for the sake of complying with the spec.

When I get around to implementing websocketstream this shouldn't be a problem at least!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, ws already uses a queue (only when permessage-deflate is enabled and negotiated since compression is asynchronous) so support for Blob could be added, but I think it does not worth the effort. I think that Blob is of little use in Node.js if not natively supported by streams.

Copy link
Contributor

@jimmywarting jimmywarting Dec 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, but send(await blob.arrayBuffer()) would require reading the hole blob into memory before sending...
sure you could do something like:

for await (const chunk of blob.stream()) {
  websocket.send(chunk)
}

But then you would get multiple messages and you would have to assemble them yourself on the receiving side as well 😞

if you can accept blob response types then it could as well be piped to a tmp location on the disc and wouldn't hold up any memory when receiving blobs

are there any limit on how large a message can be?

Copy link
Contributor

@jimmywarting jimmywarting Dec 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, just went into the github repo and learned about a new WebSocketStream forgot that it existed, only head about it when it was in early proposal. Well, then you could instead maybe do: blob.stream().pipeTo(websocket)?

Should we implement a globalThis.WebSocketStream also?

Copy link
Member

@lpinca lpinca Dec 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then you would get multiple messages and you would have to assemble them yourself on the receiving side as well 😞

That's up to you. With ws you could send each chunk as a message fragment and receive the whole thing as a single message on the receiving end.

for await (const chunk of blob.stream()) {
  websocket.send(chunk, { fin: false}); // For the sake of simplicity backpressure is not handled.
}
websocket.send(Buffer.alloc(0), { fin: true });

if you can accept blobs then it could as well be piped to a tmp location on the disc and wouldn't hold up any memory when receiving blobs

AFAIK the whole message is buffered in memory before the 'message' event is emitted. That message is then returned to the user as a Blob.

frame.payloadData = Buffer.from(ab)

const buffer = frame.toBuffer()
this.#bufferedAmount += buffer.byteLength
socket.write(buffer)
})
}
}

Expand Down