Skip to content

Commit b427d4d

Browse files
authored
Merge pull request #31 from Particular/fix-metrics
Metrics fixes
2 parents 848763e + b7f8ab1 commit b427d4d

File tree

5 files changed

+77
-3
lines changed

5 files changed

+77
-3
lines changed

src/NServiceBus.Metrics/CriticalTime/CriticalTimeMetricBuilder.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,20 @@
77

88
class CriticalTimeMetricBuilder : MetricBuilder
99
{
10+
readonly ResetMetricTimer resetMetricTimer;
11+
12+
public CriticalTimeMetricBuilder(ResetMetricTimer resetMetricTimer)
13+
{
14+
this.resetMetricTimer = resetMetricTimer;
15+
}
16+
1017
public override void WireUp(FeatureConfigurationContext featureConfigurationContext)
1118
{
19+
resetMetricTimer.NoMessageSentForAWhile += (sender, message) =>
20+
{
21+
criticalTimeTimer.Record(0, TimeUnit.Milliseconds);
22+
};
23+
1224
featureConfigurationContext.Pipeline.OnReceivePipelineCompleted(e =>
1325
{
1426
DateTime timeSent;
@@ -23,6 +35,6 @@ public override void WireUp(FeatureConfigurationContext featureConfigurationCont
2335
});
2436
}
2537

26-
[Timer("Critical Time", "Messages", "Age of the oldest message in the queue.")]
38+
[Timer("Critical Time", "Messages", "The time it took from sending to processing the message.")]
2739
Timer criticalTimeTimer = default(Timer);
2840
}

src/NServiceBus.Metrics/MetricsFeature.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ protected override void Setup(FeatureConfigurationContext context)
1717
{
1818
context.ThrowIfSendonly();
1919

20-
context.RegisterMetricBuilder(new CriticalTimeMetricBuilder());
20+
var resetMetricTimer = new ResetMetricTimer(context);
21+
22+
context.RegisterMetricBuilder(new CriticalTimeMetricBuilder(resetMetricTimer));
2123
context.RegisterMetricBuilder(new PerformanceStatisticsMetricBuilder());
22-
context.RegisterMetricBuilder(new ProcessingTimeMetricBuilder());
24+
context.RegisterMetricBuilder(new ProcessingTimeMetricBuilder(resetMetricTimer));
2325
context.RegisterMetricBuilder(new QueueLengthMetricBuilder());
2426

2527
var settings = context.Settings;

src/NServiceBus.Metrics/NServiceBus.Metrics.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
<Compile Include="Reporters\TraceReport.cs" />
8787
<Compile Include="AssemblyInfo.cs" />
8888
<Compile Include="InternalsVisibleTo.cs" />
89+
<Compile Include="ResetMetricTimer.cs" />
8990
</ItemGroup>
9091
<ItemGroup>
9192
<None Include="NServiceBus.snk" />

src/NServiceBus.Metrics/ProcessingTime/ProcessingTimeMetricBuilder.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,20 @@
66

77
class ProcessingTimeMetricBuilder : MetricBuilder
88
{
9+
readonly ResetMetricTimer resetMetricTimer;
10+
11+
public ProcessingTimeMetricBuilder(ResetMetricTimer resetMetricTimer)
12+
{
13+
this.resetMetricTimer = resetMetricTimer;
14+
}
15+
916
public override void WireUp(FeatureConfigurationContext featureConfigurationContext)
1017
{
18+
resetMetricTimer.NoMessageSentForAWhile += (sender, message) =>
19+
{
20+
processingTimeTimer.Record(0, TimeUnit.Milliseconds);
21+
};
22+
1123
featureConfigurationContext.Pipeline.OnReceivePipelineCompleted(e =>
1224
{
1325
var processingTimeInMilliseconds = ProcessingTimeCalculator.Calculate(e.StartedAt, e.CompletedAt).TotalMilliseconds;
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using NServiceBus;
4+
using NServiceBus.Features;
5+
6+
class ResetMetricTimer
7+
{
8+
public event EventHandler NoMessageSentForAWhile;
9+
10+
public ResetMetricTimer(FeatureConfigurationContext featureConfigurationContext)
11+
{
12+
timer = new System.Threading.Timer(ResetCounterValueIfNoMessageHasBeenProcessedRecently, null, 0, 2000);
13+
14+
featureConfigurationContext.Pipeline.OnReceivePipelineCompleted(e =>
15+
{
16+
DateTime timeSent;
17+
if (e.TryGetTimeSent(out timeSent))
18+
{
19+
lastMessageProcessedTime = e.CompletedAt;
20+
}
21+
22+
return Task.FromResult(0);
23+
});
24+
25+
}
26+
27+
void ResetCounterValueIfNoMessageHasBeenProcessedRecently(object state)
28+
{
29+
// Concurreny : This runs every 2s and might just fire before an update
30+
if (NoMessageHasBeenProcessedRecently())
31+
{
32+
NoMessageSentForAWhile?.Invoke(this, null);
33+
}
34+
}
35+
36+
bool NoMessageHasBeenProcessedRecently()
37+
{
38+
var timeFromLastMessageProcessed = DateTime.UtcNow - lastMessageProcessedTime;
39+
// Concurrency : This currently ignores messages being processed atm.
40+
return timeFromLastMessageProcessed > estimatedMaximumProcessingDuration;
41+
}
42+
43+
TimeSpan estimatedMaximumProcessingDuration = TimeSpan.FromSeconds(2);
44+
DateTime lastMessageProcessedTime;
45+
// ReSharper disable once NotAccessedField.Local
46+
System.Threading.Timer timer; // We need this member or GC will dispose timer immediately.
47+
}

0 commit comments

Comments
 (0)