Skip to content

Commit

Permalink
feat: implement websockets (nodejs#1795)
Browse files Browse the repository at this point in the history
* initial handshake

* minor fixes

* feat: working initial handshake!

* feat(ws): initial WebSocket class implementation

* fix: allow http: and ws: urls

* fix(ws): use websocket spec

* fix(ws): use websocket spec

* feat: implement url getter

* feat: implement some of `WebSocket.close` and ready state

* fix: body is null for websockets & pass socket to response

* fix: store the fetch controller & response on ws

* fix: remove invalid tests

* feat: implement readyState getter

* feat: implement `protocol` and `extensions` getters

* feat: implement event listeners

* feat: implement binaryType attribute

* fix: add argument length checks

* feat: basic unfragmented message parsing

* fix: always remove previous listener

* feat: add in idlharness WPT

* implement sending a message for WS and add a websocketFrame class

* feat: allow sending ArrayBuffer/Views & Blob

* fix: remove duplicate `upgrade` and `connection` headers

* feat: add in WebSocket.close() and handle closing frames

* refactor WebsocketFrame and support receiving frames in multiple chunks

* fixes

* move WebsocketFrame to its own file

* feat: export WebSocket & add types

* fix: tsd

* feat(wpt): use WebSocketServer & run test

* fix: properly set/read close code & close reason

* fix: flakiness in websocket test runner

* fix: receive message with arraybuffer binary type

* feat: split WebsocketFrame into 2 classes (sent & received)

* fix: parse fragmented frames more efficiently & close frame

* fix: add types for MessageEvent and CloseEvent

* fix: subprotocol validation & add wpts

* fix: protocol validation & protocol webidl & add wpts

* fix: correct bufferedAmount calc. & message event w/ blob

* fix: don't truncate typedarrays

* feat: add remaining wpts

* fix: allow sending payloads > 65k bytes

* fix: mask data > 125 bytes properly

* revert changes to core

* fix: decrement bufferedAmount after write

* fix: handle ping and pong frames

* fix: simplify receiving frame logic

* fix: disable extensions & validate frames

* fix: send close frame upon receiving

* lint

* fix: validate status code & utf-8

* fix: add hooks

* fix: check if frame is unfragmented correctly

* fix: send ping app data in pong frames

* export websocket on node >= 18 & add diagnostic_channels

* mark test as flaky

* fix: couple bug fixes

* fix: fragmented frame end detection

* fix: use TextDecoder for utf-8 validation

* fix: handle incomplete chunks

* revert: handle incomplete chunks

* mark WebSockets as experimental

* fix: sending 65k bytes is still flaky on linux

* fix: apply suggestions

* fix: apply some suggestions

* add basic docs

* feat: use streaming parser for frames

* feat: validate some frames & remove WebsocketFrame class

* fix: parse close frame & move failWebsocketConnection

* fix: read close reason and read entire close body

* fix: echo close frame if one hasn't been sent

* fix: emit message event on message receive

* fix: minor fixes

* fix: ci

* fix: set was clean exit after server receives close frame

* fix: check if received close frame for clean close

* fix: set sent close after writing frame

* feat: implement error messages

* fix: add error event handler to socket

* fix: address reviews

Co-authored-by: Subhi Al Hasan <jodevsa@gmail.com>
  • Loading branch information
2 people authored and crysmags committed Feb 27, 2024
1 parent f02646b commit 4894fca
Show file tree
Hide file tree
Showing 92 changed files with 3,881 additions and 15 deletions.
41 changes: 41 additions & 0 deletions docs/api/DiagnosticsChannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,44 @@ diagnosticsChannel.channel('undici:client:connectError').subscribe(({ error, soc
// connector is a function that creates the socket
console.log(`Connect failed with ${error.message}`)
})
```

## `undici:websocket:open`

This message is published after the client has successfully connected to a server.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:open').subscribe(({ address, protocol, extensions }) => {
console.log(address) // address, family, and port
console.log(protocol) // negotiated subprotocols
console.log(extensions) // negotiated extensions
})
```

## `undici:websocket:close`

This message is published after the connection has closed.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:close').subscribe(({ websocket, code, reason }) => {
console.log(websocket) // the WebSocket object
console.log(code) // the closing status code
console.log(reason) // the closing reason
})
```

## `undici:websocket:socket_error`

This message is published if the socket experiences an error.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:socket_error').subscribe((error) => {
console.log(error)
})
```
20 changes: 20 additions & 0 deletions docs/api/WebSocket.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Class: WebSocket

