Skip to content

More updates for the current OTel (OpenTelemetry) conventions #1717

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);

using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
: default;

ulong publishSequenceNumber = 0;
Expand Down Expand Up @@ -117,7 +117,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);

using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
: default;

ulong publishSequenceNumber = 0;
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -918,10 +918,10 @@ await ModelSendAsync(in method, k.CancellationToken)
BasicGetResult? result = await k;

using Activity? activity = result != null
? RabbitMQActivitySource.Receive(result.RoutingKey,
? RabbitMQActivitySource.BasicGet(result.RoutingKey,
result.Exchange,
result.DeliveryTag, result.BasicProperties, result.Body.Length)
: RabbitMQActivitySource.ReceiveEmpty(queue);
: RabbitMQActivitySource.BasicGetEmpty(queue);

activity?.SetStartTime(k.StartTime);

Expand Down
37 changes: 22 additions & 15 deletions projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ public static class RabbitMQActivitySource
// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
internal const string MessageId = "messaging.message.id";
internal const string MessageConversationId = "messaging.message.conversation_id";
internal const string MessagingOperationName = "messaging.operation.name";
internal const string MessagingOperationNameBasicDeliver = "deliver";
internal const string MessagingOperationNameBasicGet = "fetch";
internal const string MessagingOperationNameBasicGetEmpty = "fetch (empty)";
internal const string MessagingOperationNameBasicPublish = "publish";
internal const string MessagingOperationType = "messaging.operation.type";
internal const string MessagingOperationTypeSend = "send";
internal const string MessagingOperationTypeProcess = "process";
Expand Down Expand Up @@ -56,7 +61,7 @@ public static class RabbitMQActivitySource
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
};

internal static Activity? Send(string routingKey, string exchange, int bodySize,
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
ActivityContext linkedContext = default)
{
if (!s_publisherSource.HasListeners())
Expand All @@ -66,41 +71,42 @@ public static class RabbitMQActivitySource

Activity? activity = linkedContext == default
? s_publisherSource.StartRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend,
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish,
ActivityKind.Producer)
: s_publisherSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend,
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish,
ActivityKind.Producer, linkedContext);
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags(MessagingOperationTypeSend, routingKey, exchange, 0, bodySize, activity);
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity);
}

return activity;

}

internal static Activity? ReceiveEmpty(string queue)
internal static Activity? BasicGetEmpty(string queue)
{
if (!s_subscriberSource.HasListeners())
{
return null;
}

Activity? activity = s_subscriberSource.StartRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{queue} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive,
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGetEmpty} {queue}" : MessagingOperationNameBasicGetEmpty,
ActivityKind.Consumer);
if (activity != null && activity.IsAllDataRequested)
{
activity
.SetTag(MessagingOperationType, MessagingOperationTypeReceive)
.SetTag(MessagingOperationName, MessagingOperationNameBasicGetEmpty)
.SetTag(MessagingDestination, "amq.default");
}

return activity;
}

internal static Activity? Receive(string routingKey, string exchange, ulong deliveryTag,
internal static Activity? BasicGet(string routingKey, string exchange, ulong deliveryTag,
IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize)
{
if (!s_subscriberSource.HasListeners())
Expand All @@ -110,11 +116,11 @@ public static class RabbitMQActivitySource

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive, ActivityKind.Consumer,
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGet} {routingKey}" : MessagingOperationNameBasicGet, ActivityKind.Consumer,
ContextExtractor(readOnlyBasicProperties));
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags(MessagingOperationTypeReceive, routingKey, exchange, deliveryTag, readOnlyBasicProperties,
PopulateMessagingTags(MessagingOperationTypeReceive, MessagingOperationNameBasicGet, routingKey, exchange, deliveryTag, readOnlyBasicProperties,
bodySize, activity);
}

Expand All @@ -131,11 +137,11 @@ public static class RabbitMQActivitySource

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeProcess}" : MessagingOperationTypeProcess,
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicDeliver} {routingKey}" : MessagingOperationNameBasicDeliver,
ActivityKind.Consumer, ContextExtractor(basicProperties));
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags(MessagingOperationTypeProcess, routingKey, exchange,
PopulateMessagingTags(MessagingOperationTypeProcess, MessagingOperationNameBasicDeliver, routingKey, exchange,
deliveryTag, basicProperties, bodySize, activity);
}

Expand All @@ -157,10 +163,10 @@ public static class RabbitMQActivitySource
?.Start();
}

private static void PopulateMessagingTags(string operation, string routingKey, string exchange,
private static void PopulateMessagingTags(string operationType, string operationName, string routingKey, string exchange,
ulong deliveryTag, IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize, Activity activity)
{
PopulateMessagingTags(operation, routingKey, exchange, deliveryTag, bodySize, activity);
PopulateMessagingTags(operationType, operationName, routingKey, exchange, deliveryTag, bodySize, activity);

if (!string.IsNullOrEmpty(readOnlyBasicProperties.CorrelationId))
{
Expand All @@ -173,11 +179,12 @@ private static void PopulateMessagingTags(string operation, string routingKey, s
}
}

private static void PopulateMessagingTags(string operation, string routingKey, string exchange,
private static void PopulateMessagingTags(string operationType, string operationName, string routingKey, string exchange,
ulong deliveryTag, int bodySize, Activity activity)
{
activity
.SetTag(MessagingOperationType, operation)
.SetTag(MessagingOperationType, operationType)
.SetTag(MessagingOperationName, operationName)
.SetTag(MessagingDestination, string.IsNullOrEmpty(exchange) ? "amq.default" : exchange)
.SetTag(MessagingDestinationRoutingKey, routingKey)
.SetTag(MessagingBodySize, bodySize);
Expand Down
6 changes: 3 additions & 3 deletions projects/Test/SequentialIntegration/TestActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ private static ActivityListener StartActivityListener(List<Activity> activities)
private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
List<Activity> activityList, bool isDeliver = false)
{
string childName = isDeliver ? "process" : "receive";
string childName = isDeliver ? "deliver" : "fetch";
Activity[] activities = activityList.ToArray();
Assert.NotEmpty(activities);

Expand All @@ -418,11 +418,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
}

Activity sendActivity = activities.First(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} send" : "send") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"publish {queueName}" : "publish") &&
x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag &&
routingKeyTag == $"{queueName}");
Activity receiveActivity = activities.Single(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName) &&
x.Links.First().Context.TraceId == sendActivity.TraceId);
Assert.Equal(ActivityKind.Producer, sendActivity.Kind);
Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind);
Expand Down
11 changes: 8 additions & 3 deletions projects/Test/SequentialIntegration/TestOpenTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
List<Activity> activityList, bool isDeliver = false, string baggageGuid = null)
{
string childName = isDeliver ? "process" : "receive";
string childName = isDeliver ? "deliver" : "fetch";
string childType = isDeliver ? "process" : "receive";
Activity[] activities = activityList.ToArray();
Assert.NotEmpty(activities);
foreach (var item in activities)
Expand All @@ -354,11 +355,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
}

Activity sendActivity = activities.First(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} send" : "send") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"publish {queueName}" : "publish") &&
x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag &&
routingKeyTag == $"{queueName}");
Activity receiveActivity = activities.Single(x =>
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") &&
x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName) &&
x.Links.First().Context.TraceId == sendActivity.TraceId);
Assert.Equal(ActivityKind.Producer, sendActivity.Kind);
Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind);
Expand All @@ -380,6 +381,10 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize);
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize);
AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize);
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationType, childType);
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationName, childName);
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationType, "send");
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationName, "publish");
}
}
}
Loading