Skip to content

Commit

Permalink
...
Browse files Browse the repository at this point in the history
  • Loading branch information
indutny committed Jun 30, 2015
1 parent b6444ee commit 3d3709b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 56 deletions.
95 changes: 60 additions & 35 deletions lib/_stream_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const util = require('util');
const Socket = require('net').Socket;
const JSStream = process.binding('js_stream').JSStream;
const uv = process.binding('uv');
const debug = util.debuglog('stream_wrap');

function StreamWrap(stream) {
const handle = new JSStream();
Expand All @@ -15,6 +16,7 @@ function StreamWrap(stream) {

const self = this;
handle.close = function(cb) {
debug('close');
self.doClose(cb);
};
handle.isAlive = function() {
Expand All @@ -40,18 +42,23 @@ function StreamWrap(stream) {
this.stream.on('error', function(err) {
self.emit('error', err);
});

Socket.call(this, {
handle: handle
});

this.stream.on('data', function(chunk) {
if (self._handle)
self._handle.readBuffer(chunk);
setImmediate(function() {
debug('data', chunk.length);
if (self._handle)
self._handle.readBuffer(chunk);
});
});
this.stream.once('end', function() {
if (self._handle)
self._handle.emitEOF();
setImmediate(function() {
debug('end');
if (self._handle)
self._handle.emitEOF();
});
});

Socket.call(this, {
handle: handle
});
}
util.inherits(StreamWrap, Socket);
Expand All @@ -61,11 +68,11 @@ module.exports = StreamWrap;
StreamWrap.StreamWrap = StreamWrap;

StreamWrap.prototype.isAlive = function isAlive() {
return this.readable && this.writable;
return true;
};

StreamWrap.prototype.isClosing = function isClosing() {
return !this.isAlive();
return !this.readable || !this.writable;
};

StreamWrap.prototype.readStart = function readStart() {
Expand All @@ -79,11 +86,16 @@ StreamWrap.prototype.readStop = function readStop() {
};

StreamWrap.prototype.doShutdown = function doShutdown(req) {
const self = this;
const handle = this._handle;
const item = this._enqueue('shutdown', req);

this.stream.end(function() {
// Ensure that write was dispatched
setImmediate(function() {
if (!self._dequeue(item))
return;

handle.finishShutdown(req, 0);
});
});
Expand All @@ -97,7 +109,7 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
var pending = bufs.length;

// Queue the request to be able to cancel it
self._enqueue(req);
const item = self._enqueue('write', req);

self.stream.cork();
bufs.forEach(function(buf) {
Expand All @@ -115,7 +127,7 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
// Ensure that write was dispatched
setImmediate(function() {
// Do not invoke callback twice
if (!self._dequeue(req))
if (!self._dequeue(item))
return;

var errCode = 0;
Expand All @@ -134,39 +146,47 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
return 0;
};

StreamWrap.prototype._enqueue = function enqueue(req) {
function QueueItem(type, req) {
this.type = type;
this.req = req;
this.prev = this;
this.next = this;
}

StreamWrap.prototype._enqueue = function enqueue(type, req) {
const item = new QueueItem(type, req);
if (this._queue === null) {
this._queue = req;
req._prev = req;
req._next = req;
return;
this._queue = item;
return item;
}

req._next = this._queue._next;
req._prev = this._queue;
req._next._prev = req;
req._prev._next = req;
item.next = this._queue.next;
item.prev = this._queue;
item.next.prev = item;
item.prev.next = item;

return item;
};

StreamWrap.prototype._dequeue = function dequeue(req) {
var next = req._next;
var prev = req._prev;
StreamWrap.prototype._dequeue = function dequeue(item) {
var next = item.next;
var prev = item.prev;

if (next === null && prev === null)
return false;

req._next = null;
req._prev = null;
item.next = null;
item.prev = null;

if (next === req) {
if (next === item) {
prev = null;
next = null;
} else {
prev._next = next;
next._prev = prev;
prev.next = next;
next.prev = prev;
}

if (this._queue === req)
if (this._queue === item)
this._queue = next;

return true;
Expand All @@ -178,12 +198,17 @@ StreamWrap.prototype.doClose = function doClose(cb) {

setImmediate(function() {
while (self._queue !== null) {
const req = self._queue;
self._dequeue(req);
const item = self._queue;
const req = item.req;
self._dequeue(item);

const errCode = uv.UV_ECANCELED;
handle.doAfterWrite(req);
handle.finishWrite(req, errCode);
if (item.type === 'write') {
handle.doAfterWrite(req);
handle.finishWrite(req, errCode);
} else if (item.type === 'shutdown') {
handle.finishShutdown(req, errCode);
}
}

// Should be already set by net.js
Expand Down
35 changes: 14 additions & 21 deletions test/parallel/test-stream-wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,34 @@ const StreamWrap = require('_stream_wrap');
const Duplex = require('stream').Duplex;
const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap;

var done = false;

function testShutdown(callback) {
var stream = new Duplex({
read: function() {
},
write: function(data, enc, callback) {
callback(null);
write: function() {
}
});

var wrap = new StreamWrap(stream);

var req = new ShutdownWrap();
req.oncomplete = function() {};
req.oncomplete = function(code) {
assert(code < 0);
callback();
};
req.handle = wrap._handle;
wrap._handle.shutdown(req);

// Close the handle to simulate
wrap.destroy();

process.nextTick(callback);
}

function testReadAfterClose(callback) {
var stream = new Duplex({
read: function() {
},
write: function(data, enc, callback) {
callback(null);
}
});
stream.push('data');
stream.push(null);

var wrap = new StreamWrap(stream);
req.handle.shutdown(req);
}

testShutdown(function() {
testReadAfterClose();
done = true;
});

process.on('exit', function() {
assert(done);
});

0 comments on commit 3d3709b

Please sign in to comment.