Skip to content

Commit

Permalink
feat(db): add database-level aggregation
Browse files Browse the repository at this point in the history
Fixes NODE-1783
  • Loading branch information
kvwalker authored Feb 19, 2019
1 parent 3b6957d commit b629b21
Show file tree
Hide file tree
Showing 8 changed files with 514 additions and 125 deletions.
95 changes: 2 additions & 93 deletions lib/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const deprecate = require('util').deprecate;
const deprecateOptions = require('./utils').deprecateOptions;
const checkCollectionName = require('./utils').checkCollectionName;
const ObjectID = require('mongodb-core').BSON.ObjectID;
const AggregationCursor = require('./aggregation_cursor');
const MongoError = require('mongodb-core').MongoError;
const toError = require('./utils').toError;
const normalizeHintField = require('./utils').normalizeHintField;
Expand All @@ -19,10 +18,10 @@ const unordered = require('./bulk/unordered');
const ordered = require('./bulk/ordered');
const ChangeStream = require('./change_stream');
const executeOperation = require('./utils').executeOperation;
const applyWriteConcern = require('./utils').applyWriteConcern;
const resolveReadPreference = require('./utils').resolveReadPreference;

// Operations
const aggregate = require('./operations/aggregate').aggregate;
const bulkWrite = require('./operations/collection_ops').bulkWrite;
const checkForAtomicOperators = require('./operations/collection_ops').checkForAtomicOperators;
const count = require('./operations/collection_ops').count;
Expand Down Expand Up @@ -1733,97 +1732,7 @@ Collection.prototype.aggregate = function(pipeline, options, callback) {
pipeline = args;
}

// Ignore readConcern option
let ignoreReadConcern = false;

// Build the command
const command = { aggregate: this.s.name, pipeline: pipeline };

// If out was specified
if (typeof options.out === 'string') {
pipeline.push({ $out: options.out });
// Ignore read concern
ignoreReadConcern = true;
} else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
ignoreReadConcern = true;
}

// Decorate command with writeConcern if out has been specified
if (
pipeline.length > 0 &&
pipeline[pipeline.length - 1]['$out'] &&
this.s.topology.capabilities().commandsTakeWriteConcern
) {
applyWriteConcern(command, { db: this.s.db, collection: this }, options);
}

// Have we specified collation
try {
decorateWithCollation(command, this, options);
} catch (err) {
if (typeof callback === 'function') return callback(err, null);
throw err;
}

// If we have bypassDocumentValidation set
if (options.bypassDocumentValidation === true) {
command.bypassDocumentValidation = options.bypassDocumentValidation;
}

// Do we have a readConcern specified
if (!ignoreReadConcern) {
decorateWithReadConcern(command, this, options);
}

// If we have allowDiskUse defined
if (options.allowDiskUse) command.allowDiskUse = options.allowDiskUse;
if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;

// If we are giving a hint
if (options.hint) command.hint = options.hint;

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options.readPreference = resolveReadPreference(options, { db: this.s.db, collection: this });

// If explain has been specified add it
if (options.explain) {
if (command.readConcern || command.writeConcern) {
throw toError('"explain" cannot be used on an aggregate call with readConcern/writeConcern');
}
command.explain = options.explain;
}

if (typeof options.comment === 'string') command.comment = options.comment;

// Validate that cursor options is valid
if (options.cursor != null && typeof options.cursor !== 'object') {
throw toError('cursor options must be an object');
}

options.cursor = options.cursor || {};
if (options.batchSize) options.cursor.batchSize = options.batchSize;
command.cursor = options.cursor;

// promiseLibrary
options.promiseLibrary = this.s.promiseLibrary;

// Set the AggregationCursor constructor
options.cursorFactory = AggregationCursor;
if (typeof callback !== 'function') {
if (!this.s.topology.capabilities()) {
throw new MongoError('cannot connect to server');
}

// Allow disk usage command
if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse;
if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;

// Execute the cursor
return this.s.topology.cursor(this.s.namespace, command, options);
}

