1
+ namespace NServiceBus . Metrics . AcceptanceTests
2
+ {
3
+ using System ;
4
+ using System . Collections . Concurrent ;
5
+ using System . Collections . Generic ;
6
+ using System . Linq ;
7
+ using System . Threading ;
8
+ using System . Threading . Tasks ;
9
+ using AcceptanceTesting ;
10
+ using Extensibility ;
11
+ using Features ;
12
+ using global ::Newtonsoft . Json . Linq ;
13
+ using NServiceBus . AcceptanceTests ;
14
+ using NServiceBus . AcceptanceTests . EndpointTemplates ;
15
+ using NUnit . Framework ;
16
+ using ObjectBuilder ;
17
+ using Pipeline ;
18
+ using Routing ;
19
+ using Transport ;
20
+
21
+ public class When_publishing_message : NServiceBusAcceptanceTest
22
+ {
23
+ static Guid HostId = Guid . NewGuid ( ) ;
24
+
25
+ [ Test ]
26
+ public async Task Should_enhance_it_with_queue_length_properties ( )
27
+ {
28
+ var context = await Scenario . Define < Context > ( )
29
+ . WithEndpoint < Publisher > ( c => c . When ( ctx => ctx . SubscriptionCount == 2 , async s =>
30
+ {
31
+ await s . Publish ( new TestEventMessage1 ( ) ) ;
32
+ await s . Publish ( new TestEventMessage1 ( ) ) ;
33
+ await s . Publish ( new TestEventMessage2 ( ) ) ;
34
+ await s . Publish ( new TestEventMessage2 ( ) ) ;
35
+ } ) )
36
+ . WithEndpoint < Subscriber > ( b => b . When ( async ( session , c ) =>
37
+ {
38
+ await session . Subscribe < TestEventMessage1 > ( ) ;
39
+ await session . Subscribe < TestEventMessage2 > ( ) ;
40
+ } ) )
41
+ . Done ( c => c . Headers1 . Count == 2 && c . Headers2 . Count == 2 )
42
+ . Run ( )
43
+ . ConfigureAwait ( false ) ;
44
+
45
+ var sessionIds = new [ ] { AssertHeaders ( context . Headers1 ) , AssertHeaders ( context . Headers2 ) } ;
46
+
47
+ var data = JObject . Parse ( context . Data ) ;
48
+ var counters = ( JArray ) data [ "Counters" ] ;
49
+ var counterTokens = counters . Where ( c => c . Value < string > ( "Name" ) . StartsWith ( "Sent sequence for" ) ) ;
50
+
51
+ foreach ( var counter in counterTokens )
52
+ {
53
+ var tags = counter [ "Tags" ] . ToObject < string [ ] > ( ) ;
54
+ var counterBasedKey = tags . GetTagValue ( "key" ) ;
55
+ var type = tags . GetTagValue ( "type" ) ;
56
+
57
+ CollectionAssert . Contains ( sessionIds , counterBasedKey ) ;
58
+ Assert . AreEqual ( 2 , counter . Value < int > ( "Count" ) ) ;
59
+ Assert . AreEqual ( "queue-length.sent" , type ) ;
60
+ }
61
+ }
62
+
63
+ static string AssertHeaders ( IProducerConsumerCollection < IReadOnlyDictionary < string , string > > oneReceiverHeaders )
64
+ {
65
+ const string keyHeader = "NServiceBus.Metrics.QueueLength.Key" ;
66
+ const string valueHeader = "NServiceBus.Metrics.QueueLength.Value" ;
67
+
68
+ var headers = oneReceiverHeaders . ToArray ( ) ;
69
+
70
+ var sessionKey1 = headers [ 0 ] [ keyHeader ] ;
71
+ var sessionKey2 = headers [ 1 ] [ keyHeader ] ;
72
+
73
+ var sequence1 = long . Parse ( headers [ 0 ] [ valueHeader ] ) ;
74
+ var sequence2 = long . Parse ( headers [ 1 ] [ valueHeader ] ) ;
75
+
76
+ Assert . AreEqual ( sessionKey1 , sessionKey2 ) ;
77
+ Assert . AreEqual ( 1 , sequence1 ) ;
78
+ Assert . AreEqual ( 2 , sequence2 ) ;
79
+
80
+ return sessionKey1 ;
81
+ }
82
+
83
+ static void Parse ( IReadOnlyDictionary < string , string > headers , out Guid sessionId , out long sequence )
84
+ {
85
+ var rawHeader = headers [ "NServiceBus.Metrics.QueueLength" ] ;
86
+ var parts = rawHeader . Split ( '_' ) ;
87
+ sessionId = Guid . Parse ( parts [ 0 ] ) ;
88
+ sequence = long . Parse ( parts [ 1 ] ) ;
89
+ }
90
+
91
+ class Context : ScenarioContext
92
+ {
93
+ public volatile int SubscriptionCount ;
94
+ public ConcurrentQueue < IReadOnlyDictionary < string , string > > Headers1 { get ; } = new ConcurrentQueue < IReadOnlyDictionary < string , string > > ( ) ;
95
+ public ConcurrentQueue < IReadOnlyDictionary < string , string > > Headers2 { get ; } = new ConcurrentQueue < IReadOnlyDictionary < string , string > > ( ) ;
96
+ public string Data { get ; set ; }
97
+ }
98
+
99
+ class Publisher : EndpointConfigurationBuilder
100
+ {
101
+ public Publisher ( )
102
+ {
103
+ EndpointSetup < DefaultServer > ( ( c , r ) =>
104
+ {
105
+ var context = ( Context ) r . ScenarioContext ;
106
+
107
+ c . UniquelyIdentifyRunningInstance ( ) . UsingCustomIdentifier ( HostId ) ;
108
+ c . OnEndpointSubscribed < Context > ( ( s , ctx ) =>
109
+ {
110
+ if ( s . SubscriberReturnAddress . Contains ( "Subscriber" ) )
111
+ {
112
+ Interlocked . Increment ( ref ctx . SubscriptionCount ) ;
113
+ }
114
+ } ) ;
115
+
116
+ c . Pipeline . Register ( new PreQueueLengthStep ( ) ) ;
117
+ c . Pipeline . Register ( new PostQueueLengthStep ( ) ) ;
118
+
119
+ c . EnableMetrics ( ) . EnableCustomReport ( payload =>
120
+ {
121
+ context . Data = payload ;
122
+ return Task . FromResult ( 0 ) ;
123
+ } , TimeSpan . FromMilliseconds ( 5 ) ) ;
124
+ } ) ;
125
+ }
126
+ }
127
+
128
+ class Subscriber : EndpointConfigurationBuilder
129
+ {
130
+ public Subscriber ( )
131
+ {
132
+ EndpointSetup < DefaultServer > ( c =>
133
+ {
134
+ c . LimitMessageProcessingConcurrencyTo ( 1 ) ;
135
+ c . DisableFeature < AutoSubscribe > ( ) ;
136
+
137
+ var routing = c . UseTransport < MsmqTransport > ( )
138
+ . Routing ( ) ;
139
+ var publisher = AcceptanceTesting . Customization . Conventions . EndpointNamingConvention ( typeof ( Publisher ) ) ;
140
+ routing . RegisterPublisher ( typeof ( TestEventMessage1 ) , publisher ) ;
141
+ routing . RegisterPublisher ( typeof ( TestEventMessage2 ) , publisher ) ;
142
+ } ) ;
143
+ }
144
+
145
+ public class TestEventMessage1Handler : IHandleMessages < TestEventMessage1 >
146
+ {
147
+ public Context TestContext { get ; set ; }
148
+
149
+ public Task Handle ( TestEventMessage1 message , IMessageHandlerContext context )
150
+ {
151
+ TestContext . Headers1 . Enqueue ( context . MessageHeaders ) ;
152
+
153
+ return Task . FromResult ( 0 ) ;
154
+ }
155
+ }
156
+
157
+ public class TestEventMessage2Handler : IHandleMessages < TestEventMessage2 >
158
+ {
159
+ public Context TestContext { get ; set ; }
160
+
161
+ public Task Handle ( TestEventMessage2 message , IMessageHandlerContext context )
162
+ {
163
+ TestContext . Headers2 . Enqueue ( context . MessageHeaders ) ;
164
+ return Task . FromResult ( 0 ) ;
165
+ }
166
+ }
167
+ }
168
+
169
+ public class TestEventMessage1 : IEvent
170
+ {
171
+ }
172
+
173
+ public class TestEventMessage2 : IEvent
174
+ {
175
+ }
176
+
177
+ class PreQueueLengthStep : RegisterStep
178
+ {
179
+ public PreQueueLengthStep ( )
180
+ : base ( "PreQueueLengthStep" , typeof ( Behavior ) , "Registers behavior replacing context" )
181
+ {
182
+ InsertBefore ( "DispatchQueueLengthBehavior" ) ;
183
+ }
184
+
185
+ class Behavior : IBehavior < IDispatchContext , IDispatchContext >
186
+ {
187
+ public Task Invoke ( IDispatchContext context , Func < IDispatchContext , Task > next )
188
+ {
189
+ return next ( new MultiDispatchContext ( context ) ) ;
190
+ }
191
+ }
192
+ }
193
+
194
+ class PostQueueLengthStep : RegisterStep
195
+ {
196
+ public PostQueueLengthStep ( )
197
+ : base ( "PostQueueLengthStep" , typeof ( Behavior ) , "Registers behavior restoring context" )
198
+ {
199
+ InsertAfter ( "DispatchQueueLengthBehavior" ) ;
200
+ }
201
+
202
+ class Behavior : IBehavior < IDispatchContext , IDispatchContext >
203
+ {
204
+ public Task Invoke ( IDispatchContext context , Func < IDispatchContext , Task > next )
205
+ {
206
+ return next ( ( ( MultiDispatchContext ) context ) . Original ) ;
207
+ }
208
+ }
209
+ }
210
+
211
+ class MultiDispatchContext : IDispatchContext
212
+ {
213
+ public MultiDispatchContext ( IDispatchContext original )
214
+ {
215
+ Extensions = original . Extensions ;
216
+ Builder = original . Builder ;
217
+ Operations = original . Operations . Select ( t => new TransportOperation ( t . Message , new MulticastAddressTag ( Type . GetType ( t . Message . Headers [ Headers . EnclosedMessageTypes ] ) ) , t . RequiredDispatchConsistency , t . DeliveryConstraints ) ) . ToArray ( ) ;
218
+ Original = original ;
219
+ }
220
+
221
+ public IDispatchContext Original { get ; }
222
+ public ContextBag Extensions { get ; }
223
+ public IBuilder Builder { get ; }
224
+ public IEnumerable < TransportOperation > Operations { get ; }
225
+ }
226
+ }
227
+ }
0 commit comments