diff --git a/packages/datadog-instrumentations/src/mongodb-core.js b/packages/datadog-instrumentations/src/mongodb-core.js index dd3ec6b2d6b..4a27e919a3a 100644 --- a/packages/datadog-instrumentations/src/mongodb-core.js +++ b/packages/datadog-instrumentations/src/mongodb-core.js @@ -148,61 +148,66 @@ function wrapCommand (command, operation, name) { return wrapped } -function instrument (operation, command, ctx, args, server, ns, ops, options = {}) { +function instrument (operation, command, instance, args, server, ns, ops, options = {}) { const name = options.name || (ops && Object.keys(ops)[0]) const index = args.length - 1 - let callback = args[index] + const callback = args[index] - if (typeof callback !== 'function') return command.apply(ctx, args) + if (typeof callback !== 'function') return command.apply(instance, args) const serverInfo = server && server.s && server.s.options - const callbackResource = new AsyncResource('bound-anonymous-fn') - const asyncResource = new AsyncResource('bound-anonymous-fn') - callback = callbackResource.bind(callback) - - return asyncResource.runInAsyncScope(() => { - startCh.publish({ ns, ops, options: serverInfo, name }) - - args[index] = shimmer.wrapFunction(callback, callback => asyncResource.bind(function (err, res) { + const ctx = { + ns, + ops, + options: serverInfo, + name + } + return startCh.runStores(ctx, () => { + args[index] = shimmer.wrapFunction(callback, callback => function (err, res) { if (err) { - errorCh.publish(err) + ctx.error = err + errorCh.publish(ctx) } - finishCh.publish() - - if (callback) { - return callback.apply(this, arguments) - } - })) + return finishCh.runStores(ctx, callback, this, ...arguments) + }) try { - return command.apply(ctx, args) + return command.apply(instance, args) } catch (err) { - errorCh.publish(err) + ctx.error = err + errorCh.publish(ctx) throw err } }) } -function instrumentPromise (operation, command, ctx, args, server, ns, ops, options = {}) { +function instrumentPromise (operation, command, instance, args, server, ns, ops, options = {}) { const name = options.name || (ops && Object.keys(ops)[0]) const serverInfo = server && server.s && server.s.options - const asyncResource = new AsyncResource('bound-anonymous-fn') - return asyncResource.runInAsyncScope(() => { - startCh.publish({ ns, ops, options: serverInfo, name }) + const ctx = { + ns, + ops, + options: serverInfo, + name + } - const promise = command.apply(ctx, args) + return startCh.runStores(ctx, () => { + const promise = command.apply(instance, args) return promise.then(function (res) { - finishCh.publish() - return res + ctx.result = res + return finishCh.runStores(ctx, () => { + return res + }) }, function (err) { - errorCh.publish(err) - finishCh.publish() + ctx.error = err + errorCh.publish(ctx) + finishCh.publish(ctx) throw err }) diff --git a/packages/datadog-instrumentations/src/mongodb.js b/packages/datadog-instrumentations/src/mongodb.js index f73aa21652a..dcc1b2ca96b 100644 --- a/packages/datadog-instrumentations/src/mongodb.js +++ b/packages/datadog-instrumentations/src/mongodb.js @@ -4,8 +4,7 @@ require('./mongodb-core') const { channel, - addHook, - AsyncResource + addHook } = require('./helpers/instrument') const shimmer = require('../../datadog-shimmer') @@ -41,19 +40,16 @@ addHook({ name: 'mongodb', versions: ['>=3.3 <5', '5', '>=6'] }, mongodb => { return method.apply(this, arguments) } - const asyncResource = new AsyncResource('bound-anonymous-fn') - - return asyncResource.runInAsyncScope(() => { - const filters = [arguments[0]] - if (useTwoArguments) { - filters.push(arguments[1]) - } + const ctx = { + filters: [arguments[0]], + methodName + } - startCh.publish({ - filters, - methodName - }) + if (useTwoArguments) { + ctx.filters.push(arguments[1]) + } + return startCh.runStores(ctx, () => { return method.apply(this, arguments) }) } diff --git a/packages/datadog-instrumentations/src/mongoose.js b/packages/datadog-instrumentations/src/mongoose.js index 8116e38380e..41ee2f9fd87 100644 --- a/packages/datadog-instrumentations/src/mongoose.js +++ b/packages/datadog-instrumentations/src/mongoose.js @@ -2,21 +2,41 @@ const { addHook, channel } = require('./helpers/instrument') const { wrapThen } = require('./helpers/promise') -const { AsyncResource } = require('./helpers/instrument') const shimmer = require('../../datadog-shimmer') +const startCh = channel('datadog:mongoose:model:filter:start') +const finishCh = channel('datadog:mongoose:model:filter:finish') +// this channel is for wrapping the callback of exec methods and handling store context, it doesn't have any subscribers +const callbackCh = channel('datadog:mongoose:model:exec:callback') + function wrapAddQueue (addQueue) { - return function addQueueWithTrace (name) { + return function (name, args, options) { if (typeof name === 'function') { - arguments[0] = AsyncResource.bind(name) - } else if (typeof this[name] === 'function') { - arguments[0] = AsyncResource.bind((...args) => this[name](...args)) + const ctx = {} + arguments[0] = callbackCh.runStores(ctx, addQueue, this, ...arguments) } - return addQueue.apply(this, arguments) } } +function wrapExec (exec) { + return function (op, callback) { + if (typeof op === 'function') { + callback = op + op = undefined + } + + if (typeof callback === 'function') { + const ctx = {} + const bound = callbackCh.runStores(ctx, callback, this, ...arguments) + + return op === undefined ? exec.call(this, bound) : exec.call(this, op, bound) + } + + return exec.apply(this, arguments) + } +} + addHook({ name: 'mongoose', versions: ['>=4.6.4 <5', '5', '6', '>=7'] @@ -26,14 +46,21 @@ addHook({ shimmer.wrap(mongoose.Promise.prototype, 'then', wrapThen) } - shimmer.wrap(mongoose.Collection.prototype, 'addQueue', wrapAddQueue) + if (mongoose.Query) { + shimmer.wrap(mongoose.Query.prototype, 'exec', wrapExec) + } + + if (mongoose.Aggregate) { + shimmer.wrap(mongoose.Aggregate.prototype, 'exec', wrapExec) + } + + if (mongoose.Collection) { + shimmer.wrap(mongoose.Collection.prototype, 'addQueue', wrapAddQueue) + } return mongoose }) -const startCh = channel('datadog:mongoose:model:filter:start') -const finishCh = channel('datadog:mongoose:model:filter:finish') - const collectionMethodsWithFilter = [ 'count', 'countDocuments', @@ -68,27 +95,21 @@ addHook({ return method.apply(this, arguments) } - const asyncResource = new AsyncResource('bound-anonymous-fn') - const filters = [arguments[0]] if (useTwoArguments) { filters.push(arguments[1]) } - const finish = asyncResource.bind(function () { - finishCh.publish() - }) - let callbackWrapped = false - const wrapCallbackIfExist = (args) => { + const wrapCallbackIfExist = (args, ctx) => { const lastArgumentIndex = args.length - 1 if (typeof args[lastArgumentIndex] === 'function') { // is a callback, wrap it to execute finish() shimmer.wrap(args, lastArgumentIndex, originalCb => { return function () { - finish() + finishCh.publish(ctx) return originalCb.apply(this, arguments) } @@ -98,13 +119,13 @@ addHook({ } } - wrapCallbackIfExist(arguments) + const ctx = { + filters, + methodName + } - return asyncResource.runInAsyncScope(() => { - startCh.publish({ - filters, - methodName - }) + return startCh.runStores(ctx, () => { + wrapCallbackIfExist(arguments, ctx) const res = method.apply(this, arguments) @@ -129,7 +150,7 @@ addHook({ const reject = arguments[1] arguments[0] = shimmer.wrapFunction(resolve, resolve => function wrappedResolve () { - finish() + finishCh.publish(ctx) if (resolve) { return resolve.apply(this, arguments) @@ -137,7 +158,7 @@ addHook({ }) arguments[1] = shimmer.wrapFunction(reject, reject => function wrappedReject () { - finish() + finishCh.publish(ctx) if (reject) { return reject.apply(this, arguments) diff --git a/packages/datadog-plugin-mongodb-core/src/index.js b/packages/datadog-plugin-mongodb-core/src/index.js index d9d7330baaa..e672a88ac81 100644 --- a/packages/datadog-plugin-mongodb-core/src/index.js +++ b/packages/datadog-plugin-mongodb-core/src/index.js @@ -24,7 +24,9 @@ class MongodbCorePlugin extends DatabasePlugin { ) } - start ({ ns, ops, options = {}, name }) { + bindStart (ctx) { + const { ns, ops, options = {}, name } = ctx + // heartbeat commands can be disabled if this.config.heartbeatEnabled is false if (!this.config.heartbeatEnabled && isHeartbeat(ops, this.config)) { return @@ -44,11 +46,13 @@ class MongodbCorePlugin extends DatabasePlugin { 'out.host': options.host, 'out.port': options.port } - }) + }, ctx) const comment = this.injectDbmComment(span, ops.comment, service) if (comment) { ops.comment = comment } + + return ctx.currentStore } getPeerService (tags) {