Skip to content

feat(logs): add Buffering and Batching #4310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: feat/logs
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions src/Sentry/Internal/BatchBuffer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
namespace Sentry.Internal;

/// <summary>
/// A slim wrapper over an <see cref="System.Array"/>, intended for buffering.
/// <para>Requires a minimum capacity of 2.</para>
/// </summary>
/// <remarks>
/// <para><see cref="Capacity"/> is thread-safe.</para>
/// <para><see cref="TryAdd(T, out int)"/> is thread-safe.</para>
/// <para><see cref="ToArrayAndClear()"/> is not thread-safe.</para>
/// <para><see cref="ToArrayAndClear(int)"/> is not thread-safe.</para>
/// </remarks>
internal sealed class BatchBuffer<T>
{
private readonly T[] _array;
private int _additions;

public BatchBuffer(int capacity)
{
ThrowIfLessThanTwo(capacity, nameof(capacity));

_array = new T[capacity];
_additions = 0;
}

internal int Capacity => _array.Length;
internal bool IsEmpty => _additions == 0;
internal bool IsFull => _additions >= _array.Length;

internal bool TryAdd(T item, out int count)
{
count = Interlocked.Increment(ref _additions);

if (count <= _array.Length)
{
_array[count - 1] = item;
return true;
}

return false;
}

internal T[] ToArrayAndClear()
{
var additions = _additions;
var length = _array.Length;
if (additions < length)
{
length = additions;
}
return ToArrayAndClear(length);
}

internal T[] ToArrayAndClear(int length)
{
var array = ToArray(length);
Clear(length);
return array;
}

private T[] ToArray(int length)
{
if (length == 0)
{
return Array.Empty<T>();
}

var array = new T[length];
Array.Copy(_array, array, length);
return array;
}

private void Clear(int length)
{
if (length == 0)
{
return;
}

_additions = 0;
Array.Clear(_array, 0, length);
}

private static void ThrowIfLessThanTwo(int capacity, string paramName)
{
if (capacity < 2)
{
ThrowLessThanTwo(capacity, paramName);
}
}

private static void ThrowLessThanTwo(int capacity, string paramName)
{
throw new ArgumentOutOfRangeException(paramName, capacity, "Argument must be at least two.");
}
}
126 changes: 126 additions & 0 deletions src/Sentry/Internal/BatchProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
using Sentry.Extensibility;
using Sentry.Protocol;
using Sentry.Protocol.Envelopes;

namespace Sentry.Internal;

/// <summary>
/// The Sentry Batch Processor.
/// This implementation is not complete yet.
/// Also, the specification is still work in progress.
/// </summary>
/// <remarks>
/// Sentry Specification: <see href="https://develop.sentry.dev/sdk/telemetry/spans/batch-processor/"/>.
/// OpenTelemetry spec: <see href="https://github.com/open-telemetry/opentelemetry-collector/blob/main/processor/batchprocessor/README.md"/>.
/// </remarks>
internal sealed class BatchProcessor : IDisposable
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could think about giving this a more specific/descriptive name. BatchProcessor is very generic and we could have other batch processors (for metrics or other things) in the future.

If we weren't worried about long names, I guess it would be a StructuredLogBatchProcessor... Or a more concise alternative might be LogBatch (so you'd do something like _logBatch.Enqueue(log))?

Alternatively, it could be made generic (like the BatchBuffer class that it uses), but I'd recommend we do that if/when we need a batch processor for a second category of Sentry events (it's hard to make something properly generic until you have 2 or 3 different concrete implementations).

