1
1
'use strict'
2
2
3
+ const { randomUUID } = require ( 'crypto' )
3
4
const sinon = require ( 'sinon' )
4
5
const { withNamingSchema, withPeerService, withVersions } = require ( '../../dd-trace/test/setup/mocha' )
5
6
const agent = require ( '../../dd-trace/test/plugins/agent' )
6
7
const { setup } = require ( './spec_helpers' )
7
8
const semver = require ( 'semver' )
8
9
const { rawExpectedSchema } = require ( './sqs-naming' )
9
-
10
- const queueName = 'SQS_QUEUE_NAME'
11
- const queueNameDSM = 'SQS_QUEUE_NAME_DSM'
12
- const queueNameDSMConsumerOnly = 'SQS_QUEUE_NAME_DSM_CONSUMER_ONLY'
10
+ const { computePathwayHash } = require ( '../../dd-trace/src/datastreams/pathway' )
11
+ const { ENTRY_PARENT_HASH } = require ( '../../dd-trace/src/datastreams/processor' )
13
12
14
13
const getQueueParams = ( queueName ) => {
15
14
return {
@@ -20,24 +19,42 @@ const getQueueParams = (queueName) => {
20
19
}
21
20
}
22
21
23
- const queueOptions = getQueueParams ( queueName )
24
- const queueOptionsDsm = getQueueParams ( queueNameDSM )
25
- const queueOptionsDsmConsumerOnly = getQueueParams ( queueNameDSMConsumerOnly )
26
-
27
22
describe ( 'Plugin' , ( ) => {
28
23
describe ( 'aws-sdk (sqs)' , function ( ) {
29
24
setup ( )
30
25
31
26
withVersions ( 'aws-sdk' , [ 'aws-sdk' , '@aws-sdk/smithy-client' ] , ( version , moduleName ) => {
32
27
let AWS
33
28
let sqs
34
- const QueueUrl = 'http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME'
35
- const QueueUrlDsm = 'http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM'
36
- const QueueUrlDsmConsumerOnly = 'http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM_CONSUMER_ONLY'
29
+ let queueName
30
+ let queueNameDSM
31
+ let queueNameDSMConsumerOnly
32
+ let queueOptions
33
+ let queueOptionsDsm
34
+ let queueOptionsDsmConsumerOnly
35
+ let QueueUrl
36
+ let QueueUrlDsm
37
+ let QueueUrlDsmConsumerOnly
37
38
let tracer
38
39
39
40
const sqsClientName = moduleName === '@aws-sdk/smithy-client' ? '@aws-sdk/client-sqs' : 'aws-sdk'
40
41
42
+ beforeEach ( ( ) => {
43
+ const id = randomUUID ( )
44
+
45
+ queueName = `SQS_QUEUE_NAME-${ id } `
46
+ queueNameDSM = `SQS_QUEUE_NAME_DSM-${ id } `
47
+ queueNameDSMConsumerOnly = `SQS_QUEUE_NAME_DSM_CONSUMER_ONLY-${ id } `
48
+
49
+ queueOptions = getQueueParams ( queueName )
50
+ queueOptionsDsm = getQueueParams ( queueNameDSM )
51
+ queueOptionsDsmConsumerOnly = getQueueParams ( queueNameDSMConsumerOnly )
52
+
53
+ QueueUrl = `http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME-${ id } `
54
+ QueueUrlDsm = `http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM-${ id } `
55
+ QueueUrlDsmConsumerOnly = `http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM_CONSUMER_ONLY-${ id } `
56
+ } )
57
+
41
58
describe ( 'without configuration' , ( ) => {
42
59
before ( ( ) => {
43
60
process . env . DD_DATA_STREAMS_ENABLED = 'true'
@@ -49,18 +66,20 @@ describe('Plugin', () => {
49
66
)
50
67
} )
51
68
52
- before ( done => {
69
+ before ( ( ) => {
53
70
AWS = require ( `../../../versions/${ sqsClientName } @${ version } ` ) . get ( )
54
-
55
71
sqs = new AWS . SQS ( { endpoint : 'http://127.0.0.1:4566' , region : 'us-east-1' } )
72
+ } )
73
+
74
+ beforeEach ( done => {
56
75
sqs . createQueue ( queueOptions , ( err , res ) => {
57
76
if ( err ) return done ( err )
58
77
59
78
done ( )
60
79
} )
61
80
} )
62
81
63
- after ( done => {
82
+ afterEach ( done => {
64
83
sqs . deleteQueue ( { QueueUrl } , done )
65
84
} )
66
85
@@ -75,7 +94,7 @@ describe('Plugin', () => {
75
94
MessageBody : 'test body' ,
76
95
QueueUrl
77
96
} , done ) ,
78
- 'SQS_QUEUE_NAME' ,
97
+ ( ) => queueName ,
79
98
'queuename'
80
99
)
81
100
@@ -125,7 +144,7 @@ describe('Plugin', () => {
125
144
126
145
expect ( span . resource . startsWith ( 'sendMessage' ) ) . to . equal ( true )
127
146
expect ( span . meta ) . to . include ( {
128
- queuename : 'SQS_QUEUE_NAME'
147
+ queuename : queueName
129
148
} )
130
149
131
150
parentId = span . span_id . toString ( )
@@ -164,7 +183,7 @@ describe('Plugin', () => {
164
183
165
184
expect ( span . resource . startsWith ( 'sendMessageBatch' ) ) . to . equal ( true )
166
185
expect ( span . meta ) . to . include ( {
167
- queuename : 'SQS_QUEUE_NAME'
186
+ queuename : queueName
168
187
} )
169
188
170
189
parentId = span . span_id . toString ( )
@@ -314,18 +333,20 @@ describe('Plugin', () => {
314
333
)
315
334
} )
316
335
317
- before ( done => {
336
+ before ( ( ) => {
318
337
AWS = require ( `../../../versions/${ sqsClientName } @${ version } ` ) . get ( )
319
-
320
338
sqs = new AWS . SQS ( { endpoint : 'http://127.0.0.1:4566' , region : 'us-east-1' } )
339
+ } )
340
+
341
+ beforeEach ( done => {
321
342
sqs . createQueue ( queueOptions , ( err , res ) => {
322
343
if ( err ) return done ( err )
323
344
324
345
done ( )
325
346
} )
326
347
} )
327
348
328
- after ( done => {
349
+ afterEach ( done => {
329
350
sqs . deleteQueue ( { QueueUrl } , done )
330
351
} )
331
352
@@ -345,7 +366,7 @@ describe('Plugin', () => {
345
366
} )
346
367
347
368
expect ( span . meta ) . to . include ( {
348
- queuename : 'SQS_QUEUE_NAME' ,
369
+ queuename : queueName ,
349
370
aws_service : 'SQS' ,
350
371
region : 'us-east-1'
351
372
} )
@@ -385,8 +406,8 @@ describe('Plugin', () => {
385
406
} )
386
407
387
408
describe ( 'data stream monitoring' , ( ) => {
388
- const expectedProducerHash = '4673734031235697865'
389
- const expectedConsumerHash = '9749472979704578383'
409
+ let expectedProducerHash
410
+ let expectedConsumerHash
390
411
let nowStub
391
412
392
413
before ( ( ) => {
@@ -398,40 +419,47 @@ describe('Plugin', () => {
398
419
before ( async ( ) => {
399
420
return agent . load ( 'aws-sdk' , {
400
421
sqs : {
401
- consumer : false ,
402
422
dsmEnabled : true
403
423
}
404
424
} ,
405
425
{ dsmEnabled : true } )
406
426
} )
407
427
408
- before ( done => {
428
+ before ( ( ) => {
409
429
AWS = require ( `../../../versions/${ sqsClientName } @${ version } ` ) . get ( )
410
-
411
430
sqs = new AWS . SQS ( { endpoint : 'http://127.0.0.1:4566' , region : 'us-east-1' } )
412
- sqs . createQueue ( queueOptionsDsm , ( err , res ) => {
413
- if ( err ) return done ( err )
414
-
415
- done ( )
416
- } )
417
431
} )
418
432
419
- before ( done => {
420
- AWS = require ( `../../../versions/${ sqsClientName } @${ version } ` ) . get ( )
433
+ beforeEach ( ( ) => {
434
+ const producerHash = computePathwayHash (
435
+ 'test' ,
436
+ 'tester' ,
437
+ [ 'direction:out' , 'topic:' + queueNameDSM , 'type:sqs' ] ,
438
+ ENTRY_PARENT_HASH
439
+ )
421
440
422
- sqs = new AWS . SQS ( { endpoint : 'http://127.0.0.1:4566' , region : 'us-east-1' } )
423
- sqs . createQueue ( queueOptionsDsmConsumerOnly , ( err , res ) => {
424
- if ( err ) return done ( err )
441
+ expectedProducerHash = producerHash . readBigUInt64BE ( 0 ) . toString ( )
442
+ expectedConsumerHash = computePathwayHash (
443
+ 'test' ,
444
+ 'tester' ,
445
+ [ 'direction:in' , 'topic:' + queueNameDSM , 'type:sqs' ] ,
446
+ producerHash
447
+ ) . readBigUInt64BE ( 0 ) . toString ( )
448
+ } )
425
449
426
- done ( )
427
- } )
450
+ beforeEach ( done => {
451
+ sqs . createQueue ( queueOptionsDsm , ( err , res ) => err ? done ( err ) : done ( ) )
452
+ } )
453
+
454
+ beforeEach ( done => {
455
+ sqs . createQueue ( queueOptionsDsmConsumerOnly , ( err , res ) => err ? done ( err ) : done ( ) )
428
456
} )
429
457
430
- after ( done => {
458
+ afterEach ( done => {
431
459
sqs . deleteQueue ( { QueueUrl : QueueUrlDsm } , done )
432
460
} )
433
461
434
- after ( done => {
462
+ afterEach ( done => {
435
463
sqs . deleteQueue ( { QueueUrl : QueueUrlDsmConsumerOnly } , done )
436
464
} )
437
465
@@ -502,28 +530,23 @@ describe('Plugin', () => {
502
530
if ( sqsClientName === 'aws-sdk' && semver . intersects ( version , '>=2.3' ) ) {
503
531
it ( 'Should set pathway hash tag on a span when consuming and promise() was used over a callback' ,
504
532
async ( ) => {
505
- await sqs . sendMessage ( { MessageBody : 'test DSM' , QueueUrl : QueueUrlDsm } )
506
- await sqs . receiveMessage ( { QueueUrl : QueueUrlDsm } ) . promise ( )
507
-
508
533
let consumeSpanMeta = { }
509
- return new Promise ( ( resolve , reject ) => {
510
- agent . assertSomeTraces ( traces => {
511
- const span = traces [ 0 ] [ 0 ]
534
+ const tracePromise = agent . assertSomeTraces ( traces => {
535
+ const span = traces [ 0 ] [ 0 ]
512
536
513
- if ( span . name === 'aws.request' && span . meta [ 'aws.operation' ] === 'receiveMessage' ) {
514
- consumeSpanMeta = span . meta
515
- }
537
+ if ( span . name === 'aws.request' && span . meta [ 'aws.operation' ] === 'receiveMessage' ) {
538
+ consumeSpanMeta = span . meta
539
+ }
516
540
517
- try {
518
- expect ( consumeSpanMeta ) . to . include ( {
519
- 'pathway.hash' : expectedConsumerHash
520
- } )
521
- resolve ( )
522
- } catch ( error ) {
523
- reject ( error )
524
- }
541
+ expect ( consumeSpanMeta ) . to . include ( {
542
+ 'pathway.hash' : expectedConsumerHash
525
543
} )
526
544
} )
545
+
546
+ await sqs . sendMessage ( { MessageBody : 'test DSM' , QueueUrl : QueueUrlDsm } ) . promise ( )
547
+ await sqs . receiveMessage ( { QueueUrl : QueueUrlDsm } ) . promise ( )
548
+
549
+ return tracePromise
527
550
} )
528
551
}
529
552
0 commit comments