Skip to content

Commit c4710cc

Browse files
authored
remove async storage from cassandra-driver (#5949)
remove async storage from cassandra-driver
1 parent 9099b48 commit c4710cc

File tree

4 files changed

+58
-69
lines changed

4 files changed

+58
-69
lines changed

packages/datadog-instrumentations/src/cassandra-driver.js

Lines changed: 43 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22

33
const {
44
channel,
5-
addHook,
6-
AsyncResource
5+
addHook
76
} = require('./helpers/instrument')
87
const shimmer = require('../../datadog-shimmer')
98

@@ -12,36 +11,33 @@ const finishCh = channel('apm:cassandra-driver:query:finish')
1211
const errorCh = channel('apm:cassandra-driver:query:error')
1312
const connectCh = channel('apm:cassandra-driver:query:connect')
1413

14+
let startCtx = {}
15+
1516
addHook({ name: 'cassandra-driver', versions: ['>=3.0.0'] }, cassandra => {
1617
shimmer.wrap(cassandra.Client.prototype, 'batch', batch => function (queries, options, callback) {
1718
if (!startCh.hasSubscribers) {
1819
return batch.apply(this, arguments)
1920
}
20-
const callbackResource = new AsyncResource('bound-anonymous-fn')
21-
const asyncResource = new AsyncResource('bound-anonymous-fn')
2221
const lastIndex = arguments.length - 1
23-
let cb = arguments[lastIndex]
22+
const cb = arguments[lastIndex]
2423

25-
if (typeof cb === 'function') {
26-
cb = callbackResource.bind(cb)
27-
arguments[lastIndex] = wrapCallback(finishCh, errorCh, asyncResource, cb)
28-
}
24+
startCtx = { keyspace: this.keyspace, query: queries, contactPoints: this.options && this.options.contactPoints }
25+
return startCh.runStores(startCtx, () => {
26+
if (typeof cb === 'function') {
27+
arguments[lastIndex] = wrapCallback(finishCh, errorCh, startCtx, cb)
28+
}
2929

30-
return asyncResource.runInAsyncScope(() => {
31-
const contactPoints = this.options && this.options.contactPoints
32-
startCh.publish({ keyspace: this.keyspace, query: queries, contactPoints })
3330
try {
3431
const res = batch.apply(this, arguments)
3532
if (typeof res === 'function' || !res) {
36-
return wrapCallback(finishCh, errorCh, asyncResource, res)
33+
return wrapCallback(finishCh, errorCh, startCtx, res)
3734
}
38-
const promiseAsyncResource = new AsyncResource('bound-anonymous-fn')
3935
return res.then(
40-
promiseAsyncResource.bind(() => finish(finishCh, errorCh)),
41-
promiseAsyncResource.bind(err => finish(finishCh, errorCh, err))
36+
() => finish(finishCh, errorCh, startCtx),
37+
err => finish(finishCh, errorCh, startCtx, err)
4238
)
4339
} catch (e) {
44-
finish(finishCh, errorCh, e)
40+
finish(finishCh, errorCh, startCtx, e)
4541
throw e
4642
}
4743
})
@@ -54,17 +50,13 @@ addHook({ name: 'cassandra-driver', versions: ['>=4.4'] }, cassandra => {
5450
if (!startCh.hasSubscribers) {
5551
return _execute.apply(this, arguments)
5652
}
57-
const asyncResource = new AsyncResource('bound-anonymous-fn')
58-
return asyncResource.runInAsyncScope(() => {
59-
const contactPoints = this.options && this.options.contactPoints
60-
startCh.publish({ keyspace: this.keyspace, query, contactPoints })
53+
startCtx = { keyspace: this.keyspace, query, contactPoints: this.options && this.options.contactPoints }
54+
return startCh.runStores(startCtx, () => {
6155
const promise = _execute.apply(this, arguments)
6256

63-
const promiseAsyncResource = new AsyncResource('bound-anonymous-fn')
64-
6557
promise.then(
66-
promiseAsyncResource.bind(() => finish(finishCh, errorCh)),
67-
promiseAsyncResource.bind(err => finish(finishCh, errorCh, err))
58+
() => finish(finishCh, errorCh, startCtx),
59+
err => finish(finishCh, errorCh, startCtx, err)
6860
)
6961
return promise
7062
})
@@ -82,29 +74,23 @@ addHook({ name: 'cassandra-driver', versions: ['3 - 4.3'] }, cassandra => {
8274
if (!startCh.hasSubscribers) {
8375
return _innerExecute.apply(this, arguments)
8476
}
85-
const callbackResource = new AsyncResource('bound-anonymous-fn')
86-
const asyncResource = new AsyncResource('bound-anonymous-fn')
87-
8877
if (!isValid(arguments)) {
8978
return _innerExecute.apply(this, arguments)
9079
}
9180

92-
return asyncResource.runInAsyncScope(() => {
93-
const contactPoints = this.options && this.options.contactPoints
94-
startCh.publish({ keyspace: this.keyspace, query, contactPoints })
95-
81+
startCtx = { keyspace: this.keyspace, query, contactPoints: this.options && this.options.contactPoints }
82+
return startCh.runStores(startCtx, () => {
9683
const lastIndex = arguments.length - 1
97-
let cb = arguments[lastIndex]
84+
const cb = arguments[lastIndex]
9885

9986
if (typeof cb === 'function') {
100-
cb = callbackResource.bind(cb)
101-
arguments[lastIndex] = wrapCallback(finishCh, errorCh, asyncResource, cb)
87+
arguments[lastIndex] = wrapCallback(finishCh, errorCh, startCtx, cb)
10288
}
10389

10490
try {
10591
return _innerExecute.apply(this, arguments)
10692
} catch (e) {
107-
finish(finishCh, errorCh, e)
93+
finish(finishCh, errorCh, startCtx, e)
10894
throw e
10995
}
11096
})
@@ -118,7 +104,8 @@ addHook({ name: 'cassandra-driver', versions: ['>=3.3'], file: 'lib/request-exec
118104
if (!startCh.hasSubscribers) {
119105
return _sendOnConnection.apply(this, arguments)
120106
}
121-
connectCh.publish({ hostname: this._connection.address, port: this._connection.port })
107+
startCtx = { hostname: this._connection.address, port: this._connection.port, ...startCtx }
108+
connectCh.publish(startCtx)
122109
return _sendOnConnection.apply(this, arguments)
123110
})
124111
return RequestExecution
@@ -129,19 +116,16 @@ addHook({ name: 'cassandra-driver', versions: ['3.3 - 4.3'], file: 'lib/request-
129116
if (!startCh.hasSubscribers) {
130117
return getHostCallback.apply(this, arguments)
131118
}
132-
const asyncResource = new AsyncResource('bound-anonymous-fn')
133119
const execution = this
134120

135121
if (!isRequestValid(this, arguments, 1)) {
136122
return start.apply(this, arguments)
137123
}
138124

139-
getHostCallback = asyncResource.bind(getHostCallback)
140-
141-
arguments[0] = AsyncResource.bind(function () {
142-
connectCh.publish({ hostname: execution._connection.address, port: execution._connection.port })
143-
return getHostCallback.apply(this, arguments)
144-
})
125+
arguments[0] = function () {
126+
startCtx = { hostname: execution._connection.address, port: execution._connection.port, ...startCtx }
127+
return connectCh.runStores(startCtx, getHostCallback, this, ...arguments)
128+
}
145129

146130
return start.apply(this, arguments)
147131
})
@@ -158,34 +142,33 @@ addHook({ name: 'cassandra-driver', versions: ['3 - 3.2'], file: 'lib/request-ha
158142
if (!isRequestValid(this, arguments, 3)) {
159143
return send.apply(this, arguments)
160144
}
161-
const asyncResource = new AsyncResource('bound-anonymous-fn')
162-
163-
callback = asyncResource.bind(callback)
164145

165-
arguments[2] = AsyncResource.bind(function () {
166-
connectCh.publish({ hostname: handler.connection.address, port: handler.connection.port })
167-
return callback.apply(this, arguments)
168-
})
146+
arguments[2] = function () {
147+
startCtx = { hostname: handler.connection.address, port: handler.connection.port, ...startCtx }
148+
return connectCh.runStores(startCtx, callback, this, ...arguments)
149+
}
169150

170151
return send.apply(this, arguments)
171152
})
172153
return RequestHandler
173154
})
174155

175-
function finish (finishCh, errorCh, error) {
156+
function finish (finishCh, errorCh, ctx, error) {
176157
if (error) {
177-
errorCh.publish(error)
158+
ctx.error = error
159+
errorCh.publish(ctx)
178160
}
179-
finishCh.publish()
161+
finishCh.runStores(ctx, () => {})
180162
}
181163

182-
function wrapCallback (finishCh, errorCh, asyncResource, callback) {
183-
return shimmer.wrapFunction(callback, callback => asyncResource.bind(function (err) {
184-
finish(finishCh, errorCh, err)
185-
if (callback) {
186-
return callback.apply(this, arguments)
164+
function wrapCallback (finishCh, errorCh, ctx, callback) {
165+
return shimmer.wrapFunction(callback, callback => function (err) {
166+
if (err) {
167+
ctx.error = err
168+
errorCh.publish(ctx)
187169
}
188-
}))
170+
return finishCh.runStores(ctx, callback, this, ...arguments)
171+
})
189172
}
190173

191174
function isRequestValid (exec, args, length) {

packages/datadog-plugin-cassandra-driver/src/index.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ class CassandraDriverPlugin extends DatabasePlugin {
88
static get system () { return 'cassandra' }
99
static get peerServicePrecursors () { return [CASSANDRA_CONTACT_POINTS_KEY] }
1010

11-
start ({ keyspace, query, contactPoints = {} }) {
11+
bindStart (ctx) {
12+
let { keyspace, query, contactPoints = {} } = ctx
13+
1214
if (Array.isArray(query)) {
1315
query = combine(query)
1416
}
@@ -24,7 +26,9 @@ class CassandraDriverPlugin extends DatabasePlugin {
2426
'cassandra.keyspace': keyspace,
2527
[CASSANDRA_CONTACT_POINTS_KEY]: contactPoints.join(',') || null
2628
}
27-
})
29+
}, ctx)
30+
31+
return ctx.currentStore
2832
}
2933
}
3034

packages/datadog-plugin-moleculer/src/client.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class MoleculerClientPlugin extends ClientPlugin {
3030
const endpoint = promiseCtx.endpoint || {}
3131
const node = endpoint.node || {}
3232

33-
this.addHost(node.hostname, node.port)
33+
this.addHost({ hostname: node.hostname, port: node.port })
3434

3535
span.addTags(moleculerTags(broker, promiseCtx, this.config))
3636
}

packages/dd-trace/src/plugins/outbound.js

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ class OutboundPlugin extends TracingPlugin {
2121
constructor (...args) {
2222
super(...args)
2323

24-
this.addTraceSub('connect', message => {
25-
this.connect(message)
24+
this.addTraceSub('connect', ctx => {
25+
this.connect(ctx)
2626
})
2727
}
2828

@@ -105,12 +105,14 @@ class OutboundPlugin extends TracingPlugin {
105105
}
106106
}
107107

108-
connect (url) {
109-
this.addHost(url.hostname, url.port)
108+
connect (ctx) {
109+
this.addHost(ctx)
110110
}
111111

112-
addHost (hostname, port) {
113-
const span = this.activeSpan
112+
addHost (ctx) {
113+
const { hostname, port } = ctx
114+
115+
const span = ctx?.currentStore?.span || this.activeSpan
114116

115117
if (!span) return
116118

0 commit comments

Comments
 (0)