return handleCallback(callback, null, this.s.topology.cursor(this.s.namespace, command, options));
return aggregate(this.s.db, this, pipeline, options, callback);
};

/**
Expand Down
46 changes: 46 additions & 0 deletions lib/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const CONSTANTS = require('./constants');

// Operations
const addUser = require('./operations/db_ops').addUser;
const aggregate = require('./operations/aggregate').aggregate;
const collections = require('./operations/db_ops').collections;
const createCollection = require('./operations/db_ops').createCollection;
const createIndex = require('./operations/db_ops').createIndex;
Expand Down Expand Up @@ -263,6 +264,44 @@ Db.prototype.command = function(command, options, callback) {
return executeOperation(this.s.topology, executeCommand, [this, command, options, callback]);
};

/**
* Execute an aggregation framework pipeline against the database, needs MongoDB >= 3.6
* @method
* @param {object} [pipeline=[]] Array containing all the aggregation framework commands for the execution.
* @param {object} [options] Optional settings.
* @param {(ReadPreference|string)} [options.readPreference] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
* @param {object} [options.cursor] Return the query as cursor, on 2.6 > it returns as a real cursor on pre 2.6 it returns as an emulated cursor.
* @param {number} [options.cursor.batchSize] The batchSize for the cursor
* @param {boolean} [options.explain=false] Explain returns the aggregation execution plan (requires mongodb 2.6 >).
* @param {boolean} [options.allowDiskUse=false] allowDiskUse lets the server know if it can use disk to store temporary results for the aggregation (requires mongodb 2.6 >).
* @param {number} [options.maxTimeMS] maxTimeMS specifies a cumulative time limit in milliseconds for processing operations on the cursor. MongoDB interrupts the operation at the earliest following interrupt point.
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
* @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
* @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
* @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
* @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
* @param {object} [options.collation] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
* @param {string} [options.comment] Add a comment to an aggregation command
* @param {string|object} [options.hint] Add an index selection hint to an aggregation command
* @param {ClientSession} [options.session] optional session to use for this operation
* @param {Database~aggregationCallback} callback The command result callback
* @return {(null|AggregationCursor)}
*/
Db.prototype.aggregate = function(pipeline, options, callback) {
if (typeof options === 'function') {
callback = options;
options = {};
}

// If we have no options or callback we are doing
// a cursor based aggregation
if (options == null && callback == null) {
options = {};
}

return aggregate(this, '1', pipeline, options, callback);
};

/**
* Return the Admin db instance
* @method
Expand All @@ -281,6 +320,13 @@ Db.prototype.admin = function() {
* @param {Collection} collection The collection instance.
*/

/**
* The callback format for an aggregation call
* @callback Database~aggregationCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {AggregationCursor} cursor The cursor if the aggregation command was executed successfully.
*/

