feat(logs): add Buffering and Batching #4310


Draft · wants to merge 19 commits into base: feat/logs
Changes from 8 commits
80 changes: 80 additions & 0 deletions src/Sentry/Internal/BatchBuffer.cs
@@ -0,0 +1,80 @@
namespace Sentry.Internal;

/// <summary>
/// A slim wrapper over an <see cref="System.Array"/>,
/// intended for buffering.
/// </summary>
internal sealed class BatchBuffer<T>
{
private readonly T[] _array;
private int _count;

public BatchBuffer(int capacity)
{
ThrowIfNegativeOrZero(capacity, nameof(capacity));

_array = new T[capacity];
_count = 0;
}

internal int Count => _count;
internal int Capacity => _array.Length;
internal bool IsEmpty => _count == 0 && _array.Length != 0;
internal bool IsFull => _count == _array.Length;

internal bool TryAdd(T item)
{
if (_count < _array.Length)
{
_array[_count] = item;
_count++;
return true;
}

return false;
}

internal T[] ToArray()
{
if (_count == 0)
{
return Array.Empty<T>();
}

var array = new T[_count];
Array.Copy(_array, array, _count);
return array;
}

internal void Clear()
{
if (_count == 0)
{
return;

The Clear() method has an early return for empty buffers, but Array.Clear() is safe to call even on empty arrays. Consider removing the early return to simplify the code. However, if the early return is for performance reasons, add a comment explaining this optimization.

Suggested change
internal void Clear()
{
-if (_count == 0)
-{
-return;
+var count = _count;
+_count = 0;
+Array.Clear(_array, 0, count);
+}

}

var count = _count;
_count = 0;
Array.Clear(_array, 0, count);
Member:

Might want to go with Buffer.BlockCopy(). Not sure it's still relevant, but back in the day, if this was on the critical path, there was a perf advantage. Since we call it in ToArrayAndClear, I imagine it might be, but it's worth profiling before doing any perf improvement changes.

Member Author:

Buffer.BlockCopy only works with primitive types.
There is Buffer.MemoryCopy, but I think this one is also intended for unmanaged structs ... not sure about reference types.
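
For reference, a small standalone snippet illustrating the constraint mentioned above (not part of the diff): Buffer.BlockCopy only accepts arrays with primitive element types, so Array.Copy remains the appropriate call for a SentryLog[].

    // Illustration only: Buffer.BlockCopy is limited to primitive element types.
    var ints = new[] { 1, 2, 3 };
    var intCopy = new int[3];
    Buffer.BlockCopy(ints, 0, intCopy, 0, 3 * sizeof(int)); // fine: byte-wise copy of primitives

    var logs = new SentryLog[3];
    var logCopy = new SentryLog[3];
    // Buffer.BlockCopy(logs, 0, logCopy, 0, 3);  // throws ArgumentException: not an array of primitives
    Array.Copy(logs, logCopy, logs.Length);       // works for any element type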

}

internal T[] ToArrayAndClear()
{
var array = ToArray();
Clear();
return array;
}

private static void ThrowIfNegativeOrZero(int capacity, string paramName)
{
if (capacity <= 0)
{
ThrowNegativeOrZero(capacity, paramName);
}
}

private static void ThrowNegativeOrZero(int capacity, string paramName)
{
throw new ArgumentOutOfRangeException(paramName, capacity, "Argument must neither be negative nor zero.");
}
}
93 changes: 93 additions & 0 deletions src/Sentry/Internal/BatchProcessor.cs
@@ -0,0 +1,93 @@
using System.Timers;
using Sentry.Protocol;
using Sentry.Protocol.Envelopes;

namespace Sentry.Internal;

/// <summary>
/// The Sentry Batch Processor.
/// This implementation is not complete yet.
/// Also, the specification is still work in progress.
/// </summary>
/// <remarks>
/// Sentry Specification: <see href="https://develop.sentry.dev/sdk/telemetry/spans/batch-processor/"/>.
/// OpenTelemetry spec: <see href="https://github.com/open-telemetry/opentelemetry-collector/blob/main/processor/batchprocessor/README.md"/>.
/// </remarks>
internal sealed class BatchProcessor : IDisposable
Collaborator:

We could think about giving this a more specific/descriptive name. BatchProcessor is very generic and we could have other batch processors (for metrics or other things) in the future.

If we weren't worried about long names, I guess it would be a StructuredLogBatchProcessor... Or a more concise alternative might be LogBatch (so you'd do something like _logBatch.Enqueue(log))?

Alternatively, it could be made generic (like the BatchBuffer class that it uses), but I'd recommend we do that if/when we need a batch processor for a second category of Sentry events (it's hard to make something properly generic until you have 2 or 3 different concrete implementations).

{
private readonly IHub _hub;
private readonly BatchProcessorTimer _timer;
private readonly BatchBuffer<SentryLog> _logs;
Collaborator:

It looks like we have a _lock and a _logs buffer here.

Any reason not to use a ConcurrentQueue or a ConcurrentQueueLite? It'd be good to have fewer classes to maintain (unless they're truly necessary).
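
One shape the suggestion above could take, sketched for comparison (hypothetical, not what this PR implements; assumes a _batchCount field alongside the existing _hub, and using System.Collections.Concurrent):

    private readonly ConcurrentQueue<SentryLog> _queue = new();
    private int _queuedCount;

    internal void Enqueue(SentryLog log)
    {
        _queue.Enqueue(log);

        // Interlocked keeps the count accurate without taking a lock on the hot path.
        if (Interlocked.Increment(ref _queuedCount) >= _batchCount)
        {
            FlushQueued();
        }
    }

    private void FlushQueued()
    {
        var logs = new List<SentryLog>(_batchCount);
        while (logs.Count < _batchCount && _queue.TryDequeue(out var log))
        {
            logs.Add(log);
        }

        Interlocked.Add(ref _queuedCount, -logs.Count);
        if (logs.Count > 0)
        {
            _ = _hub.CaptureEnvelope(Envelope.FromLog(new StructuredLog(logs.ToArray())));
        }
    }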

private readonly object _lock;

private DateTime _lastFlush = DateTime.MinValue;

public BatchProcessor(IHub hub, int batchCount, TimeSpan batchInterval)
: this(hub, batchCount, new TimersBatchProcessorTimer(batchInterval))
{
}

public BatchProcessor(IHub hub, int batchCount, BatchProcessorTimer timer)
{
_hub = hub;

_timer = timer;
_timer.Elapsed += OnIntervalElapsed;

_logs = new BatchBuffer<SentryLog>(batchCount);
_lock = new object();
}

internal void Enqueue(SentryLog log)
{
lock (_lock)
{
EnqueueCore(log);
}
}

private void EnqueueCore(SentryLog log)
Collaborator:

This method is only called from the internal Enqueue method. I guess what it mainly does is avoid a bit of code indentation to make the Enqueue method easier to read... in which case it could be made an inline function of that method.

{
var isFirstLog = _logs.IsEmpty;
var added = _logs.TryAdd(log);
Debug.Assert(added, $"Since we currently have no lock-free scenario, it's unexpected to exceed the {nameof(BatchBuffer<SentryLog>)}'s capacity.");

The Debug.Assert statement on line 59 could be problematic in release builds where assertions are disabled. If this is a critical invariant that must be maintained, consider throwing an exception instead or using a different approach to ensure the condition is always checked.

Suggested change
var added = _logs.TryAdd(log);
-Debug.Assert(added, $"Since we currently have no lock-free scenario, it's unexpected to exceed the {nameof(BatchBuffer<SentryLog>)}'s capacity.");
+if (!added)
+{
+throw new InvalidOperationException($"Failed to add item to {nameof(BatchBuffer<SentryLog>)}. This indicates a potential threading issue or buffer overflow.");
+}


if (isFirstLog && !_logs.IsFull)
{
_timer.Enabled = true;
}
else if (_logs.IsFull)
{
_timer.Enabled = false;
Flush();
Member:

We're doing way too much work in the critical path.

Think about what absolutely must happen inside the lock, and what we can move/copy around to have it happen async.

Once the reference that's mutated concurrently is swapped with a new one, new messages can start logging, and you can continue the capture work asynchronously, like allocating the structured log and envelope, calling Capture, etc.

Member:

We might be better off using a couple of arrays and doing some reference swap.

Member Author:

I did start out with a lock-free solution for non-flushing logs.
And then realized a flushing log takes 2-3 ms.
But I was not considering the context of Unity, where this is unacceptable.
I'm going back to the (mostly) lock-free solution I had initially and will build on that, moving toward something mostly lock-free, like a ring buffer of batch buffers.

Thanks for the feedback ... also thanks @bitsandfoxes.
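
A minimal sketch of the reference-swap idea discussed above (hypothetical, not what this commit does): only the swap happens under the lock, and the envelope allocation and capture run off the critical path.

    private BatchBuffer<SentryLog> _active;   // written by Enqueue under the lock
    private BatchBuffer<SentryLog> _standby;  // drained by the flusher outside the lock

    private void FlushViaSwap()
    {
        BatchBuffer<SentryLog> full;
        lock (_lock)
        {
            // Critical path: just swap the buffers and stamp the flush time.
            full = _active;
            (_active, _standby) = (_standby, _active);
            _lastFlush = DateTime.UtcNow;
        }

        // Off the critical path: copy, clear, and hand off to the transport.
        var logs = full.ToArrayAndClear();
        _ = _hub.CaptureEnvelope(Envelope.FromLog(new StructuredLog(logs)));

        // Caveat: a second flush must not swap this buffer back in before it is drained;
        // the ring of batch buffers mentioned above is one way to guarantee that.
    }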

}
}

private void Flush()
{
_lastFlush = DateTime.UtcNow;

var logs = _logs.ToArrayAndClear();
_ = _hub.CaptureEnvelope(Envelope.FromLog(new StructuredLog(logs)));
}

private void OnIntervalElapsed(object? sender, ElapsedEventArgs e)
{
_timer.Enabled = false;

lock (_lock)
{
if (!_logs.IsEmpty && e.SignalTime > _lastFlush)
{
Flush();
}

The locking strategy could potentially cause performance issues under high load. Consider whether the lock needs to be held for the entire duration of the flush operation, or if it could be released earlier to allow more items to be enqueued while the flush is in progress.

Suggested change
lock (_lock)
{
if (!_logs.IsEmpty && e.SignalTime > _lastFlush)
{
-Flush();
-}
+// Take a snapshot and release the lock quickly
+var logsToFlush = _logs.ToArrayAndClear();
+_lastFlush = DateTime.UtcNow;
+// Release lock before expensive I/O operation
+Task.Run(() =>
+{
+try
+{
+_ = _hub.CaptureEnvelope(Envelope.FromLog(new StructuredLog(logsToFlush)));
+}
+catch (Exception ex)
+{
+// Handle exception appropriately
+}
+});
+}
+}

}
}

public void Dispose()
{
_timer.Enabled = false;
_timer.Elapsed -= OnIntervalElapsed;
_timer.Dispose();
}
}
61 changes: 61 additions & 0 deletions src/Sentry/Internal/BatchProcessorTimer.cs
@@ -0,0 +1,61 @@
using System.Timers;

namespace Sentry.Internal;

internal abstract class BatchProcessorTimer : IDisposable
{
protected BatchProcessorTimer()
{
}

public abstract bool Enabled { get; set; }

public abstract event EventHandler<ElapsedEventArgs> Elapsed;

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
}
}

internal sealed class TimersBatchProcessorTimer : BatchProcessorTimer
{
private readonly System.Timers.Timer _timer;

public TimersBatchProcessorTimer(TimeSpan interval)
{
_timer = new System.Timers.Timer(interval.TotalMilliseconds)
{
AutoReset = false,
Enabled = false,
};
_timer.Elapsed += OnElapsed;
}

public override bool Enabled
{
get => _timer.Enabled;
set => _timer.Enabled = value;
}

public override event EventHandler<ElapsedEventArgs>? Elapsed;

private void OnElapsed(object? sender, ElapsedEventArgs e)
{
Elapsed?.Invoke(sender, e);
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
_timer.Elapsed -= OnElapsed;
_timer.Dispose();
}
}
}
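
The abstract timer presumably exists so tests can drive the interval deterministically instead of waiting on a real System.Timers.Timer. The PR does not show a test double, but a minimal fake might look like this (a sketch; note that ElapsedEventArgs only gained a public constructor in .NET 5, so older targets would need a different way to create the event args):

    internal sealed class FakeBatchProcessorTimer : BatchProcessorTimer
    {
        public override bool Enabled { get; set; }

        public override event EventHandler<ElapsedEventArgs>? Elapsed;

        // Test code calls this to simulate the interval elapsing at a chosen signal time.
        public void RaiseElapsed(DateTime signalTime)
        {
            Enabled = false;
            // ElapsedEventArgs(DateTime) is public on .NET 5+.
            Elapsed?.Invoke(this, new ElapsedEventArgs(signalTime));
        }
    }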
35 changes: 31 additions & 4 deletions src/Sentry/Internal/DefaultSentryStructuredLogger.cs
@@ -1,6 +1,5 @@
using Sentry.Extensibility;
using Sentry.Infrastructure;
-using Sentry.Protocol.Envelopes;

namespace Sentry.Internal;

@@ -10,13 +9,33 @@ internal sealed class DefaultSentryStructuredLogger : SentryStructuredLogger
private readonly SentryOptions _options;
private readonly ISystemClock _clock;

private readonly BatchProcessor _batchProcessor;

internal DefaultSentryStructuredLogger(IHub hub, SentryOptions options, ISystemClock clock)
{
Debug.Assert(options is { Experimental.EnableLogs: true });

_hub = hub;
_options = options;
_clock = clock;

_batchProcessor = new BatchProcessor(hub, ClampBatchCount(options.Experimental.InternalBatchSize), ClampBatchInterval(options.Experimental.InternalBatchTimeout));
}

private static int ClampBatchCount(int batchCount)
Collaborator:

Could this go in the setter for the InternalBatchSize on the options instead?

If an invalid value has been configured, should we either throw an exception (we'd only do that if it happened when initialising the options - we don't generally want the SDK to throw exceptions otherwise) or at the very least log a message letting people know they're not configuring things correctly?

{
return batchCount <= 0
? 1
: batchCount > 1_000_000
? 1_000_000
: batchCount;
Comment on lines +24 to +31

The ClampBatchCount method uses magic numbers (1 and 1,000,000) without explanation. Consider defining these as named constants with clear documentation about why these specific limits were chosen.

Suggested change
-private static int ClampBatchCount(int batchCount)
-{
-return batchCount <= 0
-? 1
-: batchCount > 1_000_000
-? 1_000_000
-: batchCount;
+private const int MinBatchCount = 1;
+private const int MaxBatchCount = 1_000_000;
+private static int ClampBatchCount(int batchCount)
+{
+return batchCount <= 0
+? MinBatchCount
+: batchCount > MaxBatchCount
+? MaxBatchCount
+: batchCount;
+}

}

private static TimeSpan ClampBatchInterval(TimeSpan batchInterval)
Collaborator:

Again, could this be moved to the setter?

{
return batchInterval.TotalMilliseconds is <= 0 or > int.MaxValue
? TimeSpan.FromMilliseconds(int.MaxValue)
: batchInterval;
}

The ClampBatchInterval method uses int.MaxValue as the maximum timeout, but this could result in very long timeouts that might not be practical. Consider using a more reasonable maximum timeout (e.g., 30 seconds as mentioned in the documentation).

Suggested change
-private static TimeSpan ClampBatchInterval(TimeSpan batchInterval)
-{
-return batchInterval.TotalMilliseconds is <= 0 or > int.MaxValue
-? TimeSpan.FromMilliseconds(int.MaxValue)
-: batchInterval;
-}
+private static readonly TimeSpan MaxBatchInterval = TimeSpan.FromSeconds(30);
+private static TimeSpan ClampBatchInterval(TimeSpan batchInterval)
+{
+return batchInterval.TotalMilliseconds <= 0
+? TimeSpan.FromMilliseconds(1)
+: batchInterval > MaxBatchInterval
+? MaxBatchInterval
+: batchInterval;
+}
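
Following the two reviewer comments above about moving validation into the option setters, a sketch of what that might look like (hypothetical property and default value; the options type is not part of this diff):

    private int _internalBatchSize = 100; // assumed default

    public int InternalBatchSize
    {
        get => _internalBatchSize;
        set
        {
            if (value is <= 0 or > 1_000_000)
            {
                // Surface the misconfiguration via the diagnostic logger here,
                // rather than throwing from the SDK at runtime.
            }

            _internalBatchSize = value <= 0 ? 1
                : value > 1_000_000 ? 1_000_000
                : value;
        }
    }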


private protected override void CaptureLog(SentryLogLevel level, string template, object[]? parameters, Action<SentryLog>? configureLog)
@@ -71,9 +90,17 @@ private protected override void CaptureLog(SentryLogLevel level, string template

if (configuredLog is not null)
{
-//TODO: enqueue in Batch-Processor / Background-Worker
-// see https://github.com/getsentry/sentry-dotnet/issues/4132
-_ = _hub.CaptureEnvelope(Envelope.FromLog(configuredLog));
+_batchProcessor.Enqueue(configuredLog);
}
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
_batchProcessor.Dispose();
}

base.Dispose(disposing);
}
}
2 changes: 2 additions & 0 deletions src/Sentry/Internal/Hub.cs
@@ -778,6 +778,8 @@ public void Dispose()
_memoryMonitor?.Dispose();
#endif

+Logger.Dispose();

try
{
CurrentClient.FlushAsync(_options.ShutdownTimeout).ConfigureAwait(false).GetAwaiter().GetResult();
7 changes: 2 additions & 5 deletions src/Sentry/Protocol/Envelopes/Envelope.cs
@@ -445,17 +445,14 @@ internal static Envelope FromClientReport(ClientReport clientReport)
return new Envelope(header, items);
}

-// TODO: This is temporary. We don't expect single log messages to become an envelope by themselves since batching is needed
[Experimental(DiagnosticId.ExperimentalFeature)]
-internal static Envelope FromLog(SentryLog log)
+internal static Envelope FromLog(StructuredLog log)
{
-//TODO: allow batching Sentry logs
-//see https://github.com/getsentry/sentry-dotnet/issues/4132
var header = DefaultHeader;

var items = new[]
{
-EnvelopeItem.FromLog(log)
+EnvelopeItem.FromLog(log),
};

return new Envelope(header, items);
6 changes: 2 additions & 4 deletions src/Sentry/Protocol/Envelopes/EnvelopeItem.cs
@@ -372,14 +372,12 @@ internal static EnvelopeItem FromClientReport(ClientReport report)
}

[Experimental(Infrastructure.DiagnosticId.ExperimentalFeature)]
-internal static EnvelopeItem FromLog(SentryLog log)
+internal static EnvelopeItem FromLog(StructuredLog log)
{
-//TODO: allow batching Sentry logs
-//see https://github.com/getsentry/sentry-dotnet/issues/4132
var header = new Dictionary<string, object?>(3, StringComparer.Ordinal)
{
[TypeKey] = TypeValueLog,
-["item_count"] = 1,
+["item_count"] = log.Length,
["content_type"] = "application/vnd.sentry.items.log+json",
};

42 changes: 42 additions & 0 deletions src/Sentry/Protocol/StructuredLog.cs
@@ -0,0 +1,42 @@
using Sentry.Extensibility;

namespace Sentry.Protocol;

/// <summary>
/// Represents the Sentry Log protocol.
/// </summary>
/// <remarks>
/// Sentry Docs: <see href="https://docs.sentry.io/product/explore/logs/"/>.
/// Sentry Developer Documentation: <see href="https://develop.sentry.dev/sdk/telemetry/logs/"/>.
/// </remarks>
internal sealed class StructuredLog : ISentryJsonSerializable
{
private readonly SentryLog[] _items;

public StructuredLog(SentryLog log)
{
_items = [log];
}

public StructuredLog(SentryLog[] logs)
{
_items = logs;
}

public int Length => _items.Length;
public ReadOnlySpan<SentryLog> Items => _items;

public void WriteTo(Utf8JsonWriter writer, IDiagnosticLogger? logger)
{
writer.WriteStartObject();
writer.WriteStartArray("items");

foreach (var log in _items)
{
log.WriteTo(writer, logger);
}

writer.WriteEndArray();
writer.WriteEndObject();
}
}
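
For illustration, a sketch of how this type is exercised from inside the SDK or its tests (not part of the diff; `logs` stands in for a SentryLog[] built elsewhere, and System.Text.Json / System.Text are assumed to be in scope). WriteTo produces a single object with an "items" array, matching the "application/vnd.sentry.items.log+json" content type set in EnvelopeItem.FromLog:

    using var stream = new MemoryStream();
    using (var writer = new Utf8JsonWriter(stream))
    {
        var batch = new StructuredLog(logs);  // logs: SentryLog[]
        batch.WriteTo(writer, logger: null);  // -> {"items":[{...},{...}]}
    }
    var json = Encoding.UTF8.GetString(stream.ToArray());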