Node streams2 over Primus: added back-pressure!
- Pass in a Primus client or spark, get back a `stream.Duplex`. Do this on both sides of a Primus connection.
- `write` returns `false` when the receiver is full.
- Use `read` and `readable` to exert back-pressure on the sender.
- Unit tests with 100% coverage.
- Browser unit tests using webpack and PhantomJS.
The API is described below.
Exerting backpressure over Primus:
// Example: exerting back-pressure over a Primus connection.
// The spark side writes two bytes against a high-water mark of 2; the client
// side reads them one byte at a time.
var Primus = require('primus');
var PrimusDuplex = require('primus-backpressure').PrimusDuplex;
var primus = Primus.createServer({ port: 9000 });
var Socket = primus.Socket;
var assert = require('assert');
var read_a = false; // set once the client has consumed 'a'

// Server side: wrap the first spark that connects in a PrimusDuplex.
primus.once('connection', function (spark) {
    var server_duplex = new PrimusDuplex(spark, { highWaterMark: 2 });

    // Writing two bytes with highWaterMark 2 is expected to report false.
    var accepted = server_duplex.write('ab');
    assert.equal(accepted, false);

    // When 'drain' fires, check that the client has already read 'a'.
    server_duplex.on('drain', function () {
        assert(read_a);
    });
});

// Client side: a PrimusDuplex with highWaterMark 1 over a new client socket.
var client_duplex = new PrimusDuplex(new Socket('http://localhost:9000'), {
    highWaterMark: 1
});

// Second 'readable': expect 'b', then shut the server down.
// Called with the duplex as `this`, so it must stay an ordinary function.
function on_second_readable() {
    assert.equal(this.read().toString(), 'b');
    primus.end();
    console.log('done');
}

// First 'readable': expect 'a' and nothing more, then wait for the next chunk.
function on_first_readable() {
    assert.equal(this.read().toString(), 'a');
    read_a = true;
    assert.equal(this.read(), null);
    this.once('readable', on_second_readable);
}

client_duplex.once('readable', on_first_readable);
Piping data over Primus:
// Example: piping a file over a Primus connection and back.
// The spark side streams 1 MiB of random bytes to the client, the client echoes
// everything back, and the spark side checks that what returns matches.
var Primus = require('primus');
var PrimusDuplex = require('primus-backpressure').PrimusDuplex;
var primus = Primus.createServer({ port: 9000 });
var Socket = primus.Socket;
var assert = require('assert');
var crypto = require('crypto');
var tmp = require('tmp');
var fs = require('fs');

primus.once('connection', function (spark) {
    var server_duplex = new PrimusDuplex(spark);
    var source_path; // temp file holding the random payload
    var sink_path;   // temp file receiving the echoed data
    var payload;     // the random bytes written to source_path

    // Final step: both temp files removed, stop the server.
    function on_sink_removed(err) {
        assert.ifError(err);
        primus.end();
        console.log('done');
    }

    // Source file removed; now remove the sink file.
    function on_source_removed(err) {
        assert.ifError(err);
        fs.unlink(sink_path, on_sink_removed);
    }

    // Compare the echoed data with the original payload, then clean up.
    function on_sink_read(err, echoed) {
        assert.ifError(err);
        assert.deepEqual(echoed, payload);
        fs.unlink(source_path, on_source_removed);
    }

    // The sink stream has finished: read back what was received.
    function on_sink_finish() {
        fs.readFile(sink_path, on_sink_read);
    }

    // Both temp names are known: open the streams and wire up the pipes.
    function on_sink_name(err, name) {
        assert.ifError(err);
        sink_path = name;

        var source_stream = fs.createReadStream(source_path);
        var sink_stream = fs.createWriteStream(sink_path);

        sink_stream.on('finish', on_sink_finish);

        // Incoming (echoed) data goes to the sink; the source file goes out.
        server_duplex.pipe(sink_stream);
        source_stream.pipe(server_duplex);
    }

    // Payload is on disk: ask for a second temp name for the sink.
    function on_payload_written(err) {
        assert.ifError(err);
        tmp.tmpName(on_sink_name);
    }

    // First temp name is known: generate the payload and write it out.
    function on_source_name(err, name) {
        assert.ifError(err);
        source_path = name;
        payload = crypto.randomBytes(1024 * 1024);
        fs.writeFile(source_path, payload, on_payload_written);
    }

    tmp.tmpName(on_source_name);
});

