-
-
Notifications
You must be signed in to change notification settings - Fork 220
feat(logs): add Buffering and Batching #4310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feat/logs
Are you sure you want to change the base?
Changes from all commits
d24d165
aad0599
76fcc1b
2ad33f6
38e1c04
f7a43b8
e6b0b74
a84b78f
53c90ea
6580632
6e2ee9b
0774709
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
namespace Sentry.Internal; | ||
|
||
/// <summary> | ||
/// A slim wrapper over an <see cref="System.Array"/>, intended for buffering. | ||
/// <para>Requires a minimum capacity of 2.</para> | ||
/// </summary> | ||
/// <remarks> | ||
/// <para><see cref="Capacity"/> is thread-safe.</para> | ||
/// <para><see cref="TryAdd(T, out int)"/> is thread-safe.</para> | ||
/// <para><see cref="ToArrayAndClear()"/> is not thread-safe.</para> | ||
/// <para><see cref="ToArrayAndClear(int)"/> is not thread-safe.</para> | ||
/// </remarks> | ||
internal sealed class BatchBuffer<T> | ||
{ | ||
private readonly T[] _array; | ||
private int _additions; | ||
|
||
public BatchBuffer(int capacity) | ||
{ | ||
ThrowIfLessThanTwo(capacity, nameof(capacity)); | ||
|
||
_array = new T[capacity]; | ||
_additions = 0; | ||
} | ||
|
||
internal int Capacity => _array.Length; | ||
internal bool IsEmpty => _additions == 0; | ||
internal bool IsFull => _additions >= _array.Length; | ||
|
||
internal bool TryAdd(T item, out int count) | ||
{ | ||
count = Interlocked.Increment(ref _additions); | ||
|
||
if (count <= _array.Length) | ||
{ | ||
_array[count - 1] = item; | ||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
internal T[] ToArrayAndClear() | ||
{ | ||
var additions = _additions; | ||
var length = _array.Length; | ||
if (additions < length) | ||
{ | ||
length = additions; | ||
} | ||
return ToArrayAndClear(length); | ||
} | ||
|
||
internal T[] ToArrayAndClear(int length) | ||
{ | ||
var array = ToArray(length); | ||
Clear(length); | ||
return array; | ||
} | ||
|
||
private T[] ToArray(int length) | ||
{ | ||
if (length == 0) | ||
{ | ||
return Array.Empty<T>(); | ||
} | ||
|
||
var array = new T[length]; | ||
Array.Copy(_array, array, length); | ||
return array; | ||
} | ||
|
||
private void Clear(int length) | ||
{ | ||
if (length == 0) | ||
{ | ||
return; | ||
} | ||
|
||
_additions = 0; | ||
Array.Clear(_array, 0, length); | ||
} | ||
|
||
private static void ThrowIfLessThanTwo(int capacity, string paramName) | ||
{ | ||
if (capacity < 2) | ||
{ | ||
ThrowLessThanTwo(capacity, paramName); | ||
} | ||
} | ||
|
||
private static void ThrowLessThanTwo(int capacity, string paramName) | ||
{ | ||
throw new ArgumentOutOfRangeException(paramName, capacity, "Argument must be at least two."); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
using Sentry.Extensibility; | ||
using Sentry.Protocol; | ||
using Sentry.Protocol.Envelopes; | ||
|
||
namespace Sentry.Internal; | ||
|
||
/// <summary> | ||
/// The Sentry Batch Processor. | ||
/// This implementation is not complete yet. | ||
/// Also, the specification is still work in progress. | ||
/// </summary> | ||
/// <remarks> | ||
/// Sentry Specification: <see href="https://develop.sentry.dev/sdk/telemetry/spans/batch-processor/"/>. | ||
/// OpenTelemetry spec: <see href="https://github.com/open-telemetry/opentelemetry-collector/blob/main/processor/batchprocessor/README.md"/>. | ||
/// </remarks> | ||
internal sealed class BatchProcessor : IDisposable | ||
{ | ||
private readonly IHub _hub; | ||
private readonly TimeSpan _batchInterval; | ||
private readonly IDiagnosticLogger? _diagnosticLogger; | ||
|
||
private readonly Timer _timer; | ||
private readonly object _timerCallbackLock; | ||
private readonly BatchBuffer<SentryLog> _buffer1; | ||
private readonly BatchBuffer<SentryLog> _buffer2; | ||
private volatile BatchBuffer<SentryLog> _activeBuffer; | ||
|
||
private DateTime _lastFlush = DateTime.MinValue; | ||
|
||
public BatchProcessor(IHub hub, int batchCount, TimeSpan batchInterval, IDiagnosticLogger? diagnosticLogger) | ||
{ | ||
_hub = hub; | ||
_batchInterval = batchInterval; | ||
_diagnosticLogger = diagnosticLogger; | ||
|
||
_timer = new Timer(OnIntervalElapsed, this, Timeout.Infinite, Timeout.Infinite); | ||
_timerCallbackLock = new object(); | ||
|
||
_buffer1 = new BatchBuffer<SentryLog>(batchCount); | ||
_buffer2 = new BatchBuffer<SentryLog>(batchCount); | ||
_activeBuffer = _buffer1; | ||
} | ||
|
||
internal void Enqueue(SentryLog log) | ||
{ | ||
var activeBuffer = _activeBuffer; | ||
|
||
if (!TryEnqueue(activeBuffer, log)) | ||
{ | ||
activeBuffer = activeBuffer == _buffer1 ? _buffer2 : _buffer1; | ||
if (!TryEnqueue(activeBuffer, log)) | ||
{ | ||
_diagnosticLogger?.LogInfo("Log Buffer full ... dropping log"); | ||
} | ||
} | ||
} | ||
|
||
private bool TryEnqueue(BatchBuffer<SentryLog> buffer, SentryLog log) | ||
{ | ||
if (buffer.TryAdd(log, out var count)) | ||
{ | ||
if (count == 1) // is first of buffer | ||
{ | ||
EnableTimer(); | ||
} | ||
|
||
if (count == buffer.Capacity) // is buffer full | ||
{ | ||
// swap active buffer atomically | ||
var currentActiveBuffer = _activeBuffer; | ||
var newActiveBuffer = ReferenceEquals(currentActiveBuffer, _buffer1) ? _buffer2 : _buffer1; | ||
if (Interlocked.CompareExchange(ref _activeBuffer, newActiveBuffer, currentActiveBuffer) == currentActiveBuffer) | ||
{ | ||
DisableTimer(); | ||
Flush(buffer, count); | ||
} | ||
} | ||
|
||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
private void Flush(BatchBuffer<SentryLog> buffer, int count) | ||
{ | ||
_lastFlush = DateTime.UtcNow; | ||
|
||
var logs = buffer.ToArrayAndClear(count); | ||
_ = _hub.CaptureEnvelope(Envelope.FromLog(new StructuredLog(logs))); | ||
} | ||
|
||
internal void OnIntervalElapsed(object? state) | ||
{ | ||
lock (_timerCallbackLock) | ||
{ | ||
var currentActiveBuffer = _activeBuffer; | ||
|
||
if (!currentActiveBuffer.IsEmpty && DateTime.UtcNow > _lastFlush) | ||
{ | ||
var newActiveBuffer = ReferenceEquals(currentActiveBuffer, _buffer1) ? _buffer2 : _buffer1; | ||
if (Interlocked.CompareExchange(ref _activeBuffer, newActiveBuffer, currentActiveBuffer) == currentActiveBuffer) | ||
{ | ||
Flush(currentActiveBuffer, -1); | ||
} | ||
} | ||
} | ||
} | ||
|
||
private void EnableTimer() | ||
{ | ||
var updated = _timer.Change(_batchInterval, Timeout.InfiniteTimeSpan); | ||
Debug.Assert(updated, "Timer was not successfully enabled."); | ||
} | ||
|
||
private void DisableTimer() | ||
{ | ||
var updated = _timer.Change(Timeout.Infinite, Timeout.Infinite); | ||
Debug.Assert(updated, "Timer was not successfully disabled."); | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
_timer.Dispose(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,6 +1,5 @@ | ||||||||||||||||||||||||||||||||||||||||
using Sentry.Extensibility; | ||||||||||||||||||||||||||||||||||||||||
using Sentry.Infrastructure; | ||||||||||||||||||||||||||||||||||||||||
using Sentry.Protocol.Envelopes; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
namespace Sentry.Internal; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
|
@@ -10,13 +9,33 @@ internal sealed class DefaultSentryStructuredLogger : SentryStructuredLogger | |||||||||||||||||||||||||||||||||||||||
private readonly SentryOptions _options; | ||||||||||||||||||||||||||||||||||||||||
private readonly ISystemClock _clock; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private readonly BatchProcessor _batchProcessor; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
internal DefaultSentryStructuredLogger(IHub hub, SentryOptions options, ISystemClock clock) | ||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
Debug.Assert(options is { Experimental.EnableLogs: true }); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
_hub = hub; | ||||||||||||||||||||||||||||||||||||||||
_options = options; | ||||||||||||||||||||||||||||||||||||||||
_clock = clock; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
_batchProcessor = new BatchProcessor(hub, ClampBatchCount(options.Experimental.InternalBatchSize), ClampBatchInterval(options.Experimental.InternalBatchTimeout), _options.DiagnosticLogger); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private static int ClampBatchCount(int batchCount) | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this go in the setter for the If an invalid value has been configured, should we either throw an exception (we'd only do that if it happened when initialising the options - we don't generally want the SDK to throw exceptions otherwise) or at the very least log a message letting people know they're not configuring things correctly? |
||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
return batchCount <= 0 | ||||||||||||||||||||||||||||||||||||||||
? 1 | ||||||||||||||||||||||||||||||||||||||||
: batchCount > 1_000_000 | ||||||||||||||||||||||||||||||||||||||||
? 1_000_000 | ||||||||||||||||||||||||||||||||||||||||
: batchCount; | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+24
to
+31
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private static TimeSpan ClampBatchInterval(TimeSpan batchInterval) | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, could this be moved to the setter? |
||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
return batchInterval.TotalMilliseconds is <= 0 or > int.MaxValue | ||||||||||||||||||||||||||||||||||||||||
? TimeSpan.FromMilliseconds(int.MaxValue) | ||||||||||||||||||||||||||||||||||||||||
: batchInterval; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+33
to
39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private protected override void CaptureLog(SentryLogLevel level, string template, object[]? parameters, Action<SentryLog>? configureLog) | ||||||||||||||||||||||||||||||||||||||||
|
@@ -71,9 +90,17 @@ private protected override void CaptureLog(SentryLogLevel level, string template | |||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
if (configuredLog is not null) | ||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
//TODO: enqueue in Batch-Processor / Background-Worker | ||||||||||||||||||||||||||||||||||||||||
// see https://github.com/getsentry/sentry-dotnet/issues/4132 | ||||||||||||||||||||||||||||||||||||||||
_ = _hub.CaptureEnvelope(Envelope.FromLog(configuredLog)); | ||||||||||||||||||||||||||||||||||||||||
_batchProcessor.Enqueue(configuredLog); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
protected override void Dispose(bool disposing) | ||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
if (disposing) | ||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||
_batchProcessor.Dispose(); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
base.Dispose(disposing); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} |
jamescrosswell marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
using Sentry.Extensibility; | ||
|
||
namespace Sentry.Protocol; | ||
|
||
/// <summary> | ||
/// Represents the Sentry Log protocol. | ||
/// </summary> | ||
/// <remarks> | ||
/// Sentry Docs: <see href="https://docs.sentry.io/product/explore/logs/"/>. | ||
/// Sentry Developer Documentation: <see href="https://develop.sentry.dev/sdk/telemetry/logs/"/>. | ||
/// </remarks> | ||
internal sealed class StructuredLog : ISentryJsonSerializable | ||
{ | ||
private readonly SentryLog[] _items; | ||
|
||
public StructuredLog(SentryLog[] logs) | ||
{ | ||
_items = logs; | ||
} | ||
|
||
public int Length => _items.Length; | ||
public ReadOnlySpan<SentryLog> Items => _items; | ||
|
||
public void WriteTo(Utf8JsonWriter writer, IDiagnosticLogger? logger) | ||
{ | ||
writer.WriteStartObject(); | ||
writer.WriteStartArray("items"); | ||
|
||
foreach (var log in _items) | ||
{ | ||
log.WriteTo(writer, logger); | ||
} | ||
|
||
writer.WriteEndArray(); | ||
writer.WriteEndObject(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could think about giving this a more specific/descriptive name.
BatchProcessor
is very generic and we could have other batch processors (for metrics or other things) in the future.If we weren't worried about long names, I guess it would be a
StructuredLogBatchProcessor
... Or a more concise alternative might beLogBatch
(so you'd do something like_logBatch.Enqueue(log)
)?Alternatively, it could be made generic (like the
BatchBuffer
class that it uses), but I'd recommend we do that if/when we need a batch processor for a second category of Sentry events (it's hard to make something properly generic until you have 2 or 3 different concrete implementations).