Skip to content

Commit c2a1f8c

Browse files
authored
[SVLS-7133] Instrument Azure Service Bus (#6067)
* initial * plugin loading * full distributed trace * all span attributes set correctly * add consumer tests * update test * finalize service bus test * remove qpid from service bus test * instrument and test topics * remove localhost from connectionstring * lint * try removing localhost * revert localhost removal * try to use newer node versions * revert node versioning for integration tests * fix azure-functions test * fix sql server & lint * try actual name of sql service * clean up azure functions * formatting * lint
1 parent c67baf6 commit c2a1f8c

File tree

19 files changed

+387
-308
lines changed

19 files changed

+387
-308
lines changed

.github/workflows/apm-integrations.yml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,52 @@ jobs:
187187

188188
azure-functions:
189189
runs-on: ubuntu-latest
190+
services:
191+
azureservicebusemulator:
192+
image: mcr.microsoft.com/azure-messaging/servicebus-emulator:1.1.2
193+
ports:
194+
- "127.0.0.1:5672:5672"
195+
- "127.0.0.1:5300:5300"
196+
env:
197+
ACCEPT_EULA: "Y"
198+
MSSQL_SA_PASSWORD: "Localtestpass1!"
199+
SQL_SERVER: azuresqledge
200+
azuresqledge:
201+
image: mcr.microsoft.com/azure-sql-edge:1.0.7
202+
ports:
203+
- "127.0.0.1:1433:1433"
204+
env:
205+
ACCEPT_EULA: "Y"
206+
MSSQL_SA_PASSWORD: "Localtestpass1!"
190207
env:
191208
PLUGINS: azure-functions
209+
SERVICES: azureservicebusemulator,azuresqledge
210+
steps:
211+
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
212+
- uses: ./.github/actions/plugins/test
213+
214+
azure-service-bus:
215+
runs-on: ubuntu-latest
216+
services:
217+
azureservicebusemulator:
218+
image: mcr.microsoft.com/azure-messaging/servicebus-emulator:1.1.2
219+
ports:
220+
- "127.0.0.1:5672:5672"
221+
- "127.0.0.1:5300:5300"
222+
env:
223+
ACCEPT_EULA: "Y"
224+
MSSQL_SA_PASSWORD: "Localtestpass1!"
225+
SQL_SERVER: azuresqledge
226+
azuresqledge:
227+
image: mcr.microsoft.com/azure-sql-edge:1.0.7
228+
ports:
229+
- "127.0.0.1:1433:1433"
230+
env:
231+
ACCEPT_EULA: "Y"
232+
MSSQL_SA_PASSWORD: "Localtestpass1!"
233+
env:
234+
PLUGINS: azure-service-bus
235+
SERVICES: azureservicebusemulator,azuresqledge
192236
steps:
193237
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
194238
- uses: ./.github/actions/plugins/test

docker-compose.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,23 @@ services:
33
image: aerospike:ce-6.4.0.3
44
ports:
55
- "127.0.0.1:3000-3002:3000-3002"
6+
azureservicebusemulator:
7+
image: mcr.microsoft.com/azure-messaging/servicebus-emulator:1.1.2
8+
ports:
9+
- "127.0.0.1:5672:5672"
10+
- "127.0.0.1:5300:5300"
11+
environment:
12+
ACCEPT_EULA: "Y"
13+
MSSQL_SA_PASSWORD: "Localtestpass1!"
14+
SQL_SERVER: azuresqledge
15+
azuresqledge:
16+
image: mcr.microsoft.com/azure-sql-edge:1.0.7
17+
platform: linux/amd64
18+
ports:
19+
- "127.0.0.1:1433:1433"
20+
environment:
21+
ACCEPT_EULA: "Y"
22+
MSSQL_SA_PASSWORD: "Localtestpass1!"
623
couchbase:
724
image: ghcr.io/datadog/couchbase-server-sandbox:latest
825
ports:

index.d.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ interface Plugins {
167167
"avsc": tracer.plugins.avsc;
168168
"aws-sdk": tracer.plugins.aws_sdk;
169169
"azure-functions": tracer.plugins.azure_functions;
170+
"azure-service-bus": tracer.plugins.azure_service_bus;
170171
"bunyan": tracer.plugins.bunyan;
171172
"cassandra-driver": tracer.plugins.cassandra_driver;
172173
"child_process": tracer.plugins.child_process;
@@ -1381,6 +1382,11 @@ declare namespace tracer {
13811382
*/
13821383
interface azure_functions extends Instrumentation {}
13831384

1385+
/**
1386+
* This plugin automatically instruments the
1387+
* @azure/service-bus module
1388+
*/
1389+
interface azure_service_bus extends Integration {}
13841390
/**
13851391
* This plugin patches the [bunyan](https://github.com/trentm/node-bunyan)
13861392
* to automatically inject trace identifiers in log records when the

packages/datadog-instrumentations/src/azure-functions.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,18 @@ const azureFunctionsChannel = dc.tracingChannel('datadog:azure:functions:invoke'
1111
addHook({ name: '@azure/functions', versions: ['>=4'] }, azureFunction => {
1212
const { app } = azureFunction
1313

14+
// Http triggers
1415
shimmer.wrap(app, 'deleteRequest', wrapHandler)
1516
shimmer.wrap(app, 'http', wrapHandler)
1617
shimmer.wrap(app, 'get', wrapHandler)
1718
shimmer.wrap(app, 'patch', wrapHandler)
1819
shimmer.wrap(app, 'post', wrapHandler)
1920
shimmer.wrap(app, 'put', wrapHandler)
2021

22+
// Service Bus triggers
23+
shimmer.wrap(app, 'serviceBusQueue', wrapHandler)
24+
shimmer.wrap(app, 'serviceBusTopic', wrapHandler)
25+
2126
return azureFunction
2227
})
2328

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
'use strict'
2+
3+
const {
4+
channel,
5+
addHook
6+
} = require('./helpers/instrument')
7+
8+
const shimmer = require('../../datadog-shimmer')
9+
10+
const producerStartCh = channel('apm:azure-service-bus:send:start')
11+
const producerErrorCh = channel('apm:azure-service-bus:send:error')
12+
const producerFinishCh = channel('apm:azure-service-bus:send:finish')
13+
14+
addHook({ name: '@azure/service-bus', versions: ['>=7.9.2'] }, (obj) => {
15+
const ServiceBusClient = obj.ServiceBusClient
16+
shimmer.wrap(ServiceBusClient.prototype, 'createSender', createSender => function (queueOrTopicName) {
17+
const sender = createSender.apply(this, arguments)
18+
shimmer.wrap(sender._sender, 'send', send => function (msg) {
19+
const ctx = { sender, msg }
20+
return producerStartCh.runStores(ctx, () => {
21+
return send.apply(this, arguments)
22+
.then(
23+
response => {
24+
producerFinishCh.publish(ctx)
25+
},
26+
error => {
27+
ctx.error = error
28+
producerErrorCh.publish(ctx)
29+
producerFinishCh.publish(ctx)
30+
throw error
31+
}
32+
)
33+
})
34+
})
35+
return sender
36+
})
37+
return obj
38+
})

packages/datadog-instrumentations/src/helpers/hooks.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module.exports = {
66
'apollo-server-core': () => require('../apollo-server-core'),
77
'@aws-sdk/smithy-client': () => require('../aws-sdk'),
88
'@azure/functions': () => require('../azure-functions'),
9+
'@azure/service-bus': () => require('../azure-service-bus'),
910
'@cucumber/cucumber': () => require('../cucumber'),
1011
'@playwright/test': () => require('../playwright'),
1112
'@elastic/elasticsearch': () => require('../elasticsearch'),
Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
'use strict'
22

33
const TracingPlugin = require('../../dd-trace/src/plugins/tracing')
4-
const { storage } = require('../../datadog-core')
54
const serverless = require('../../dd-trace/src/plugins/util/serverless')
65
const web = require('../../dd-trace/src/plugins/util/web')
76

@@ -11,7 +10,9 @@ const triggerMap = {
1110
get: 'Http',
1211
patch: 'Http',
1312
post: 'Http',
14-
put: 'Http'
13+
put: 'Http',
14+
serviceBusQueue: 'ServiceBus',
15+
serviceBusTopic: 'ServiceBus',
1516
}
1617

1718
class AzureFunctionsPlugin extends TracingPlugin {
@@ -22,24 +23,16 @@ class AzureFunctionsPlugin extends TracingPlugin {
2223
static get prefix () { return 'tracing:datadog:azure:functions:invoke' }
2324

2425
bindStart (ctx) {
25-
const { functionName, methodName, httpRequest } = ctx
26-
const store = storage('legacy').getStore()
27-
// httpRequest.headers is a map
28-
const childOf = this._tracer.extract('http_headers', Object.fromEntries(httpRequest.headers))
26+
const childOf = extractTraceContext(this._tracer, ctx)
27+
const meta = getMetaForTrigger(ctx)
2928
const span = this.startSpan(this.operationName(), {
3029
childOf,
3130
service: this.serviceName(),
3231
type: 'serverless',
33-
meta: {
34-
'aas.function.name': functionName,
35-
'aas.function.trigger': mapTriggerTag(methodName)
36-
}
37-
}, false)
32+
meta,
33+
}, ctx)
3834

3935
ctx.span = span
40-
ctx.parentStore = store
41-
ctx.currentStore = { ...store, span }
42-
4336
return ctx.currentStore
4437
}
4538

@@ -49,30 +42,66 @@ class AzureFunctionsPlugin extends TracingPlugin {
4942
}
5043

5144
asyncEnd (ctx) {
52-
const { httpRequest, result = {} } = ctx
53-
const path = (new URL(httpRequest.url)).pathname
54-
const req = {
55-
method: httpRequest.method,
56-
headers: Object.fromEntries(httpRequest.headers),
57-
url: path
58-
}
59-
60-
const context = web.patch(req)
61-
context.config = this.config
62-
context.paths = [path]
63-
context.res = { statusCode: result.status }
64-
context.span = ctx.currentStore.span
45+
const { httpRequest, methodName, result = {} } = ctx
46+
if (triggerMap[methodName] === 'Http') {
47+
// If the method is an HTTP trigger, we need to patch the request and finish the span
48+
const path = (new URL(httpRequest.url)).pathname
49+
const req = {
50+
method: httpRequest.method,
51+
headers: Object.fromEntries(httpRequest.headers),
52+
url: path
53+
}
54+
const context = web.patch(req)
55+
context.config = this.config
56+
context.paths = [path]
57+
context.res = { statusCode: result.status }
58+
context.span = ctx.currentStore.span
6559

66-
serverless.finishSpan(context)
60+
serverless.finishSpan(context)
61+
// Fallback for other trigger types
62+
} else {
63+
super.finish()
64+
}
6765
}
6866

6967
configure (config) {
7068
return super.configure(web.normalizeConfig(config))
7169
}
7270
}
7371

72+
function getMetaForTrigger ({ functionName, methodName, invocationContext }) {
73+
let meta = {
74+
'aas.function.name': functionName,
75+
'aas.function.trigger': mapTriggerTag(methodName)
76+
}
77+
78+
if (triggerMap[methodName] === 'ServiceBus') {
79+
const triggerEntity = invocationContext.options.trigger.queueName || invocationContext.options.trigger.topicName
80+
meta = {
81+
...meta,
82+
'messaging.message_id': invocationContext.triggerMetadata.messageId,
83+
'messaging.operation': 'receive',
84+
'messaging.system': 'servicebus',
85+
'messaging.destination.name': triggerEntity,
86+
'resource.name': `ServiceBus ${functionName}`,
87+
'span.kind': 'consumer'
88+
}
89+
}
90+
91+
return meta
92+
}
93+
7494
function mapTriggerTag (methodName) {
7595
return triggerMap[methodName] || 'Unknown'
7696
}
7797

98+
function extractTraceContext (tracer, ctx) {
99+
switch (String(triggerMap[ctx.methodName])) {
100+
case 'Http':
101+
return tracer.extract('http_headers', Object.fromEntries(ctx.httpRequest.headers))
102+
case 'ServiceBus':
103+
return tracer.extract('text_map', ctx.invocationContext.triggerMetadata.applicationProperties)
104+
}
105+
}
106+
78107
module.exports = AzureFunctionsPlugin

packages/datadog-plugin-azure-functions/test/integration-test/client.spec.js

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,13 @@ describe('esm', () => {
2020
withVersions('azure-functions', '@azure/functions', NODE_MAJOR < 20 ? '<4.7.3' : '*', version => {
2121
before(async function () {
2222
this.timeout(50000)
23-
sandbox = await createSandbox([`@azure/functions@${version}`, 'azure-functions-core-tools@4'], false,
24-
['./packages/datadog-plugin-azure-functions/test/integration-test/fixtures/*'])
23+
sandbox = await createSandbox([
24+
`@azure/functions@${version}`,
25+
'azure-functions-core-tools@4',
26+
'@azure/service-bus@7.9.2'
27+
],
28+
false,
29+
['./packages/datadog-plugin-azure-functions/test/integration-test/fixtures/*'])
2530
})
2631

2732
after(async function () {
@@ -54,15 +59,51 @@ describe('esm', () => {
5459
})
5560
}).timeout(50000)
5661

57-
it('propagates context to child requests', async () => {
62+
it('propagates context to child http requests', async () => {
5863
const envArgs = {
5964
PATH: `${sandbox.folder}/node_modules/azure-functions-core-tools/bin:${process.env.PATH}`
6065
}
6166
proc = await spawnPluginIntegrationTestProc(sandbox.folder, 'func', ['start'], agent.port, undefined, envArgs)
6267

6368
return curlAndAssertMessage(agent, 'http://127.0.0.1:7071/api/httptest2', ({ headers, payload }) => {
6469
assert.strictEqual(payload.length, 2)
65-
assert.strictEqual(payload[0][0].parent_id, payload[1][1].span_id)
70+
assert.strictEqual(payload[1][0].span_id, payload[1][1].parent_id)
71+
})
72+
}).timeout(50000)
73+
74+
it('propagates context through a service bus queue', async () => {
75+
const envArgs = {
76+
PATH: `${sandbox.folder}/node_modules/azure-functions-core-tools/bin:${process.env.PATH}`
77+
}
78+
proc = await spawnPluginIntegrationTestProc(sandbox.folder, 'func', ['start'], agent.port, undefined, envArgs)
79+
80+
return curlAndAssertMessage(agent, 'http://127.0.0.1:7071/api/httptest3', ({ headers, payload }) => {
81+
assert.strictEqual(payload.length, 3)
82+
assert.strictEqual(payload[1][1].span_id, payload[2][0].parent_id)
83+
assert.strictEqual(payload[2][0].name, 'azure.functions.invoke')
84+
assert.strictEqual(payload[2][0].resource, 'ServiceBus queueTest')
85+
assert.strictEqual(payload[2][0].meta['messaging.destination.name'], 'queue.1')
86+
assert.strictEqual(payload[2][0].meta['messaging.operation'], 'receive')
87+
assert.strictEqual(payload[2][0].meta['messaging.system'], 'servicebus')
88+
assert.strictEqual(payload[2][0].meta['span.kind'], 'consumer')
89+
})
90+
}).timeout(50000)
91+
92+
it('propagates context through a service bus topic', async () => {
93+
const envArgs = {
94+
PATH: `${sandbox.folder}/node_modules/azure-functions-core-tools/bin:${process.env.PATH}`
95+
}
96+
proc = await spawnPluginIntegrationTestProc(sandbox.folder, 'func', ['start'], agent.port, undefined, envArgs)
97+
98+
return curlAndAssertMessage(agent, 'http://127.0.0.1:7071/api/httptest4', ({ headers, payload }) => {
99+
assert.strictEqual(payload.length, 3)
100+
assert.strictEqual(payload[1][1].span_id, payload[2][0].parent_id)
101+
assert.strictEqual(payload[2][0].name, 'azure.functions.invoke')
102+
assert.strictEqual(payload[2][0].resource, 'ServiceBus topicTest')
103+
assert.strictEqual(payload[2][0].meta['messaging.destination.name'], 'topic.1')
104+
assert.strictEqual(payload[2][0].meta['messaging.operation'], 'receive')
105+
assert.strictEqual(payload[2][0].meta['messaging.system'], 'servicebus')
106+
assert.strictEqual(payload[2][0].meta['span.kind'], 'consumer')
66107
})
67108
}).timeout(50000)
68109
})

packages/datadog-plugin-azure-functions/test/integration-test/fixtures/local.settings.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"Values": {
44
"FUNCTIONS_WORKER_RUNTIME": "node",
55
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
6-
"AzureWebJobsStorage": ""
6+
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
7+
"MyServiceBus": "Endpoint=sb://127.0.0.1;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
78
}
89
}

packages/datadog-plugin-azure-functions/test/integration-test/fixtures/package.json

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,5 @@
55
"main": "src/functions/server.mjs",
66
"scripts": {
77
"start": "func start"
8-
},
9-
"dependencies": {
10-
"@azure/functions": "^4.6.0"
11-
},
12-
"devDependencies": {
13-
"azure-functions-core-tools": "^4.x"
148
}
159
}

0 commit comments

Comments
 (0)