// Client side: echo everything received straight back to the server.
var client_duplex = new PrimusDuplex(new Socket('http://localhost:9000'));
client_duplex.pipe(client_duplex);
npm install primus-backpressure
Node client to Node server:
grunt test
Browser client to Node server (requires PhantomJS):
grunt test-browser
grunt coverage
c8 results are available here.
Coveralls page is here.
grunt lint
`PrimusDuplex` inherits from `stream.Duplex` so you can call any method from `stream.Readable` and `stream.Writable`.
Extra constructor options and an additional parameter to `readable.read` are described below.
Source: index.js
Creates a new `PrimusDuplex` object which exerts back-pressure over a Primus connection.
Both sides of a Primus connection must use `PrimusDuplex` — create one for your Primus client and one for your spark as soon as you have them.
Parameters:
- `{Object} msg_stream` The Primus client or spark you wish to exert back-pressure over.
- `{Object} [options]` Configuration options. This is passed on to `stream.Duplex` and can contain the following extra properties:
  - `{Function} [encode_data(chunk, encoding, start, end, internal)]` Optional encoding function for data passed to `writable.write`. `chunk` and `encoding` are as described in the `writable.write` documentation. The difference is that `encode_data` is synchronous (it must return the encoded data) and it should only encode data between the `start` and `end` positions in `chunk`. Defaults to a function which does `chunk.toString('base64', start, end)`. Note that `PrimusDuplex` may also pass some internal data through this function (always with `chunk` as a `Buffer`, `encoding=null` and `internal=true`).
  - `{Function} [decode_data(chunk, internal)]` Optional decoding function for data received on the Primus connection. The type of `chunk` will depend on how the peer `PrimusDuplex` encoded it. Defaults to a function which does `Buffer.from(chunk, 'base64')`. If the data can't be decoded, return `null` (and optionally call `this.emit` to emit an error). Note that `PrimusDuplex` may also pass some internal data through this function (always with `internal=true`) — in which case you must return a `Buffer`.
  - `{Integer} [max_write_size]` Maximum number of bytes to write onto the Primus connection at once, regardless of how many bytes the peer is free to receive. Defaults to 0 (no limit).
  - `{Boolean} [check_read_overflow]` Whether to check if more data than expected is being received. If `true` and the high-water mark for reading is exceeded then the `PrimusDuplex` object emits an `error` event. This should not normally occur unless you add data yourself using `readable.unshift` — in which case you should set `check_read_overflow` to `false`. Defaults to `true`.
Go: TOC
See `readable.read`. `PrimusDuplex` adds an extra optional parameter, `send_status`.
Parameters:
- `{Number} [size]` Optional argument to specify how much data to read. Defaults to `undefined` (you can also specify `null`) which means return all the data available.
- `{Boolean} [send_status]` Every time you call `read`, a status message is sent to the peer `PrimusDuplex` indicating how much space is left in the internal buffer to receive new data. To prevent deadlock, these status messages are always sent — they aren't subject to back-pressure. Normally this is fine because status messages are small. However, if your application reads data one byte at a time, for example, you may wish to control when status messages are sent. To stop a status message being sent when you call `read`, pass `send_status` as `false`. `send_status` defaults to `true`. To force a status message to be sent without reading any data, call `read(0)`.
Go: TOC | PrimusDuplex.prototype
—generated by apidox—