From e23b7bb40e20bacf7f64c627333918e7d5137560 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 20 Sep 2020 23:42:52 -0700 Subject: [PATCH] fix: new 'worker-protocol' module to do Array-to-Buffer conversion --- packages/SwingSet/src/worker-protocol.js | 41 ++++++++++ .../SwingSet/test/test-worker-protocol.js | 76 +++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 packages/SwingSet/src/worker-protocol.js create mode 100644 packages/SwingSet/test/test-worker-protocol.js diff --git a/packages/SwingSet/src/worker-protocol.js b/packages/SwingSet/src/worker-protocol.js new file mode 100644 index 00000000000..3fd0abc5cf1 --- /dev/null +++ b/packages/SwingSet/src/worker-protocol.js @@ -0,0 +1,41 @@ +import { Transform } from 'stream'; + +// Transform objects which convert from hardened Arrays of JSON-serializable +// data into Buffers suitable for netstring conversion. + +export function arrayEncoderStream() { + function transform(object, encoding, callback) { + if (!Array.isArray(object)) { + throw Error('stream requires Arrays'); + } + let err; + try { + this.push(Buffer.from(JSON.stringify(object))); + } catch (e) { + err = e; + } + callback(err); + } + // Array in, Buffer out, hence writableObjectMode + return new Transform({ transform, writableObjectMode: true }); +} + +export function arrayDecoderStream() { + function transform(buf, encoding, callback) { + let err; + try { + if (!Buffer.isBuffer(buf)) { + throw Error('stream expects Buffers'); + } + this.push(JSON.parse(buf)); + } catch (e) { + err = e; + } + // this Transform is a one-to-one conversion of Buffer into Array, so we + // always consume the input each time we're called + callback(err); + } + + // Buffer in, Array out, hence readableObjectMode + return new Transform({ transform, readableObjectMode: true }); +} diff --git a/packages/SwingSet/test/test-worker-protocol.js b/packages/SwingSet/test/test-worker-protocol.js new file mode 100644 index 00000000000..01ce1d1873c --- /dev/null +++ b/packages/SwingSet/test/test-worker-protocol.js @@ -0,0 +1,76 @@ +import '@agoric/install-ses'; // adds 'harden' to global + +import test from 'ava'; +import { arrayEncoderStream, arrayDecoderStream } from '../src/worker-protocol'; +import { + encode, + netstringEncoderStream, + netstringDecoderStream, +} from '../src/netstring'; + +test('arrayEncoderStream', async t => { + const e = arrayEncoderStream(); + const chunks = []; + e.on('data', data => chunks.push(data)); + e.write([]); + + function eq(expected) { + t.deepEqual( + chunks.map(buf => buf.toString()), + expected, + ); + } + eq([`[]`]); + + e.write(['command', { foo: 1 }]); + eq([`[]`, `["command",{"foo":1}]`]); +}); + +test('encode stream', async t => { + const aStream = arrayEncoderStream(); + const nsStream = netstringEncoderStream(); + aStream.pipe(nsStream); + const chunks = []; + nsStream.on('data', data => chunks.push(data)); + function eq(expected) { + t.deepEqual( + chunks.map(buf => buf.toString()), + expected, + ); + } + + aStream.write([1]); + eq(['3:[1],']); + + aStream.write(['command', { foo: 4 }]); + eq(['3:[1],', '21:["command",{"foo":4}],']); +}); + +test('decode stream', async t => { + const nsStream = netstringDecoderStream(); + const aStream = arrayDecoderStream(); + nsStream.pipe(aStream); + function write(s) { + nsStream.write(Buffer.from(s)); + } + + const msgs = []; + aStream.on('data', msg => msgs.push(msg)); + + function eq(expected) { + t.deepEqual(msgs, expected); + } + + let buf = encode(Buffer.from(JSON.stringify([1]))); + write(buf.slice(0, 1)); + eq([]); + write(buf.slice(1)); + eq([[1]]); + msgs.pop(); + + buf = encode(Buffer.from(JSON.stringify(['command', { foo: 2 }]))); + write(buf.slice(0, 4)); + eq([]); + write(buf.slice(4)); + eq([['command', { foo: 2 }]]); +});