Skip to content

Commit

Permalink
fix(NODE-3176): handle errors from MessageStream (#2774)
Browse files Browse the repository at this point in the history
If there's a socket that gets a large amount of data written to it our MessageStream will throw an error about exceeding the permitted BSON size, this error is now captured in SDAM.
  • Loading branch information
nbbeeken authored Apr 8, 2021
1 parent b1363c2 commit f1afcc4
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 32 deletions.
68 changes: 36 additions & 32 deletions lib/cmap/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,38 +58,9 @@ class Connection extends EventEmitter {
/* ignore errors, listen to `close` instead */
});

stream.on('close', () => {
if (this.closed) {
return;
}

this.closed = true;
this[kQueue].forEach(op =>
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`))
);
this[kQueue].clear();

this.emit('close');
});

stream.on('timeout', () => {
if (this.closed) {
return;
}

stream.destroy();
this.closed = true;
this[kQueue].forEach(op =>
op.cb(
new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
beforeHandshake: this[kIsMaster] == null
})
)
);

this[kQueue].clear();
this.emit('close');
});
this[kMessageStream].on('error', error => this.handleIssue({ destroy: error }));
stream.on('close', () => this.handleIssue({ isClose: true }));
stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true }));

// hook the message stream up to the passed in stream
stream.pipe(this[kMessageStream]);
Expand Down Expand Up @@ -132,6 +103,39 @@ class Connection extends EventEmitter {
this[kLastUseTime] = now();
}

/**
* @param {{ isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }} issue
*/
handleIssue(issue) {
if (this.closed) {
return;
}

if (issue.destroy) {
this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
}

this.closed = true;

for (const idAndOp of this[kQueue]) {
const op = idAndOp[1];
if (issue.isTimeout) {
op.cb(
new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
beforeHandshake: !!this[kIsMaster]
})
);
} else if (issue.isClose) {
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`));
} else {
op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
}
}

this[kQueue].clear();
this.emit('close');
}

destroy(options, callback) {
if (typeof options === 'function') {
callback = options;
Expand Down
30 changes: 30 additions & 0 deletions test/unit/sdam/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -291,5 +291,35 @@ describe('Topology (unit)', function() {
});
});
});

it('should encounter a server selection timeout on garbled server responses', function() {
const net = require('net');
const server = net.createServer();
const p = Promise.resolve();
server.listen(0, 'localhost', 2, () => {
server.on('connection', c => c.on('data', () => c.write('garbage_data')));
const address = server.address();
const client = this.configuration.newClient(
`mongodb://${address.address}:${address.port}`,
{ serverSelectionTimeoutMS: 1000 }
);
p.then(() =>
client
.connect()
.then(() => {
server.close();
client.close();
expect.fail('Should throw a server selection error!');
})
.catch(error => {
server.close();
const closePromise = client.close();
expect(error).to.exist;
return closePromise;
})
);
});
return p;
});
});
});

0 comments on commit f1afcc4

Please sign in to comment.