> ⚠️ Warning: the WebSocket API is experimental and has known bugs.
Extends: [`EventTarget`](https://developer.mozilla.org/en-US/docs/Web/API/EventTarget)

The WebSocket object provides a way to manage a WebSocket connection to a server, allowing bidirectional communication. The API follows the [WebSocket spec](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket).

## `new WebSocket(url[, protocol])`

Arguments:

* **url** `URL | string` - The url's protocol *must* be `ws` or `wss`.
* **protocol** `string | string[]` (optional) - Subprotocol(s) to request the server use.

## Read More

- [MDN - WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket)
- [The WebSocket Specification](https://www.rfc-editor.org/rfc/rfc6455)
- [The WHATWG WebSocket Specification](https://websockets.spec.whatwg.org/)
1 change: 1 addition & 0 deletions docsify/sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [MockErrors](/docs/api/MockErrors.md "Undici API - MockErrors")
* [API Lifecycle](/docs/api/api-lifecycle.md "Undici API - Lifecycle")
* [Diagnostics Channel Support](/docs/api/DiagnosticsChannel.md "Diagnostics Channel Support")
* [WebSocket](/docs/api/WebSocket.md "Undici API - WebSocket")
* Best Practices
* [Proxy](/docs/best-practices/proxy.md "Connecting through a proxy")
* [Client Certificate](/docs/best-practices/client-certificate.md "Connect using a client certificate")
Expand Down
1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export * from './types/file'
export * from './types/filereader'
export * from './types/formdata'
export * from './types/diagnostics-channel'
export * from './types/websocket'
export { Interceptable } from './types/mock-interceptor'

export { Dispatcher, BalancedPool, Pool, Client, buildConnector, errors, Agent, request, stream, pipeline, connect, upgrade, setGlobalDispatcher, getGlobalDispatcher, setGlobalOrigin, getGlobalOrigin, MockClient, MockPool, MockAgent, mockErrors, ProxyAgent, RedirectHandler, DecoratorHandler }
Expand Down
6 changes: 6 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ if (nodeMajor > 16 || (nodeMajor === 16 && nodeMinor >= 8)) {
module.exports.getGlobalOrigin = getGlobalOrigin
}

if (nodeMajor >= 18) {
const { WebSocket } = require('./lib/websocket/websocket')

module.exports.WebSocket = WebSocket
}

module.exports.request = makeDispatcher(api.request)
module.exports.stream = makeDispatcher(api.stream)
module.exports.pipeline = makeDispatcher(api.pipeline)
Expand Down
46 changes: 40 additions & 6 deletions lib/fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const { dataURLProcessor, serializeAMimeType } = require('./dataURL')
const { TransformStream } = require('stream/web')
const { getGlobalDispatcher } = require('../../index')
const { webidl } = require('./webidl')
const { STATUS_CODES } = require('http')

/** @type {import('buffer').resolveObjectURL} */
let resolveObjectURL
Expand Down Expand Up @@ -1745,12 +1746,17 @@ async function httpNetworkFetch (
}

try {
const { body, status, statusText, headersList } = await dispatch({ body: requestBody })
// socket is only provided for websockets
const { body, status, statusText, headersList, socket } = await dispatch({ body: requestBody })

const iterator = body[Symbol.asyncIterator]()
fetchParams.controller.next = () => iterator.next()
if (socket) {
response = makeResponse({ status, statusText, headersList, socket })
} else {
const iterator = body[Symbol.asyncIterator]()
fetchParams.controller.next = () => iterator.next()

response = makeResponse({ status, statusText, headersList })
response = makeResponse({ status, statusText, headersList })
}
} catch (err) {
// 10. If aborted, then:
if (err.name === 'AbortError') {
Expand Down Expand Up @@ -1934,7 +1940,10 @@ async function httpNetworkFetch (

async function dispatch ({ body }) {
const url = requestCurrentURL(request)
return new Promise((resolve, reject) => fetchParams.controller.dispatcher.dispatch(
/** @type {import('../..').Agent} */
const agent = fetchParams.controller.dispatcher

return new Promise((resolve, reject) => agent.dispatch(
{
path: url.pathname + url.search,
origin: url.origin,
Expand All @@ -1943,7 +1952,8 @@ async function httpNetworkFetch (
headers: request.headersList[kHeadersCaseInsensitive],
maxRedirections: 0,
bodyTimeout: 300_000,
headersTimeout: 300_000
headersTimeout: 300_000,
upgrade: request.mode === 'websocket' ? 'websocket' : undefined
},
{
body: null,
Expand Down Expand Up @@ -2062,6 +2072,30 @@ async function httpNetworkFetch (
fetchParams.controller.terminate(error)

reject(error)
},

onUpgrade (status, headersList, socket) {
if (status !== 101) {
return
}

const headers = new Headers()

for (let n = 0; n < headersList.length; n += 2) {
const key = headersList[n + 0].toString('latin1')
const val = headersList[n + 1].toString('latin1')

headers.append(key, val)
}

resolve({
status,
statusText: STATUS_CODES[status],
headersList: headers[kHeadersList],
socket
})

return true
}
}
))
Expand Down
14 changes: 12 additions & 2 deletions lib/fetch/webidl.js
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,20 @@ webidl.converters['unsigned long long'] = function (V) {
return x
}

// https://webidl.spec.whatwg.org/#es-unsigned-long
webidl.converters['unsigned long'] = function (V) {
// 1. Let x be ? ConvertToInt(V, 32, "unsigned").
const x = webidl.util.ConvertToInt(V, 32, 'unsigned')

// 2. Return the IDL unsigned long value that
// represents the same numeric value as x.
return x
}

// https://webidl.spec.whatwg.org/#es-unsigned-short
webidl.converters['unsigned short'] = function (V) {
webidl.converters['unsigned short'] = function (V, opts) {
// 1. Let x be ? ConvertToInt(V, 16, "unsigned").
const x = webidl.util.ConvertToInt(V, 16, 'unsigned')
const x = webidl.util.ConvertToInt(V, 16, 'unsigned', opts)

// 2. Return the IDL unsigned short value that represents
// the same numeric value as x.
Expand Down
Loading

0 comments on commit 4894fca

Please sign in to comment.