Skip to content

remove async storage from couchbase #5948

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 60 additions & 51 deletions packages/datadog-instrumentations/src/couchbase.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
const { errorMonitor } = require('events')
const {
channel,
addHook,
AsyncResource
addHook
} = require('./helpers/instrument')
const shimmer = require('../../datadog-shimmer')

// these channels are for maintaining store context for their respective methods
const callbackCh = channel('apm:couchbase:query:callback')
const queryCh = channel('apm:couchbase:query')

function findCallbackIndex (args, lowerbound = 2) {
for (let i = args.length - 1; i >= lowerbound; i--) {
if (typeof args[i] === 'function') return i
Expand All @@ -25,26 +28,31 @@ function wrapAllNames (names, action) {
}

// semver >=2 <3
function wrapMaybeInvoke (_maybeInvoke) {
function wrapMaybeInvoke (_maybeInvoke, callbackCh) {
const wrapped = function (fn, args) {
if (!Array.isArray(args)) return _maybeInvoke.apply(this, arguments)

const callbackIndex = args.length - 1
const callback = args[callbackIndex]

if (typeof callback === 'function') {
args[callbackIndex] = AsyncResource.bind(callback)
args[callbackIndex] = callbackCh.runStores({}, () => {
return callback
})
}

return _maybeInvoke.apply(this, arguments)
}
return wrapped
}

function wrapQuery (query) {
function wrapQuery (query, queryCh) {
const wrapped = function (q, params, callback) {
if (typeof arguments[arguments.length - 1] === 'function') {
arguments[arguments.length - 1] = AsyncResource.bind(arguments[arguments.length - 1])
const cb = arguments[arguments.length - 1]
if (typeof cb === 'function') {
arguments[arguments.length - 1] = queryCh.runStores({}, () => {
return cb
})
}

return query.apply(this, arguments)
Expand All @@ -66,27 +74,24 @@ function wrap (prefix, fn) {

if (callbackIndex < 0) return fn.apply(this, arguments)

const callbackResource = new AsyncResource('bound-anonymous-fn')
const asyncResource = new AsyncResource('bound-anonymous-fn')

return asyncResource.runInAsyncScope(() => {
const cb = callbackResource.bind(arguments[callbackIndex])
const ctx = { bucket: { name: this.name || this._name }, seedNodes: this._dd_hosts }
return startCh.runStores(ctx, () => {
const cb = arguments[callbackIndex]

startCh.publish({ bucket: { name: this.name || this._name }, seedNodes: this._dd_hosts })

arguments[callbackIndex] = shimmer.wrapFunction(cb, cb => asyncResource.bind(function (error, result) {
arguments[callbackIndex] = shimmer.wrapFunction(cb, cb => function (error, result) {
if (error) {
errorCh.publish(error)
ctx.error = error
errorCh.publish(ctx)
}
finishCh.publish(result)
return cb.apply(this, arguments)
}))
return finishCh.runStores(ctx, cb, this, ...arguments)
})

try {
return fn.apply(this, arguments)
} catch (error) {
ctx.error = error
error.stack // trigger getting the stack at the original throwing point
errorCh.publish(error)
errorCh.publish(ctx)

throw error
}
Expand All @@ -104,36 +109,40 @@ function wrapCBandPromise (fn, name, startData, thisArg, args) {

if (!startCh.hasSubscribers) return fn.apply(thisArg, args)

const asyncResource = new AsyncResource('bound-anonymous-fn')
const callbackResource = new AsyncResource('bound-anonymous-fn')

return asyncResource.runInAsyncScope(() => {
startCh.publish(startData)

const ctx = startData
return startCh.runStores(ctx, () => {
try {
const cbIndex = findCallbackIndex(args, 1)
if (cbIndex >= 0) {
// v3 offers callback or promises event handling
// NOTE: this does not work with v3.2.0-3.2.1 cluster.query, as there is a bug in the couchbase source code
const cb = callbackResource.bind(args[cbIndex])
args[cbIndex] = shimmer.wrapFunction(cb, cb => asyncResource.bind(function (error, result) {
args[cbIndex] = shimmer.wrapFunction(args[cbIndex], cb => function (error, result) {
if (error) {
errorCh.publish(error)
ctx.error = error
errorCh.publish(ctx)
}
finishCh.publish({ result })
return cb.apply(thisArg, arguments)
}))
ctx.result = result
return finishCh.runStores(ctx, cb, thisArg, ...arguments)
})
}
const res = fn.apply(thisArg, args)

// semver >=3 will always return promise by default
res.then(
asyncResource.bind((result) => finishCh.publish({ result })),
asyncResource.bind((err) => errorCh.publish(err)))
(result) => {
ctx.result = result
finishCh.publish(ctx)
},
(err) => {
ctx.error = err
errorCh.publish(ctx)
}
)
return res
} catch (e) {
e.stack
errorCh.publish(e)
ctx.error = e
errorCh.publish(ctx)
throw e
}
})
Expand Down Expand Up @@ -164,8 +173,8 @@ addHook({ name: 'couchbase', file: 'lib/bucket.js', versions: ['^2.6.12'] }, Buc
const finishCh = channel('apm:couchbase:query:finish')
const errorCh = channel('apm:couchbase:query:error')

shimmer.wrap(Bucket.prototype, '_maybeInvoke', maybeInvoke => wrapMaybeInvoke(maybeInvoke))
shimmer.wrap(Bucket.prototype, 'query', query => wrapQuery(query))
shimmer.wrap(Bucket.prototype, '_maybeInvoke', maybeInvoke => wrapMaybeInvoke(maybeInvoke, callbackCh))
shimmer.wrap(Bucket.prototype, 'query', query => wrapQuery(query, queryCh))

shimmer.wrap(Bucket.prototype, '_n1qlReq', _n1qlReq => function (host, q, adhoc, emitter) {
if (!startCh.hasSubscribers) {
Expand All @@ -176,24 +185,24 @@ addHook({ name: 'couchbase', file: 'lib/bucket.js', versions: ['^2.6.12'] }, Buc

const n1qlQuery = getQueryResource(q)

const asyncResource = new AsyncResource('bound-anonymous-fn')
return asyncResource.runInAsyncScope(() => {
startCh.publish({ resource: n1qlQuery, bucket: { name: this.name || this._name }, seedNodes: this._dd_hosts })

emitter.once('rows', asyncResource.bind(() => {
finishCh.publish()
}))
const ctx = { resource: n1qlQuery, bucket: { name: this.name || this._name }, seedNodes: this._dd_hosts }
return startCh.runStores(ctx, () => {
emitter.once('rows', () => {
finishCh.publish(ctx)
})

emitter.once(errorMonitor, asyncResource.bind((error) => {
errorCh.publish(error)
finishCh.publish()
}))
emitter.once(errorMonitor, (error) => {
ctx.error = error
errorCh.publish(ctx)
finishCh.publish(ctx)
})

try {
return _n1qlReq.apply(this, arguments)
} catch (err) {
err.stack // trigger getting the stack at the original throwing point
errorCh.publish(err)
ctx.error = err
errorCh.publish(ctx)

throw err
}
Expand All @@ -208,8 +217,8 @@ addHook({ name: 'couchbase', file: 'lib/bucket.js', versions: ['^2.6.12'] }, Buc
})

addHook({ name: 'couchbase', file: 'lib/cluster.js', versions: ['^2.6.12'] }, Cluster => {
shimmer.wrap(Cluster.prototype, '_maybeInvoke', maybeInvoke => wrapMaybeInvoke(maybeInvoke))
shimmer.wrap(Cluster.prototype, 'query', query => wrapQuery(query))
shimmer.wrap(Cluster.prototype, '_maybeInvoke', maybeInvoke => wrapMaybeInvoke(maybeInvoke, callbackCh))
shimmer.wrap(Cluster.prototype, 'query', query => wrapQuery(query, queryCh))

shimmer.wrap(Cluster.prototype, 'openBucket', openBucket => {
return function () {
Expand Down
38 changes: 23 additions & 15 deletions packages/datadog-plugin-couchbase/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
static get id () { return 'couchbase' }
static get peerServicePrecursors () { return ['db.couchbase.seed.nodes'] }

addSubs (func, start) {
this.addSub(`apm:couchbase:${func}:start`, start)
this.addSub(`apm:couchbase:${func}:error`, error => this.addError(error))
this.addSub(`apm:couchbase:${func}:finish`, message => this.finish(message))
addBinds (func, start) {
this.addBind(`apm:couchbase:${func}:start`, start)
this.addBind(`apm:couchbase:${func}:error`, error => this.addError(error))
this.addBind(`apm:couchbase:${func}:finish`, message => this.finish(message))
}

startSpan (operation, customTags, store, { bucket, collection, seedNodes }) {
startSpan (operation, customTags, { bucket, collection, seedNodes }, ctx) {
const tags = {
'db.type': 'couchbase',
component: 'couchbase',
Expand All @@ -34,25 +34,30 @@
{
service: this.serviceName({ pluginConfig: this.config }),
meta: tags
}
},
ctx
)
}

constructor (...args) {
super(...args)

this.addSubs('query', ({ resource, bucket, seedNodes }) => {
const store = storage('legacy').getStore()
const span = this.startSpan(
'query', {
this.addBinds('query', (ctx) => {
const { resource, bucket, seedNodes } = ctx
console.log('startSpan', 'query')

Check failure on line 47 in packages/datadog-plugin-couchbase/src/index.js

View workflow job for this annotation

GitHub Actions / lint

Unexpected console statement

this.startSpan(
'query',
{
'span.type': 'sql',
'resource.name': resource,
'span.kind': this.constructor.kind
},
store,
{ bucket, seedNodes }
{ bucket, seedNodes },
ctx
)
this.enter(span, store)

return ctx.currentStore
})

this._addCommandSubs('upsert')
Expand All @@ -63,9 +68,12 @@
}

_addCommandSubs (name) {
this.addSubs(name, ({ bucket, collection, seedNodes }) => {
this.addBinds(name, (ctx) => {
const { bucket, collection, seedNodes } = ctx

const store = storage('legacy').getStore()
const span = this.startSpan(name, {}, store, { bucket, collection, seedNodes })
const span = this.startSpan(name, {}, store, { bucket, collection, seedNodes }, ctx)

this.enter(span, store)
})
}
Expand Down
2 changes: 2 additions & 0 deletions packages/dd-trace/src/plugins/outbound.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
}

bindFinish (ctx) {
console.log('bindFinish', 'query')
return ctx.parentStore

Check failure on line 31 in packages/dd-trace/src/plugins/outbound.js

View workflow job for this annotation

GitHub Actions / lint

Unexpected console statement
}

startSpan (...args) {
Expand Down Expand Up @@ -91,7 +92,8 @@
}

finish (ctx) {
console.log('finish', 'query')
const span = ctx?.currentStore?.span || this.activeSpan

Check failure on line 96 in packages/dd-trace/src/plugins/outbound.js

View workflow job for this annotation

GitHub Actions / lint

Unexpected console statement
this.tagPeerService(span)
super.finish(...arguments)
}
Expand Down
Loading