Skip to content

Add support for Filter and Enrich for OpenTelemetry activities #859

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: release/2.7
Choose a base branch
from
Open
10 changes: 10 additions & 0 deletions src/NATS.Client.Core/Internal/NatsInstrumentationContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace NATS.Client.Core.Internal;

public readonly record struct NatsInstrumentationContext(
string Subject,
NatsHeaders? Headers,
string? ReplyTo,
string? QueueGroup,
long? BodySize,
long? Size,
INatsConnection? Connection);
26 changes: 26 additions & 0 deletions src/NATS.Client.Core/Internal/NatsInstrumentationOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System.Diagnostics;

namespace NATS.Client.Core.Internal;

/// <summary>
/// Options for the OpenTelemetry instrumentation.
/// </summary>
public sealed class NatsInstrumentationOptions
{
public static NatsInstrumentationOptions Default => new();

/// <summary>
/// Gets or sets a filter function that determines whether or not to collect telemetry on a per request basis.
/// </summary>
/// <remarks>
/// The return value for the filter function is interpreted as follows:
/// - If filter returns `true`, the request is collected.
/// - If filter returns `false` or throws an exception the request is NOT collected.
/// </remarks>
public Func<NatsInstrumentationContext, bool>? Filter { get; set; }

/// <summary>
/// Gets or sets an action to enrich an Activity.
/// </summary>
public Action<Activity, NatsInstrumentationContext>? Enrich { get; set; }
}
62 changes: 49 additions & 13 deletions src/NATS.Client.Core/Internal/Telemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ namespace NATS.Client.Core.Internal;
// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
internal static class Telemetry
{
internal static readonly ActivitySource NatsActivities = new(name: NatsActivitySource);

private const string NatsActivitySource = "NATS.Net";
public const string NatsActivitySource = "NATS.Net";
public static readonly ActivitySource NatsActivities = new(name: NatsActivitySource);
private static readonly object BoxedTrue = true;

internal static bool HasListeners() => NatsActivities.HasListeners();
public static bool HasListeners() => NatsActivities.HasListeners();

internal static Activity? StartSendActivity(
public static Activity? StartSendActivity(
string name,
INatsConnection? connection,
string subject,
Expand All @@ -23,6 +22,18 @@ internal static class Telemetry
if (!NatsActivities.HasListeners())
return null;

var instrumentationContext = new NatsInstrumentationContext(
Subject: subject,
Headers: null,
ReplyTo: replyTo,
QueueGroup: null,
BodySize: null,
Size: null,
Connection: connection);

if (NatsInstrumentationOptions.Default.Filter is { } filter && !filter(instrumentationContext))
return null;

KeyValuePair<string, object?>[] tags;
if (connection is NatsConnection { ServerInfo: not null } conn)
{
Expand Down Expand Up @@ -63,14 +74,19 @@ internal static class Telemetry
tags[3] = new KeyValuePair<string, object?>(Constants.ReplyTo, replyTo);
}

return NatsActivities.StartActivity(
var activity = NatsActivities.StartActivity(
name,
kind: ActivityKind.Producer,
parentContext: parentContext ?? default,
tags: tags);

if (activity is not null)
NatsInstrumentationOptions.Default.Enrich?.Invoke(activity, instrumentationContext);

return activity;
}

internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? headers)
public static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders? headers)
{
if (activity is null)
return;
Expand All @@ -87,15 +103,15 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
return;
}

// There are cases where headers reused internally (e.g. JetStream publish retry)
// There are cases where headers reused publicly (e.g. JetStream publish retry)
// there may also be cases where application can reuse the same header
// in which case we should still be able to overwrite headers with telemetry fields
// even though headers would be set to readonly before being passed down in publish methods.
headers.SetOverrideReadOnly(fieldName, fieldValue);
});
}

internal static Activity? StartReceiveActivity(
public static Activity? StartReceiveActivity(
INatsConnection? connection,
string name,
string subscriptionSubject,
Expand All @@ -109,6 +125,18 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
if (!NatsActivities.HasListeners())
return null;

var instrumentationContext = new NatsInstrumentationContext(
Subject: subject,
Headers: headers,
ReplyTo: replyTo,
QueueGroup: queueGroup,
BodySize: bodySize,
Size: size,
Connection: connection);

if (NatsInstrumentationOptions.Default.Filter is { } filter && !filter(instrumentationContext))
return null;

KeyValuePair<string, object?>[] tags;
if (connection is NatsConnection { ServerInfo: not null } conn)
{
Expand Down Expand Up @@ -165,14 +193,19 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
if (headers is null || !TryParseTraceContext(headers, out var context))
context = default;

return NatsActivities.StartActivity(
var activity = NatsActivities.StartActivity(
name,
kind: ActivityKind.Consumer,
parentContext: context,
tags: tags);

if (activity is not null)
NatsInstrumentationOptions.Default.Enrich?.Invoke(activity, instrumentationContext);

return activity;
}

internal static void SetException(Activity? activity, Exception exception)
public static void SetException(Activity? activity, Exception exception)
{
if (activity is null)
return;
Expand Down Expand Up @@ -251,11 +284,14 @@ private static bool TryParseTraceContext(NatsHeaders headers, out ActivityContex
},
out var traceParent,
out var traceState);

#if NET6_0
return ActivityContext.TryParse(traceParent, traceState, out context);
#else
return ActivityContext.TryParse(traceParent, traceState, isRemote: true, out context);
#endif
}

internal class Constants
public class Constants
{
public const string True = "true";
public const string False = "false";
Expand Down
Loading
Loading