-
Notifications
You must be signed in to change notification settings - Fork 217
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: new 'worker-protocol' module to do Array-to-Buffer conversion
- Loading branch information
Showing
2 changed files
with
117 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import { Transform } from 'stream'; | ||
|
||
// Transform objects which convert from hardened Arrays of JSON-serializable | ||
// data into Buffers suitable for netstring conversion. | ||
|
||
export function arrayEncoderStream() { | ||
function transform(object, encoding, callback) { | ||
if (!Array.isArray(object)) { | ||
throw Error('stream requires Arrays'); | ||
} | ||
let err; | ||
try { | ||
this.push(Buffer.from(JSON.stringify(object))); | ||
} catch (e) { | ||
err = e; | ||
} | ||
callback(err); | ||
} | ||
// Array in, Buffer out, hence writableObjectMode | ||
return new Transform({ transform, writableObjectMode: true }); | ||
} | ||
|
||
export function arrayDecoderStream() { | ||
function transform(buf, encoding, callback) { | ||
let err; | ||
try { | ||
if (!Buffer.isBuffer(buf)) { | ||
throw Error('stream expects Buffers'); | ||
} | ||
this.push(JSON.parse(buf)); | ||
} catch (e) { | ||
err = e; | ||
} | ||
// this Transform is a one-to-one conversion of Buffer into Array, so we | ||
// always consume the input each time we're called | ||
callback(err); | ||
} | ||
|
||
// Buffer in, Array out, hence readableObjectMode | ||
return new Transform({ transform, readableObjectMode: true }); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
import '@agoric/install-ses'; // adds 'harden' to global | ||
|
||
import test from 'ava'; | ||
import { arrayEncoderStream, arrayDecoderStream } from '../src/worker-protocol'; | ||
import { | ||
encode, | ||
netstringEncoderStream, | ||
netstringDecoderStream, | ||
} from '../src/netstring'; | ||
|
||
test('arrayEncoderStream', async t => { | ||
const e = arrayEncoderStream(); | ||
const chunks = []; | ||
e.on('data', data => chunks.push(data)); | ||
e.write([]); | ||
|
||
function eq(expected) { | ||
t.deepEqual( | ||
chunks.map(buf => buf.toString()), | ||
expected, | ||
); | ||
} | ||
eq([`[]`]); | ||
|
||
e.write(['command', { foo: 1 }]); | ||
eq([`[]`, `["command",{"foo":1}]`]); | ||
}); | ||
|
||
test('encode stream', async t => { | ||
const aStream = arrayEncoderStream(); | ||
const nsStream = netstringEncoderStream(); | ||
aStream.pipe(nsStream); | ||
const chunks = []; | ||
nsStream.on('data', data => chunks.push(data)); | ||
function eq(expected) { | ||
t.deepEqual( | ||
chunks.map(buf => buf.toString()), | ||
expected, | ||
); | ||
} | ||
|
||
aStream.write([1]); | ||
eq(['3:[1],']); | ||
|
||
aStream.write(['command', { foo: 4 }]); | ||
eq(['3:[1],', '21:["command",{"foo":4}],']); | ||
}); | ||
|
||
test('decode stream', async t => { | ||
const nsStream = netstringDecoderStream(); | ||
const aStream = arrayDecoderStream(); | ||
nsStream.pipe(aStream); | ||
function write(s) { | ||
nsStream.write(Buffer.from(s)); | ||
} | ||
|
||
const msgs = []; | ||
aStream.on('data', msg => msgs.push(msg)); | ||
|
||
function eq(expected) { | ||
t.deepEqual(msgs, expected); | ||
} | ||
|
||
let buf = encode(Buffer.from(JSON.stringify([1]))); | ||
write(buf.slice(0, 1)); | ||
eq([]); | ||
write(buf.slice(1)); | ||
eq([[1]]); | ||
msgs.pop(); | ||
|
||
buf = encode(Buffer.from(JSON.stringify(['command', { foo: 2 }]))); | ||
write(buf.slice(0, 4)); | ||
eq([]); | ||
write(buf.slice(4)); | ||
eq([['command', { foo: 2 }]]); | ||
}); |