Skip to content

Commit

Permalink
Rely on stream.destroy() whenever available (#4095)
Browse files Browse the repository at this point in the history
Co-authored-by: devin ivy <devin@bigroomstudios.com>
  • Loading branch information
kanongil and devinivy authored Mar 20, 2021
1 parent 035f7f1 commit 0e71bf4
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 77 deletions.
10 changes: 7 additions & 3 deletions lib/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -661,14 +661,18 @@ exports = module.exports = internals.Response = class {

static drain(stream) {

if (stream.destroy) {
stream.destroy();
return;
}

// Fallback for old-style streams

stream.unpipe();

if (stream.close) {
stream.close();
}
else {
stream.destroy();
}
}
};

Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
"@hapi/vision": "^6.0.1",
"@hapi/wreck": "^17.0.0",
"handlebars": "^4.7.4",
"joi": "^17.0.0"
"joi": "^17.0.0",
"legacy-readable-stream": "npm:readable-stream@^1.0.34"
},
"scripts": {
"test": "lab -a @hapi/code -t 100 -L -m 5000",
Expand Down
4 changes: 2 additions & 2 deletions test/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ describe('Request', () => {
expect(called).to.be.false();
});

it('closes response after server timeout', async () => {
it('destroys response after server timeout', async () => {

const team = new Teamwork.Team();
const handler = async (request) => {
Expand All @@ -998,7 +998,7 @@ describe('Request', () => {
this.push(null);
};

stream.close = () => team.attend();
stream._destroy = () => team.attend();
return stream;
};

Expand Down
38 changes: 20 additions & 18 deletions test/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const Stream = require('stream');
const Code = require('@hapi/code');
const Handlebars = require('handlebars');
const Hapi = require('..');
const Hoek = require('@hapi/hoek');
const Inert = require('@hapi/inert');
const Lab = require('@hapi/lab');
const Vision = require('@hapi/vision');
Expand Down Expand Up @@ -1188,45 +1187,43 @@ describe('Response', () => {
server.route({ method: 'GET', path: '/stream', handler: streamHandler });
server.route({ method: 'GET', path: '/writable', handler: writableHandler });

let updates = 0;
server.events.on({ name: 'request', channels: 'error' }, (request, event) => {

expect(event.error).to.be.an.error('Stream must have a readable interface');
++updates;
});

await server.initialize();

const log1 = server.events.once({ name: 'request', channels: 'error' });
const res1 = await server.inject('/stream');
expect(res1.statusCode).to.equal(500);

const [, event1] = await log1;
expect(event1.error).to.be.an.error('Stream must have a readable interface');

const log2 = server.events.once({ name: 'request', channels: 'error' });
const res2 = await server.inject('/writable');
expect(res2.statusCode).to.equal(500);

await Hoek.wait(10);

expect(updates).to.equal(2);
const [, event2] = await log2;
expect(event2.error).to.be.an.error('Stream must have a readable interface');
});

it('errors on an http client stream response', async () => {

const handler = (request, h) => {

return h.response('just a string');
};

const streamHandler = (request, h) => {

return h.response(Http.get(request.server.info + '/'));
const req = Http.get(request.server.info.uri);
req.abort();
return h.response(req);
};

const server = Hapi.server({ debug: false });
server.route({ method: 'GET', path: '/', handler });
server.route({ method: 'GET', path: '/stream', handler: streamHandler });

const log = server.events.once({ name: 'request', channels: 'error' });

await server.initialize();
const res = await server.inject('/stream');
expect(res.statusCode).to.equal(500);

const [, event] = await log;
expect(event.error).to.be.an.error('Stream must have a readable interface');
});

it('errors on objectMode stream response', async () => {
Expand Down Expand Up @@ -1260,8 +1257,13 @@ describe('Response', () => {
const server = Hapi.server({ debug: false });
server.route({ method: 'GET', path: '/', handler });

const log = server.events.once({ name: 'request', channels: 'error' });

const res = await server.inject('/');
expect(res.statusCode).to.equal(500);

const [, event] = await log;
expect(event.error).to.be.an.error('Cannot reply with stream in object mode');
});
});

Expand Down
Loading

0 comments on commit 0e71bf4

Please sign in to comment.