const collectionKeys = [
'pkFactory',
'readPreference',
Expand Down
121 changes: 121 additions & 0 deletions lib/operations/aggregate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
'use strict';

const AggregationCursor = require('../aggregation_cursor');
const applyWriteConcern = require('../utils').applyWriteConcern;
const decorateWithCollation = require('../utils').decorateWithCollation;
const decorateWithReadConcern = require('../utils').decorateWithReadConcern;
const handleCallback = require('../utils').handleCallback;
const MongoError = require('mongodb-core').MongoError;
const resolveReadPreference = require('../utils').resolveReadPreference;
const toError = require('../utils').toError;

const DB_AGGREGATE_COLLECTION = 1;

/**
* Perform an aggregate operation. See Collection.prototype.aggregate or Db.prototype.aggregate for more information.
*
* @method
* @param {Db} db A Db instance.
* @param {Collection|string} coll A collection instance or the string '1', used for db.aggregate.
* @param {object} [pipeline=[]] Array containing all the aggregation framework commands for the execution.
* @param {object} [options] Optional settings. See Collection.prototype.aggregate or Db.prototype.aggregate for a list of options.
* @param {Db~aggregationCallback|Collection~aggregationCallback} callback The command result callback
*/
function aggregate(db, coll, pipeline, options, callback) {
const isDbAggregate = typeof coll === 'string';
const target = isDbAggregate ? db : coll;
const topology = target.s.topology;
let ignoreReadConcern = false;

if (typeof options.out === 'string') {
pipeline = pipeline.concat({ $out: options.out });
ignoreReadConcern = true;
} else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
ignoreReadConcern = true;
}

let command;
let namespace;
let optionSources;

if (isDbAggregate) {
command = { aggregate: DB_AGGREGATE_COLLECTION, pipeline: pipeline };
namespace = `${db.s.databaseName}.${DB_AGGREGATE_COLLECTION}`;

optionSources = { db };
} else {
command = { aggregate: coll.s.name, pipeline: pipeline };
namespace = coll.s.namespace;

optionSources = { db: coll.s.db, collection: coll };
}

const takesWriteConcern = topology.capabilities().commandsTakeWriteConcern;

if (!ignoreReadConcern) {
decorateWithReadConcern(command, target, options);
}

if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out'] && takesWriteConcern) {
applyWriteConcern(command, optionSources, options);
}

try {
decorateWithCollation(command, target, options);
} catch (err) {
if (typeof callback === 'function') return callback(err, null);
throw err;
}

if (options.bypassDocumentValidation === true) {
command.bypassDocumentValidation = options.bypassDocumentValidation;
}

if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse;
if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;

if (options.hint) command.hint = options.hint;

options = Object.assign({}, options);

// Ensure we have the right read preference inheritance
options.readPreference = resolveReadPreference(options, optionSources);

if (options.explain) {
if (command.readConcern || command.writeConcern) {
throw toError('"explain" cannot be used on an aggregate call with readConcern/writeConcern');
}
command.explain = options.explain;
}

if (typeof options.comment === 'string') command.comment = options.comment;

// Validate that cursor options is valid
if (options.cursor != null && typeof options.cursor !== 'object') {
throw toError('cursor options must be an object');
}

options.cursor = options.cursor || {};
if (options.batchSize) options.cursor.batchSize = options.batchSize;
command.cursor = options.cursor;

// promiseLibrary
options.promiseLibrary = target.s.promiseLibrary;

// Set the AggregationCursor constructor
options.cursorFactory = AggregationCursor;

if (typeof callback !== 'function') {
if (!topology.capabilities()) {
throw new MongoError('cannot connect to server');
}

return topology.cursor(namespace, command, options);
}

return handleCallback(callback, null, topology.cursor(namespace, command, options));
}

module.exports = {
aggregate
};
36 changes: 36 additions & 0 deletions test/functional/aggregation_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,42 @@ describe('Aggregation', function() {
}
});

it('should correctly execute db.aggregate() with $currentOp', {
metadata: {
requires: {
mongodb: '>=3.6.0',
topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger']
}
},

test: function(done) {
const client = this.configuration.newClient({ w: 1 }, { poolSize: 1 });

client.connect(function(err, client) {
expect(err).to.be.null;

// get admin db for $currentOp
const db = client.db('admin');

db.aggregate([{ $currentOp: {} }], (err, cursor) => {
expect(err).to.be.null;

cursor.toArray((err, result) => {
expect(err).to.be.null;

expect(result[0].command.aggregate).to.equal(1);
expect(result[0].command.pipeline).to.eql([{ $currentOp: {} }]);
expect(result[0].command.cursor).to.deep.equal({});
expect(result[0].command['$db']).to.equal('admin');

client.close();
done();
});
});
});
}
});

/**
* Correctly call the aggregation framework using a pipeline expressed as an argument list.
*
Expand Down
Loading

0 comments on commit b629b21

Please sign in to comment.