{
private readonly IHub _hub;
private readonly TimeSpan _batchInterval;
private readonly IDiagnosticLogger? _diagnosticLogger;

private readonly Timer _timer;
private readonly object _timerCallbackLock;
private readonly BatchBuffer<SentryLog> _buffer1;
private readonly BatchBuffer<SentryLog> _buffer2;
private volatile BatchBuffer<SentryLog> _activeBuffer;

private DateTime _lastFlush = DateTime.MinValue;

public BatchProcessor(IHub hub, int batchCount, TimeSpan batchInterval, IDiagnosticLogger? diagnosticLogger)
{
_hub = hub;
_batchInterval = batchInterval;
_diagnosticLogger = diagnosticLogger;

_timer = new Timer(OnIntervalElapsed, this, Timeout.Infinite, Timeout.Infinite);
_timerCallbackLock = new object();

_buffer1 = new BatchBuffer<SentryLog>(batchCount);
_buffer2 = new BatchBuffer<SentryLog>(batchCount);
_activeBuffer = _buffer1;
}

internal void Enqueue(SentryLog log)
{
var activeBuffer = _activeBuffer;

if (!TryEnqueue(activeBuffer, log))
{
activeBuffer = activeBuffer == _buffer1 ? _buffer2 : _buffer1;
if (!TryEnqueue(activeBuffer, log))
{
_diagnosticLogger?.LogInfo("Log Buffer full ... dropping log");
}
}
}

private bool TryEnqueue(BatchBuffer<SentryLog> buffer, SentryLog log)
{
if (buffer.TryAdd(log, out var count))
{
if (count == 1) // is first of buffer
{
EnableTimer();
}

if (count == buffer.Capacity) // is buffer full
{
// swap active buffer atomically
var currentActiveBuffer = _activeBuffer;
var newActiveBuffer = ReferenceEquals(currentActiveBuffer, _buffer1) ? _buffer2 : _buffer1;
if (Interlocked.CompareExchange(ref _activeBuffer, newActiveBuffer, currentActiveBuffer) == currentActiveBuffer)
{
DisableTimer();
Flush(buffer, count);
}
}

return true;
}

return false;
}

private void Flush(BatchBuffer<SentryLog> buffer, int count)
{
_lastFlush = DateTime.UtcNow;

var logs = buffer.ToArrayAndClear(count);
_ = _hub.CaptureEnvelope(Envelope.FromLog(new StructuredLog(logs)));
}

internal void OnIntervalElapsed(object? state)
{
lock (_timerCallbackLock)
{
var currentActiveBuffer = _activeBuffer;

if (!currentActiveBuffer.IsEmpty && DateTime.UtcNow > _lastFlush)
{
var newActiveBuffer = ReferenceEquals(currentActiveBuffer, _buffer1) ? _buffer2 : _buffer1;
if (Interlocked.CompareExchange(ref _activeBuffer, newActiveBuffer, currentActiveBuffer) == currentActiveBuffer)
{
Flush(currentActiveBuffer, -1);
}
}
}
}

private void EnableTimer()
{
var updated = _timer.Change(_batchInterval, Timeout.InfiniteTimeSpan);
Debug.Assert(updated, "Timer was not successfully enabled.");
}

private void DisableTimer()
{
var updated = _timer.Change(Timeout.Infinite, Timeout.Infinite);
Debug.Assert(updated, "Timer was not successfully disabled.");
}

public void Dispose()
{
_timer.Dispose();
}
}
35 changes: 31 additions & 4 deletions src/Sentry/Internal/DefaultSentryStructuredLogger.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Sentry.Extensibility;
using Sentry.Infrastructure;
using Sentry.Protocol.Envelopes;

namespace Sentry.Internal;

Expand All @@ -10,13 +9,33 @@ internal sealed class DefaultSentryStructuredLogger : SentryStructuredLogger
private readonly SentryOptions _options;
private readonly ISystemClock _clock;

private readonly BatchProcessor _batchProcessor;

internal DefaultSentryStructuredLogger(IHub hub, SentryOptions options, ISystemClock clock)
{
Debug.Assert(options is { Experimental.EnableLogs: true });

_hub = hub;
_options = options;
_clock = clock;

_batchProcessor = new BatchProcessor(hub, ClampBatchCount(options.Experimental.InternalBatchSize), ClampBatchInterval(options.Experimental.InternalBatchTimeout), _options.DiagnosticLogger);
}

private static int ClampBatchCount(int batchCount)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this go in the setter for the InternalBatchSize on the options instead?

If an invalid value has been configured, should we either throw an exception (we'd only do that if it happened when initialising the options - we don't generally want the SDK to throw exceptions otherwise) or at the very least log a message letting people know they're not configuring things correctly?

