primus-backpressure

Node streams2 over Primus: added back-pressure!

  • Pass in a Primus client or spark, get back a stream.Duplex. Do this on both sides of a Primus connection.
  • write returns false when the receiver is full.
  • Use read and readable to exert back-pressure on the sender.
  • Unit tests with 100% coverage.
  • Browser unit tests using webpack and PhantomJS.
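
The write and drain behaviour listed above follows the usual stream.Writable contract. As a minimal sketch using only Node's core stream module (no Primus involved; slow_sink is a hypothetical stand-in for a receiver whose buffer is full), a writer gets false back from write and should wait for drain before writing more:

```javascript
var Writable = require('stream').Writable;

// Hypothetical sink with a 2-byte buffer that completes each write asynchronously.
var slow_sink = new Writable(
{
    highWaterMark: 2,
    write: function (chunk, encoding, callback)
    {
        setImmediate(callback);
    }
});

// Writing 2 bytes reaches the high-water mark, so write() returns false.
var accepted = slow_sink.write('ab');
console.log(accepted);

// 'drain' fires once the buffered data has been handled.
slow_sink.once('drain', function ()
{
    console.log('drained');
});
```

With PrimusDuplex the same signal is meant to reflect the peer's buffer rather than a local one, which is what the first example below exercises.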

The API is described below.

Example

Exerting back-pressure over Primus:

var Primus = require('primus'),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex,
    primus = Primus.createServer({ port: 9000 }),
    Socket = primus.Socket,
    assert = require('assert'),
    read_a = false;

primus.once('connection', function (spark)
{
    var spark_duplex = new PrimusDuplex(spark,
    {
        highWaterMark: 2
    });

    assert.equal(spark_duplex.write('ab'), false);

    spark_duplex.on('drain', function ()
    {
        assert(read_a);
    });
});

var client_duplex = new PrimusDuplex(new Socket('http://localhost:9000'),
{
    highWaterMark: 1
});

client_duplex.once('readable', function ()
{
    assert.equal(this.read().toString(), 'a');
    read_a = true;
    assert.equal(this.read(), null);

    this.once('readable', function ()
    {
        assert.equal(this.read().toString(), 'b');
        primus.end();
        console.log('done');
    });
});

Another Example

Piping data over Primus:

var Primus = require('primus'),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex,
    primus = Primus.createServer({ port: 9000 }),
    Socket = primus.Socket,
    assert = require('assert'),
    crypto = require('crypto'),
    tmp = require('tmp'),
    fs = require('fs');

primus.once('connection', function (spark)
{
    var spark_duplex = new PrimusDuplex(spark);

    tmp.tmpName(function (err, random_file)
    {
        assert.ifError(err);

        var random_buf = crypto.randomBytes(1024 * 1024);

        fs.writeFile(random_file, random_buf, function (err)
        {
            assert.ifError(err);

            tmp.tmpName(function (err, out_file)
            {
                assert.ifError(err);

                var random_stream = fs.createReadStream(random_file),
                    out_stream = fs.createWriteStream(out_file);

                out_stream.on('finish', function ()
                {
                    fs.readFile(out_file, function (err, out_buf)
                    {
                        assert.ifError(err);
                        assert.deepEqual(out_buf, random_buf);

                        fs.unlink(random_file, function (err)
                        {
                            assert.ifError(err);
                            fs.unlink(out_file, function (err)
                            {
                                assert.ifError(err);
                                primus.end();
                                console.log('done');
                            });
                        });
                    });
                });

                spark_duplex.pipe(out_stream);
                random_stream.pipe(spark_duplex);
            });
        });
    });
});

var client_duplex = new PrimusDuplex(new Socket('http://localhost:9000'));
client_duplex.pipe(client_duplex);

Installation

npm install primus-backpressure

Licence

MIT

Test

Node client to Node server:

grunt test

Browser client to Node server (requires PhantomJS):

grunt test-browser

Code Coverage

grunt coverage

c8 results are available here.

Coveralls page is here.

Lint

grunt lint

API

PrimusDuplex inherits from stream.Duplex so you can call any method from stream.Readable and stream.Writable.

Extra constructor options and an additional parameter to readable.read are described below.

Source: index.js

PrimusDuplex(msg_stream, [options])

Creates a new PrimusDuplex object which exerts back-pressure over a Primus connection.

Both sides of a Primus connection must use PrimusDuplex — create one for your Primus client and one for your spark as soon as you have them.

Parameters:

  • {Object} msg_stream The Primus client or spark you wish to exert back-pressure over.
  • {Object} [options] Configuration options. This is passed onto stream.Duplex and can contain the following extra properties:
    • {Function} [encode_data(chunk, encoding, start, end, internal)] Optional encoding function for data passed to writable.write. chunk and encoding are as described in the writable.write documentation. The difference is that encode_data is synchronous (it must return the encoded data) and it should only encode data between the start and end positions in chunk. Defaults to a function which does chunk.toString('base64', start, end). Note that PrimusDuplex may also pass some internal data through this function (always with chunk as a Buffer, encoding=null and internal=true).

    • {Function} [decode_data(chunk, internal)] Optional decoding function for data received on the Primus connection. The type of chunk will depend on how the peer PrimusDuplex encoded it. Defaults to a function which does Buffer.from(chunk, 'base64'). If the data can't be decoded, return null (and optionally call this.emit to emit an error). Note that PrimusDuplex may also pass some internal data through this function (always with internal=true) — in which case you must return a Buffer.

    • {Integer} [max_write_size] Maximum number of bytes to write onto the Primus connection at once, regardless of how many bytes the peer is free to receive. Defaults to 0 (no limit).

    • {Boolean} [check_read_overflow] Whether to check if more data than expected is being received. If true and the high-water mark for reading is exceeded then the PrimusDuplex object emits an error event. This should not normally occur unless you add data yourself using readable.unshift — in which case you should set check_read_overflow to false. Defaults to true.
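
As an illustration of the extra options above, here is a minimal sketch of a custom codec that uses hex instead of the default base64. The names hex_encode and hex_decode are hypothetical, and the sketch assumes chunk always arrives at encode_data as a Buffer (the stream.Duplex default when strings are written). Only the codec functions are exercised here; passing options to PrimusDuplex is shown as a comment because it needs a live Primus connection.

```javascript
// Hypothetical hex codec; these function names are not part of the library.
function hex_encode(chunk, encoding, start, end, internal)
{
    // Assumes chunk is a Buffer. Encode only the requested slice, synchronously.
    return chunk.toString('hex', start, end);
}

function hex_decode(chunk, internal)
{
    // Return null for anything that is not an even-length hex string.
    if ((typeof chunk !== 'string') || !/^([0-9a-fA-F]{2})*$/.test(chunk))
    {
        return null;
    }
    return Buffer.from(chunk, 'hex');
}

var options = {
    encode_data: hex_encode,
    decode_data: hex_decode,
    max_write_size: 16 * 1024,  // write at most 16 KiB onto the connection at once
    check_read_overflow: true   // the documented default
};

// With the library installed, both peers would be given the same codec:
// var duplex = new PrimusDuplex(spark, options);

console.log(hex_encode(Buffer.from('hello world'), null, 0, 5, false));
console.log(hex_decode('68656c6c6f', false).toString());
```

Because internal data also passes through these functions, the same encoding is applied whether internal is true or false, and hex_decode returns a Buffer in both cases.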


PrimusDuplex.prototype.read([size], [send_status])

See readable.read. PrimusDuplex adds an extra optional parameter, send_status.

Parameters:

  • {Number} [size] Optional argument to specify how much data to read. Defaults to undefined (you can also specify null) which means return all the data available.
  • {Boolean} [send_status] Every time you call read, a status message is sent to the peer PrimusDuplex indicating how much space is left in the internal buffer to receive new data. To prevent deadlock, these status messages are always sent — they aren't subject to back-pressure. Normally this is fine because status messages are small. However, if your application reads data one byte at a time, for example, you may wish to control when status messages are sent. To stop a status message being sent when you call read, pass send_status as false. send_status defaults to true. To force a status message to be sent without reading any data, call read(0).
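
The send_status behaviour described above could be used along these lines. This is a sketch, not library code: read_bytes_quietly is a hypothetical helper, and stub_duplex is a stand-in that only records how read was called, so that the example runs without a Primus connection. With a real PrimusDuplex you would pass the duplex itself.

```javascript
// Hypothetical helper: read up to count bytes one at a time, suppressing the
// per-read status message, then send a single status message at the end.
function read_bytes_quietly(duplex, count)
{
    var chunks = [], i, chunk;

    for (i = 0; i < count; i += 1)
    {
        chunk = duplex.read(1, false); // send_status = false
        if (chunk === null)
        {
            break;
        }
        chunks.push(chunk);
    }

    duplex.read(0); // reads nothing; send_status defaults to true
    return Buffer.concat(chunks);
}

// Stand-in for a PrimusDuplex which just records the arguments passed to read.
var calls = [],
    data = Buffer.from('abc'),
    offset = 0,
    stub_duplex = {
        read: function (size, send_status)
        {
            calls.push([size, send_status]);
            if ((size === 0) || (offset >= data.length))
            {
                return null;
            }
            var chunk = data.subarray(offset, offset + size);
            offset += size;
            return chunk;
        }
    };

var result = read_bytes_quietly(stub_duplex, 5);
console.log(result.toString(), calls.length);
```

Batching status messages this way trades some latency in telling the peer about freed space for fewer messages on the connection.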


—generated by apidox
