1
1
'use strict'
2
2
3
+ const { randomUUID } = require ( 'crypto' )
3
4
const { expect } = require ( 'chai' )
4
5
const semver = require ( 'semver' )
5
6
const dc = require ( 'dc-polyfill' )
@@ -13,10 +14,9 @@ const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
13
14
const { computePathwayHash } = require ( '../../dd-trace/src/datastreams/pathway' )
14
15
const { ENTRY_PARENT_HASH , DataStreamsProcessor } = require ( '../../dd-trace/src/datastreams/processor' )
15
16
16
- const testTopic = 'test-topic'
17
17
const testKafkaClusterId = '5L6g3nShT-eMCtK--X86sw'
18
18
19
- const getDsmPathwayHash = ( clusterIdAvailable , isProducer , parentHash ) => {
19
+ const getDsmPathwayHash = ( testTopic , clusterIdAvailable , isProducer , parentHash ) => {
20
20
let edgeTags
21
21
if ( isProducer ) {
22
22
edgeTags = [ 'direction:out' , 'topic:' + testTopic , 'type:kafka' ]
@@ -41,18 +41,14 @@ describe('Plugin', () => {
41
41
} )
42
42
withVersions ( 'kafkajs' , 'kafkajs' , ( version ) => {
43
43
let kafka
44
+ let admin
44
45
let tracer
45
46
let Kafka
46
47
let Broker
47
48
let clusterIdAvailable
48
49
let expectedProducerHash
49
50
let expectedConsumerHash
50
-
51
- before ( ( ) => {
52
- clusterIdAvailable = semver . intersects ( version , '>=1.13' )
53
- expectedProducerHash = getDsmPathwayHash ( clusterIdAvailable , true , ENTRY_PARENT_HASH )
54
- expectedConsumerHash = getDsmPathwayHash ( clusterIdAvailable , false , expectedProducerHash )
55
- } )
51
+ let testTopic
56
52
57
53
describe ( 'without configuration' , ( ) => {
58
54
const messages = [ { key : 'key1' , value : 'test2' } ]
@@ -70,6 +66,18 @@ describe('Plugin', () => {
70
66
brokers : [ '127.0.0.1:9092' ] ,
71
67
logLevel : lib . logLevel . WARN
72
68
} )
69
+ testTopic = `test-topic-${ randomUUID ( ) } `
70
+ admin = kafka . admin ( )
71
+ await admin . createTopics ( {
72
+ topics : [ {
73
+ topic : testTopic ,
74
+ numPartitions : 1 ,
75
+ replicationFactor : 1
76
+ } ]
77
+ } )
78
+ clusterIdAvailable = semver . intersects ( version , '>=1.13' )
79
+ expectedProducerHash = getDsmPathwayHash ( testTopic , clusterIdAvailable , true , ENTRY_PARENT_HASH )
80
+ expectedConsumerHash = getDsmPathwayHash ( testTopic , clusterIdAvailable , false , expectedProducerHash )
73
81
} )
74
82
75
83
describe ( 'producer' , ( ) => {
@@ -78,7 +86,7 @@ describe('Plugin', () => {
78
86
'span.kind' : 'producer' ,
79
87
component : 'kafkajs' ,
80
88
'pathway.hash' : expectedProducerHash . readBigUInt64BE ( 0 ) . toString ( ) ,
81
- 'messaging.destination.name' : 'test-topic' ,
89
+ 'messaging.destination.name' : testTopic ,
82
90
'messaging.kafka.bootstrap.servers' : '127.0.0.1:9092'
83
91
}
84
92
if ( clusterIdAvailable ) meta [ 'kafka.cluster_id' ] = testKafkaClusterId
@@ -246,7 +254,7 @@ describe('Plugin', () => {
246
254
'span.kind' : 'consumer' ,
247
255
component : 'kafkajs' ,
248
256
'pathway.hash' : expectedConsumerHash . readBigUInt64BE ( 0 ) . toString ( ) ,
249
- 'messaging.destination.name' : 'test-topic'
257
+ 'messaging.destination.name' : testTopic
250
258
} ,
251
259
resource : testTopic ,
252
260
error : 0 ,
@@ -436,8 +444,8 @@ describe('Plugin', () => {
436
444
437
445
before ( ( ) => {
438
446
clusterIdAvailable = semver . intersects ( version , '>=1.13' )
439
- expectedProducerHash = getDsmPathwayHash ( clusterIdAvailable , true , ENTRY_PARENT_HASH )
440
- expectedConsumerHash = getDsmPathwayHash ( clusterIdAvailable , false , expectedProducerHash )
447
+ expectedProducerHash = getDsmPathwayHash ( testTopic , clusterIdAvailable , true , ENTRY_PARENT_HASH )
448
+ expectedConsumerHash = getDsmPathwayHash ( testTopic , clusterIdAvailable , false , expectedProducerHash )
441
449
} )
442
450
443
451
afterEach ( async ( ) => {
@@ -531,7 +539,11 @@ describe('Plugin', () => {
531
539
it ( 'Should add backlog on consumer explicit commit' , async ( ) => {
532
540
// Send a message, consume it, and record the last consumed offset
533
541
let commitMeta
534
- await sendMessages ( kafka , testTopic , messages )
542
+ const deferred = { }
543
+ deferred . promise = new Promise ( ( resolve , reject ) => {
544
+ deferred . resolve = resolve
545
+ deferred . reject = reject
546
+ } )
535
547
await consumer . run ( {
536
548
eachMessage : async payload => {
537
549
const { topic, partition, message } = payload
@@ -540,10 +552,12 @@ describe('Plugin', () => {
540
552
partition,
541
553
offset : Number ( message . offset )
542
554
}
555
+ deferred . resolve ( )
543
556
} ,
544
557
autoCommit : false
545
558
} )
546
- await new Promise ( resolve => setTimeout ( resolve , 50 ) ) // Let eachMessage be called
559
+ await sendMessages ( kafka , testTopic , messages )
560
+ await deferred . promise
547
561
await consumer . disconnect ( ) // Flush ongoing `eachMessage` calls
548
562
for ( const call of setOffsetSpy . getCalls ( ) ) {
549
563
expect ( call . args [ 0 ] ) . to . not . have . property ( 'type' , 'kafka_commit' )
0 commit comments