{
return batchCount <= 0
? 1
: batchCount > 1_000_000
? 1_000_000
: batchCount;
Comment on lines +24 to +31

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ClampBatchCount method uses magic numbers (1 and 1,000,000) without explanation. Consider defining these as named constants with clear documentation about why these specific limits were chosen.

Suggested change
private static int ClampBatchCount(int batchCount)
{
return batchCount <= 0
? 1
: batchCount > 1_000_000
? 1_000_000
: batchCount;
private const int MinBatchCount = 1;
private const int MaxBatchCount = 1_000_000;
private static int ClampBatchCount(int batchCount)
{
return batchCount <= 0
? MinBatchCount
: batchCount > MaxBatchCount
? MaxBatchCount
: batchCount;
}

}

private static TimeSpan ClampBatchInterval(TimeSpan batchInterval)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, could this be moved to the setter?

{
return batchInterval.TotalMilliseconds is <= 0 or > int.MaxValue
? TimeSpan.FromMilliseconds(int.MaxValue)
: batchInterval;
}
Comment on lines +33 to 39

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ClampBatchInterval method uses int.MaxValue as the maximum timeout, but this could result in very long timeouts that might not be practical. Consider using a more reasonable maximum timeout (e.g., 30 seconds as mentioned in the documentation).

Suggested change
private static TimeSpan ClampBatchInterval(TimeSpan batchInterval)
{
return batchInterval.TotalMilliseconds is <= 0 or > int.MaxValue
? TimeSpan.FromMilliseconds(int.MaxValue)
: batchInterval;
}
private static readonly TimeSpan MaxBatchInterval = TimeSpan.FromSeconds(30);
private static TimeSpan ClampBatchInterval(TimeSpan batchInterval)
{
return batchInterval.TotalMilliseconds <= 0
? TimeSpan.FromMilliseconds(1)
: batchInterval > MaxBatchInterval
? MaxBatchInterval
: batchInterval;
}


private protected override void CaptureLog(SentryLogLevel level, string template, object[]? parameters, Action<SentryLog>? configureLog)
Expand Down Expand Up @@ -71,9 +90,17 @@ private protected override void CaptureLog(SentryLogLevel level, string template

if (configuredLog is not null)
{
//TODO: enqueue in Batch-Processor / Background-Worker
// see https://github.com/getsentry/sentry-dotnet/issues/4132
_ = _hub.CaptureEnvelope(Envelope.FromLog(configuredLog));
_batchProcessor.Enqueue(configuredLog);
}
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
_batchProcessor.Dispose();
}

base.Dispose(disposing);
}
}
2 changes: 2 additions & 0 deletions src/Sentry/Internal/Hub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,8 @@ public void Dispose()
_memoryMonitor?.Dispose();
#endif

Logger.Dispose();

try
{
CurrentClient.FlushAsync(_options.ShutdownTimeout).ConfigureAwait(false).GetAwaiter().GetResult();
Expand Down
7 changes: 2 additions & 5 deletions src/Sentry/Protocol/Envelopes/Envelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -445,17 +445,14 @@ internal static Envelope FromClientReport(ClientReport clientReport)
return new Envelope(header, items);
}

// TODO: This is temporary. We don't expect single log messages to become an envelope by themselves since batching is needed
[Experimental(DiagnosticId.ExperimentalFeature)]
internal static Envelope FromLog(SentryLog log)
internal static Envelope FromLog(StructuredLog log)
{
//TODO: allow batching Sentry logs
//see https://github.com/getsentry/sentry-dotnet/issues/4132
var header = DefaultHeader;

var items = new[]
{
EnvelopeItem.FromLog(log)
EnvelopeItem.FromLog(log),
};

return new Envelope(header, items);
Expand Down
6 changes: 2 additions & 4 deletions src/Sentry/Protocol/Envelopes/EnvelopeItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,12 @@ internal static EnvelopeItem FromClientReport(ClientReport report)
}

[Experimental(Infrastructure.DiagnosticId.ExperimentalFeature)]
internal static EnvelopeItem FromLog(SentryLog log)
internal static EnvelopeItem FromLog(StructuredLog log)
{
//TODO: allow batching Sentry logs
//see https://github.com/getsentry/sentry-dotnet/issues/4132
var header = new Dictionary<string, object?>(3, StringComparer.Ordinal)
{
[TypeKey] = TypeValueLog,
["item_count"] = 1,
["item_count"] = log.Length,
["content_type"] = "application/vnd.sentry.items.log+json",
};

Expand Down
37 changes: 37 additions & 0 deletions src/Sentry/Protocol/StructuredLog.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Sentry.Extensibility;

namespace Sentry.Protocol;

/// <summary>
/// Represents the Sentry Log protocol.
/// </summary>
/// <remarks>
/// Sentry Docs: <see href="https://docs.sentry.io/product/explore/logs/"/>.
/// Sentry Developer Documentation: <see href="https://develop.sentry.dev/sdk/telemetry/logs/"/>.
/// </remarks>
internal sealed class StructuredLog : ISentryJsonSerializable
{
private readonly SentryLog[] _items;

public StructuredLog(SentryLog[] logs)
{
_items = logs;
}

public int Length => _items.Length;
public ReadOnlySpan<SentryLog> Items => _items;

public void WriteTo(Utf8JsonWriter writer, IDiagnosticLogger? logger)
{
writer.WriteStartObject();
writer.WriteStartArray("items");

foreach (var log in _items)
{
log.WriteTo(writer, logger);
}

writer.WriteEndArray();
writer.WriteEndObject();
}
}
Loading
Loading