13
13
// limitations under the License.
14
14
15
15
using System ;
16
- using System . Collections . Concurrent ;
17
16
using System . Collections . Generic ;
18
17
using System . IO ;
19
18
using System . Linq ;
20
19
using System . Net ;
21
20
using System . Net . Http ;
22
- using System . Threading ;
23
21
using System . Threading . Tasks ;
24
- using Serilog . Core ;
25
22
using Serilog . Debugging ;
26
23
using Serilog . Events ;
27
24
using Serilog . Formatting ;
25
+ using Serilog . Sinks . PeriodicBatching ;
28
26
29
27
namespace Serilog . Sinks . Splunk
30
28
{
31
29
/// <summary>
32
30
/// A sink to log to the Event Collector available in Splunk 6.3
33
31
/// </summary>
34
- public class EventCollectorSink : ILogEventSink , IDisposable
32
+ public class EventCollectorSink : PeriodicBatchingSink
35
33
{
36
34
private readonly string _splunkHost ;
37
35
private readonly string _uriPath ;
38
- private readonly int _batchSizeLimitLimit ;
39
36
private readonly ITextFormatter _jsonFormatter ;
40
- private readonly ConcurrentQueue < LogEvent > _queue ;
41
37
private readonly EventCollectorClient _httpClient ;
42
38
39
+
43
40
/// <summary>
44
41
/// Taken from Splunk.Logging.Common
45
42
/// </summary>
@@ -115,6 +112,7 @@ public EventCollectorSink(
115
112
messageHandler )
116
113
{
117
114
}
115
+
118
116
/// <summary>
119
117
/// Creates a new instance of the sink with Customfields
120
118
/// </summary>
@@ -152,12 +150,11 @@ public EventCollectorSink(
152
150
uriPath ,
153
151
batchIntervalInSeconds ,
154
152
batchSizeLimit ,
155
- new SplunkJsonFormatter ( renderTemplate , formatProvider , source , sourceType , host , index , fields ) ,
153
+ new SplunkJsonFormatter ( renderTemplate , formatProvider , source , sourceType , host , index , fields ) ,
156
154
messageHandler )
157
155
{
158
156
}
159
157
160
-
161
158
/// <summary>
162
159
/// Creates a new instance of the sink
163
160
/// </summary>
@@ -176,67 +173,25 @@ public EventCollectorSink(
176
173
int batchSizeLimit ,
177
174
ITextFormatter jsonFormatter ,
178
175
HttpMessageHandler messageHandler = null )
176
+ : base ( batchSizeLimit , TimeSpan . FromSeconds ( batchIntervalInSeconds ) )
179
177
{
180
178
_uriPath = uriPath ;
181
179
_splunkHost = splunkHost ;
182
- _queue = new ConcurrentQueue < LogEvent > ( ) ;
183
180
_jsonFormatter = jsonFormatter ;
184
- _batchSizeLimitLimit = batchSizeLimit ;
185
181
186
- var batchInterval = TimeSpan . FromSeconds ( batchIntervalInSeconds ) ;
187
182
_httpClient = messageHandler != null
188
183
? new EventCollectorClient ( eventCollectorToken , messageHandler )
189
184
: new EventCollectorClient ( eventCollectorToken ) ;
190
-
191
- var cancellationToken = new CancellationToken ( ) ;
192
-
193
- RepeatAction . OnInterval (
194
- batchInterval ,
195
- async ( ) => await ProcessQueue ( ) ,
196
- cancellationToken ) ;
197
185
}
198
186
199
187
/// <summary>
200
- /// Emits the provided log event from a sink
188
+ /// Emit a batch of log events, running asynchronously.
201
189
/// </summary>
202
- /// <param name="logEvent"></param>
203
- public void Emit ( LogEvent logEvent )
204
- {
205
- if ( logEvent == null ) throw new ArgumentNullException ( nameof ( logEvent ) ) ;
206
-
207
- _queue . Enqueue ( logEvent ) ;
208
- }
209
-
210
- private async Task ProcessQueue ( )
211
- {
212
- try
213
- {
214
- do
215
- {
216
- var count = 0 ;
217
- var events = new Queue < LogEvent > ( ) ;
218
- LogEvent next ;
219
-
220
- while ( count < _batchSizeLimitLimit && _queue . TryDequeue ( out next ) )
221
- {
222
- count ++ ;
223
- events . Enqueue ( next ) ;
224
- }
225
-
226
- if ( events . Count == 0 )
227
- return ;
228
-
229
- await Send ( events ) ;
230
-
231
- } while ( true ) ;
232
- }
233
- catch ( Exception ex )
234
- {
235
- SelfLog . WriteLine ( "Exception while emitting batch from {0}: {1}" , this , ex ) ;
236
- }
237
- }
238
-
239
- private async Task Send ( IEnumerable < LogEvent > events )
190
+ /// <param name="events">The events to emit.</param>
191
+ /// <remarks>
192
+ /// Override either <see cref="PeriodicBatchingSink.EmitBatch" /> or <see cref="PeriodicBatchingSink.EmitBatchAsync" />, not both.
193
+ /// </remarks>
194
+ protected override async Task EmitBatchAsync ( IEnumerable < LogEvent > events )
240
195
{
241
196
var allEvents = new StringWriter ( ) ;
242
197
@@ -248,56 +203,22 @@ private async Task Send(IEnumerable<LogEvent> events)
248
203
var request = new EventCollectorRequest ( _splunkHost , allEvents . ToString ( ) , _uriPath ) ;
249
204
var response = await _httpClient . SendAsync ( request ) . ConfigureAwait ( false ) ;
250
205
251
- if ( response . IsSuccessStatusCode )
252
- {
253
- //Do Nothing?
254
- }
255
- else
206
+ if ( ! response . IsSuccessStatusCode )
256
207
{
257
208
//Application Errors sent via HTTP Event Collector
258
209
if ( HttpEventCollectorApplicationErrors . Any ( x => x == response . StatusCode ) )
259
210
{
211
+ // By not throwing an exception here the PeriodicBatchingSink will assume the batch succeeded and not send it again.
260
212
SelfLog . WriteLine (
261
213
"A status code of {0} was received when attempting to send to {1}. The event has been discarded and will not be placed back in the queue." ,
262
214
response . StatusCode . ToString ( ) , _splunkHost ) ;
263
215
}
264
216
else
265
217
{
266
- //Put the item back in the queue & retry on next go
267
- SelfLog . WriteLine (
268
- "A status code of {0} was received when attempting to send to {1}. The event has been placed back in the queue" ,
269
- response . StatusCode . ToString ( ) , _splunkHost ) ;
270
-
271
- foreach ( var logEvent in events )
272
- {
273
- _queue . Enqueue ( logEvent ) ;
274
- }
218
+ // EnsureSuccessStatusCode will throw an exception and the PeriodicBatchingSink will catch/log the exception and retry the batch.
219
+ response . EnsureSuccessStatusCode ( ) ;
275
220
}
276
221
}
277
222
}
278
-
279
- /// <inheritdoc/>
280
- public void Dispose ( )
281
- {
282
- Dispose ( true ) ;
283
- }
284
-
285
- /// <inheritdoc/>
286
- protected virtual void Dispose ( bool disposing )
287
- {
288
- if ( ! disposing ) return ;
289
-
290
- var remainingEvents = new List < LogEvent > ( ) ;
291
-
292
- while ( ! _queue . IsEmpty )
293
- {
294
- LogEvent next ;
295
- _queue . TryDequeue ( out next ) ;
296
- remainingEvents . Add ( next ) ;
297
- }
298
-
299
- Send ( remainingEvents ) . Wait ( ) ;
300
- _httpClient . Dispose ( ) ;
301
- }
302
223
}
303
224
}
0 commit comments