1010using  System . Linq ; 
1111using  System . Threading ; 
1212using  Elastic . Apm . Api ; 
13+ using  Elastic . Apm . Ingest ; 
1314using  Elastic . Apm . Libraries . Newtonsoft . Json . Linq ; 
1415using  Elastic . Apm . Logging ; 
1516using  Elastic . Apm . Metrics ; 
1617using  Elastic . Apm . Model ; 
1718using  Elastic . Apm . Report ; 
19+ using  Elastic . Transport ; 
1820using  FluentAssertions ; 
1921
22+ #nullable  enable
2023namespace  Elastic . Apm . Tests . Utilities 
2124{ 
22- 	internal  class  MockPayloadSender  :  IPayloadSender 
25+ 	internal  class  MockPayloadSender  :  ApmChannel 
2326	{ 
2427		private  static   readonly  JObject  JsonSpanTypesData  = 
2528			JObject . Parse ( File . ReadAllText ( "./TestResources/json-specs/span_types.json" ) ) ; 
2629
27- 		private  readonly  List < IError >  _errors  =  new  List < IError > ( ) ; 
28- 		private  readonly  List < Func < IError ,  IError > >  _errorFilters  =  new  List < Func < IError ,  IError > > ( ) ; 
29- 		private  readonly  object  _spanLock  =  new  object ( ) ; 
30- 		private  readonly  object  _transactionLock  =  new  object ( ) ; 
31- 		private  readonly  object  _metricsLock  =  new  object ( ) ; 
32- 		private  readonly  object  _errorLock  =  new  object ( ) ; 
33- 		private  readonly  List < IMetricSet >  _metrics  =  new  List < IMetricSet > ( ) ; 
34- 		private  readonly  List < Func < ISpan ,  ISpan > >  _spanFilters  =  new  List < Func < ISpan ,  ISpan > > ( ) ; 
35- 		private  readonly  List < ISpan >  _spans  =  new  List < ISpan > ( ) ; 
36- 		private  readonly  List < Func < ITransaction ,  ITransaction > >  _transactionFilters  =  new  List < Func < ITransaction ,  ITransaction > > ( ) ; 
37- 		private  readonly  List < ITransaction >  _transactions  =  new  List < ITransaction > ( ) ; 
38- 
39- 		public  MockPayloadSender ( IApmLogger  logger  =  null ) 
30+ 		private  readonly  object  _spanLock  =  new ( ) ; 
31+ 		private  readonly  object  _transactionLock  =  new ( ) ; 
32+ 		private  readonly  object  _metricsLock  =  new ( ) ; 
33+ 		private  readonly  object  _errorLock  =  new ( ) ; 
34+ 		private  readonly  List < IMetricSet >  _metrics  =  new ( ) ; 
35+ 		private  readonly  List < IError >  _errors  =  new ( ) ; 
36+ 		private  readonly  List < ISpan >  _spans  =  new ( ) ; 
37+ 		private  readonly  List < ITransaction >  _transactions  =  new ( ) ; 
38+ 
39+ 		public  MockPayloadSender ( IApmLogger ?  logger  =  null ) 
40+ 			:  base ( new  ApmChannelOptions ( new  Uri ( "http://localhost:8080" ) ,  transportClient :  new  InMemoryConnection ( ) ) ,  logger ) 
4041		{ 
4142			_waitHandles  =  new [ ]  {  new  AutoResetEvent ( false ) ,  new  AutoResetEvent ( false ) ,  new  AutoResetEvent ( false ) ,  new  AutoResetEvent ( false )  } ; 
4243
@@ -45,7 +46,6 @@ public MockPayloadSender(IApmLogger logger = null)
4546			_errorWaitHandle  =  _waitHandles [ 2 ] ; 
4647			_metricSetWaitHandle  =  _waitHandles [ 3 ] ; 
4748
48- 			PayloadSenderV2 . SetUpFilters ( _transactionFilters ,  _spanFilters ,  _errorFilters ,  MockApmServerInfo . Version710 ,  logger  ??  new  NoopLogger ( ) ) ; 
4949		} 
5050
5151		/// <summary> 
@@ -61,6 +61,54 @@ public MockPayloadSender(IApmLogger logger = null)
6161		private  readonly  AutoResetEvent [ ]  _waitHandles ; 
6262		private  static   readonly  TimeSpan  DefaultTimeout  =  TimeSpan . FromMinutes ( 1 ) ; 
6363
64+ 		public  override  bool  TryWrite ( IIntakeRoot  item ) 
65+ 		{ 
66+ 			var  written  =  base . TryWrite ( item ) ; 
67+ 			switch  ( item ) 
68+ 			{ 
69+ 				case  IError  error : 
70+ 					 _errors . Add ( error ) ; 
71+ 					 _errorWaitHandle . Set ( ) ; 
72+ 					 break ; 
73+ 				case  ITransaction  transaction : 
74+ 					 _transactions . Add ( transaction ) ; 
75+ 					 _transactionWaitHandle . Set ( ) ; 
76+ 					 break ; 
77+ 				case  ISpan  span : 
78+ 					 _spans . Add ( span ) ; 
79+ 					 _spanWaitHandle . Set ( ) ; 
80+ 					 break ; 
81+ 				case  IMetricSet  metricSet : 
82+ 					 _metrics . Add ( metricSet ) ; 
83+ 					 _metricSetWaitHandle . Set ( ) ; 
84+ 					 break ; 
85+ 			} 
86+ 			return  written ; 
87+ 		} 
88+ 
89+ 		public  override  void  QueueError ( IError  error ) 
90+ 		{ 
91+ 			lock  ( _errorLock )  base . QueueError ( error ) ; 
92+ 		} 
93+ 
94+ 		public  override  void  QueueTransaction ( ITransaction  transaction ) 
95+ 		{ 
96+ 			lock  ( _transactionLock )  base . QueueTransaction ( transaction ) ; 
97+ 		} 
98+ 
99+ 		public  override  void  QueueSpan ( ISpan  span ) 
100+ 		{ 
101+ 			VerifySpan ( span ) ; 
102+ 			lock  ( _spanLock )  base . QueueSpan ( span ) ; 
103+ 		} 
104+ 
105+ 		public  override  void  QueueMetrics ( IMetricSet  metricSet ) 
106+ 		{ 
107+ 			lock  ( _metricsLock )  base . QueueMetrics ( metricSet ) ; 
108+ 		} 
109+ 
110+ 
111+ 
64112		/// <summary> 
65113		/// Waits for any events to be queued 
66114		/// </summary> 
@@ -191,27 +239,27 @@ public IReadOnlyList<IError> Errors
191239			get 
192240			{ 
193241				lock  ( _errorLock ) 
194- 					return  CreateImmutableSnapshot < IError > ( _errors ) ; 
242+ 					return  CreateImmutableSnapshot ( _errors ) ; 
195243			} 
196244		} 
197245
198- 		public  Error  FirstError  =>  Errors . FirstOrDefault ( )  as  Error ; 
199- 		public  MetricSet  FirstMetric  =>  Metrics . FirstOrDefault ( )  as  MetricSet ; 
246+ 		public  Error ?  FirstError  =>  Errors . FirstOrDefault ( )  as  Error ; 
247+ 		public  MetricSet ?  FirstMetric  =>  Metrics . FirstOrDefault ( )  as  MetricSet ; 
200248
201249		/// <summary> 
202250		/// The 1. Span on the 1. Transaction 
203251		/// </summary> 
204- 		public  Span  FirstSpan  =>  Spans . FirstOrDefault ( )  as  Span ; 
252+ 		public  Span ?  FirstSpan  =>  Spans . FirstOrDefault ( )  as  Span ; 
205253
206- 		public  Transaction  FirstTransaction  => 
254+ 		public  Transaction ?  FirstTransaction  => 
207255			Transactions . FirstOrDefault ( )  as  Transaction ; 
208256
209257		public  IReadOnlyList < IMetricSet >  Metrics 
210258		{ 
211259			get 
212260			{ 
213261				lock  ( _metricsLock ) 
214- 					return  CreateImmutableSnapshot < IMetricSet > ( _metrics ) ; 
262+ 					return  CreateImmutableSnapshot ( _metrics ) ; 
215263			} 
216264		} 
217265
@@ -220,7 +268,7 @@ public IReadOnlyList<ISpan> Spans
220268			get 
221269			{ 
222270				lock  ( _spanLock ) 
223- 					return  CreateImmutableSnapshot < ISpan > ( _spans ) ; 
271+ 					return  CreateImmutableSnapshot ( _spans ) ; 
224272			} 
225273		} 
226274
@@ -229,45 +277,15 @@ public IReadOnlyList<ITransaction> Transactions
229277			get 
230278			{ 
231279				lock  ( _transactionLock ) 
232- 					return  CreateImmutableSnapshot < ITransaction > ( _transactions ) ; 
280+ 					return  CreateImmutableSnapshot ( _transactions ) ; 
233281			} 
234282		} 
235283
236284		public  Span [ ]  SpansOnFirstTransaction  => 
237- 			Spans . Where ( n =>  n . TransactionId  ==  Transactions . First ( ) . Id ) . Select ( n =>  n  as  Span ) . ToArray ( ) ; 
238- 
239- 		public  void  QueueError ( IError  error ) 
240- 		{ 
241- 			lock  ( _errorLock ) 
242- 			{ 
243- 				error  =  _errorFilters . Aggregate ( error , 
244- 					( current ,  filter )  =>  filter ( current ) ) ; 
245- 				_errors . Add ( error ) ; 
246- 				_errorWaitHandle . Set ( ) ; 
247- 			} 
248- 		} 
249- 
250- 		public  virtual  void  QueueTransaction ( ITransaction  transaction ) 
251- 		{ 
252- 			lock  ( _transactionLock ) 
253- 			{ 
254- 				transaction  =  _transactionFilters . Aggregate ( transaction , 
255- 					( current ,  filter )  =>  filter ( current ) ) ; 
256- 				_transactions . Add ( transaction ) ; 
257- 				_transactionWaitHandle . Set ( ) ; 
258- 			} 
259- 		} 
260- 
261- 		public  void  QueueSpan ( ISpan  span ) 
262- 		{ 
263- 			VerifySpan ( span ) ; 
264- 			lock  ( _spanLock ) 
265- 			{ 
266- 				span  =  _spanFilters . Aggregate ( span ,  ( current ,  filter )  =>  filter ( current ) ) ; 
267- 				_spans . Add ( span ) ; 
268- 				_spanWaitHandle . Set ( ) ; 
269- 			} 
270- 		} 
285+ 			Spans 
286+ 				. Where ( n =>  n . TransactionId  ==  Transactions . First ( ) . Id ) 
287+ 				. Select ( n =>  ( Span ) n ) 
288+ 				. ToArray ( ) ; 
271289
272290		private  void  VerifySpan ( ISpan  span ) 
273291		{ 
@@ -279,7 +297,7 @@ private void VerifySpan(ISpan span)
279297				var  spanTypeInfo  =  JsonSpanTypesData [ type ]  as  JObject ; 
280298				spanTypeInfo . Should ( ) . NotBeNull ( $ "span type '{ type } ' is not allowed by the spec") ; 
281299
282- 				var  allowNullSubtype  =  spanTypeInfo [ "allow_null_subtype" ] ? . Value < bool > ( ) ; 
300+ 				var  allowNullSubtype  =  spanTypeInfo ! [ "allow_null_subtype" ] ? . Value < bool > ( ) ; 
283301				var  allowUnlistedSubtype  =  spanTypeInfo [ "allow_unlisted_subtype" ] ? . Value < bool > ( ) ; 
284302				var  subTypes  =  spanTypeInfo [ "subtypes" ] ; 
285303				var  hasSubtypes  =  subTypes  !=  null  &&  subTypes . Any ( ) ; 
@@ -289,7 +307,7 @@ private void VerifySpan(ISpan span)
289307				{ 
290308					if  ( ! allowUnlistedSubtype . GetValueOrDefault ( )  &&  hasSubtypes ) 
291309					{ 
292- 						var  subTypeInfo  =  subTypes [ subType ] ; 
310+ 						var  subTypeInfo  =  subTypes ! [ subType ] ; 
293311						subTypeInfo . Should ( ) 
294312							. NotBeNull ( $ "span subtype '{ subType } ' is not allowed by the spec for type '{ type } '") ; 
295313					} 
@@ -305,15 +323,6 @@ private void VerifySpan(ISpan span)
305323			} 
306324		} 
307325
308- 		public  void  QueueMetrics ( IMetricSet  metricSet ) 
309- 		{ 
310- 			lock  ( _metricsLock ) 
311- 			{ 
312- 				_metrics . Add ( metricSet ) ; 
313- 				_metricSetWaitHandle . Set ( ) ; 
314- 			} 
315- 		} 
316- 
317326		public  void  Clear ( ) 
318327		{ 
319328			lock  ( _spanLock ) 
0 commit comments