
Commit b029913

rochdevwatson authored and committed
test: update confluent kafka tests to use dynamic topic (#6079)
1 parent 61fbe34 commit b029913

File tree

2 files changed: +39 / -24 lines
  • packages

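The first file below drops the shared, static 'test-topic' in favour of a topic name generated per test and created explicitly before each test body runs. A minimal sketch of that setup pattern, assuming the KafkaJS-compatible entry point of @confluentinc/kafka-javascript (the broker address and topic settings mirror the diff; the function name and client id are illustrative):

'use strict'

const { randomUUID } = require('crypto')
// Assumption: the KafkaJS-compatible API exposed by @confluentinc/kafka-javascript.
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS

// Create a unique, single-partition topic so repeated or parallel test runs
// never share retained messages, offsets, or consumer-group state.
async function createDynamicTestTopic () {
  const kafka = new Kafka({
    kafkaJS: {
      clientId: 'dynamic-topic-sketch',
      brokers: ['127.0.0.1:9092']
    }
  })
  const admin = kafka.admin()
  await admin.connect()
  const testTopic = `test-topic-${randomUUID()}`
  await admin.createTopics({
    topics: [{ topic: testTopic, numPartitions: 1, replicationFactor: 1 }]
  })
  await admin.disconnect()
  return testTopic
}

In the spec itself the admin client is created in beforeEach, kept for the lifetime of the test, and disconnected in afterEach, as the diff shows.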

packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js

Lines changed: 38 additions & 23 deletions
@@ -1,5 +1,6 @@
 'use strict'
 
+const { randomUUID } = require('crypto')
 const { expect } = require('chai')
 const agent = require('../../dd-trace/test/plugins/agent')
 const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/helpers')
@@ -10,14 +11,12 @@ const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
 const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')
 
-const testTopic = 'test-topic'
-
-const getDsmPathwayHash = (isProducer, parentHash) => {
+const getDsmPathwayHash = (testTopic, isProducer, parentHash) => {
   let edgeTags
   if (isProducer) {
     edgeTags = ['direction:out', 'topic:' + testTopic, 'type:kafka']
   } else {
-    edgeTags = ['direction:in', 'group:test-group', 'topic:' + testTopic, 'type:kafka']
+    edgeTags = ['direction:in', 'group:test-group-confluent', 'topic:' + testTopic, 'type:kafka']
   }
 
   edgeTags.sort()
@@ -26,6 +25,7 @@ const getDsmPathwayHash = (isProducer, parentHash) => {
 
 describe('Plugin', () => {
   const module = '@confluentinc/kafka-javascript'
+  const groupId = 'test-group-confluent'
 
   describe('confluentinc-kafka-javascript', function () {
     this.timeout(30000)
@@ -36,11 +36,13 @@ describe('Plugin', () => {
 
     withVersions('confluentinc-kafka-javascript', module, (version) => {
       let kafka
+      let admin
       let tracer
       let Kafka
       let ConfluentKafka
       let messages
      let nativeApi
+      let testTopic
 
       describe('without configuration', () => {
         beforeEach(async () => {
@@ -60,11 +62,24 @@ describe('Plugin', () => {
           kafka = new Kafka({
             kafkaJS: {
               clientId: `kafkajs-test-${version}`,
-              brokers: ['127.0.0.1:9092']
+              brokers: ['127.0.0.1:9092'],
+              logLevel: ConfluentKafka.logLevel.WARN
             }
           })
+          testTopic = `test-topic-${randomUUID()}`
+          admin = kafka.admin()
+          await admin.connect()
+          await admin.createTopics({
+            topics: [{
+              topic: testTopic,
+              numPartitions: 1,
+              replicationFactor: 1
+            }]
+          })
         })
 
+        afterEach(() => admin.disconnect())
+
         describe('kafkaJS api', () => {
           describe('producer', () => {
             it('should be instrumented', async () => {
@@ -74,7 +89,7 @@ describe('Plugin', () => {
                 meta: {
                   'span.kind': 'producer',
                   component: 'confluentinc-kafka-javascript',
-                  'messaging.destination.name': 'test-topic',
+                  'messaging.destination.name': testTopic,
                   'messaging.kafka.bootstrap.servers': '127.0.0.1:9092'
                 },
                 metrics: {
@@ -125,7 +140,7 @@ describe('Plugin', () => {
           beforeEach(async () => {
             messages = [{ key: 'key1', value: 'test2' }]
             consumer = kafka.consumer({
-              kafkaJS: { groupId: 'test-group' }
+              kafkaJS: { groupId, fromBeginning: true, autoCommit: false }
             })
             await consumer.connect()
             await consumer.subscribe({ topic: testTopic })
@@ -142,7 +157,7 @@ describe('Plugin', () => {
                 meta: {
                   'span.kind': 'consumer',
                   component: 'confluentinc-kafka-javascript',
-                  'messaging.destination.name': 'test-topic'
+                  'messaging.destination.name': testTopic
                 },
                 resource: testTopic,
                 error: 0,
@@ -151,7 +166,7 @@ describe('Plugin', () => {
 
             const consumerReceiveMessagePromise = new Promise(resolve => {
               consumer.run({
-                eachMessage: async () => {
+                eachMessage: () => {
                  resolve()
                }
              })
@@ -221,7 +236,7 @@ describe('Plugin', () => {
                   [ERROR_STACK]: fakeError.stack,
                   'span.kind': 'consumer',
                   component: 'confluentinc-kafka-javascript',
-                  'messaging.destination.name': 'test-topic'
+                  'messaging.destination.name': testTopic
                 },
                 resource: testTopic,
                 error: 1,
@@ -344,7 +359,10 @@ describe('Plugin', () => {
           beforeEach(async () => {
             nativeConsumer = new Consumer({
               'bootstrap.servers': '127.0.0.1:9092',
-              'group.id': 'test-group'
+              'group.id': groupId,
+              'enable.auto.commit': false,
+            }, {
+              'auto.offset.reset': 'earliest'
             })
 
             await new Promise((resolve, reject) => {
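The native-API hunk above now passes two objects to the consumer constructor: librdkafka global configuration first, then default topic configuration. A hedged sketch of that shape, assuming the library's node-rdkafka-style low-level entry point (the KafkaConsumer class name and connect/ready/subscribe flow come from that API; the topic literal is just a placeholder):

const RdKafka = require('@confluentinc/kafka-javascript')

// Global config (first argument) and default topic config (second argument),
// matching the shape used in the updated test.
const nativeConsumer = new RdKafka.KafkaConsumer({
  'bootstrap.servers': '127.0.0.1:9092',
  'group.id': 'test-group-confluent',
  'enable.auto.commit': false
}, {
  // A freshly created per-test topic has no committed offsets, so start from
  // the earliest offset to guarantee the test's messages are consumed.
  'auto.offset.reset': 'earliest'
})

nativeConsumer.connect()
nativeConsumer.on('ready', () => {
  nativeConsumer.subscribe(['test-topic-placeholder'])
  nativeConsumer.consume()
})

In the spec the group id comes from the shared groupId constant and the topic from the dynamically created testTopic.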
@@ -491,15 +509,15 @@ describe('Plugin', () => {
             tracer.use('confluentinc-kafka-javascript', { dsmEnabled: true })
             messages = [{ key: 'key1', value: 'test2' }]
             consumer = kafka.consumer({
-              kafkaJS: { groupId: 'test-group', fromBeginning: false }
+              kafkaJS: { groupId, fromBeginning: true }
             })
             await consumer.connect()
             await consumer.subscribe({ topic: testTopic })
           })
 
-          before(() => {
-            expectedProducerHash = getDsmPathwayHash(true, ENTRY_PARENT_HASH)
-            expectedConsumerHash = getDsmPathwayHash(false, expectedProducerHash)
+          beforeEach(() => {
+            expectedProducerHash = getDsmPathwayHash(testTopic, true, ENTRY_PARENT_HASH)
+            expectedConsumerHash = getDsmPathwayHash(testTopic, false, expectedProducerHash)
           })
 
           afterEach(async () => {
@@ -617,24 +635,22 @@ describe('Plugin', () => {
                   partition,
                   offset: Number(message.offset)
                 }
-                // Signal that we've processed a message
                 messageProcessedResolve()
               }
             })
 
-            consumerRunPromise.catch(() => {})
+            await consumerRunPromise
 
             // wait for the message to be processed before continuing
-            await sendMessages(kafka, testTopic, messages).then(
-              async () => await messageProcessedPromise
-            )
+            await sendMessages(kafka, testTopic, messages)
+            await messageProcessedPromise
 
            for (const call of setOffsetSpy.getCalls()) {
              expect(call.args[0]).to.not.have.property('type', 'kafka_commit')
            }
 
            const newConsumer = kafka.consumer({
-              kafkaJS: { groupId: 'test-group', autoCommit: false }
+              kafkaJS: { groupId, fromBeginning: true, autoCommit: false }
            })
            await newConsumer.connect()
            await sendMessages(kafka, testTopic, [{ key: 'key1', value: 'test2' }])
@@ -648,12 +664,11 @@ describe('Plugin', () => {
 
            // Check our work
            const runArg = setOffsetSpy.lastCall.args[0]
-            expect(setOffsetSpy).to.be.calledOnce
            expect(runArg).to.have.property('offset', commitMeta.offset)
            expect(runArg).to.have.property('partition', commitMeta.partition)
            expect(runArg).to.have.property('topic', commitMeta.topic)
            expect(runArg).to.have.property('type', 'kafka_commit')
-            expect(runArg).to.have.property('consumer_group', 'test-group')
+            expect(runArg).to.have.property('consumer_group', groupId)
          })
 
          it('Should add backlog on producer response', async () => {
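The commit-related test in the last two hunks above also replaces a .then() chain with sequential awaits: await the consumer's run() promise, then the send, then the "message processed" promise. A hedged sketch of that control flow against the kafkaJS-style API (sendMessages in the spec is a test helper; a plain producer.send stands in for it here, and the remaining names are illustrative):

// Produce one message and wait until the consumer's eachMessage callback has
// seen it, mirroring the simplified flow in the updated test.
async function produceAndAwaitProcessing (kafka, testTopic, groupId, messages) {
  const consumer = kafka.consumer({
    kafkaJS: { groupId, fromBeginning: true, autoCommit: false }
  })
  await consumer.connect()
  await consumer.subscribe({ topic: testTopic })

  let messageProcessedResolve
  const messageProcessedPromise = new Promise(resolve => {
    messageProcessedResolve = resolve
  })

  // Awaiting run() mirrors the spec's `await consumerRunPromise`.
  await consumer.run({
    eachMessage: () => {
      messageProcessedResolve()
    }
  })

  const producer = kafka.producer()
  await producer.connect()
  await producer.send({ topic: testTopic, messages })
  await messageProcessedPromise

  await producer.disconnect()
  await consumer.disconnect()
}

Waiting on the two promises separately keeps the failure point obvious: a timeout on the first await points at the send, a timeout on the second at the consumer never seeing the message.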

packages/dd-trace/test/setup/services/kafka.js

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ const kafka = new Kafka({
 })
 const admin = kafka.admin()
 const producer = kafka.producer()
-const consumer = kafka.consumer({ groupId: 'test-group' })
+const consumer = kafka.consumer({ groupId: 'setup-group' })
 const topic = 'test-topic'
 const messages = [{ key: 'setup', value: 'test' }]
 