-
Notifications
You must be signed in to change notification settings - Fork 0
Feature/gh00839 use two web job instances to create messages to azure service bus based on content in busevent table #159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 30 commits
00d9c43
3d2762b
fa62865
0ad81c7
14d1c5b
6bb56ff
99ed30e
a0d5012
440e121
0bb8143
0ed9d2f
633c18c
c9f7942
b3b72dc
a14514a
a121b8d
bd6a93b
301a4c1
ec7afc7
0cf7900
b0ffd32
2958521
ca2c315
70c585e
59e53cf
5418c66
a8e7f33
2656ff4
1c8612b
a7ef7d8
293c268
46dd952
c2a6bf1
1336ef2
c1d056e
ee9fa10
e32647d
5d54c08
09bfe04
7c58c2b
f9e8770
9090baa
26296c5
53ed59d
7715f73
f1207b2
e358186
c3b8a7a
cffaff0
2e974b3
68b24c7
15fe7c1
a30b71a
82376c0
59d7b9a
ef6d0a7
1fbf0b3
4e9baf5
8ec9f12
9c2314f
7433c6a
2547e65
1d1c787
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,7 @@ resources: | |
type: github | ||
name: equinor/procosys-infra | ||
endpoint: 'equinor' | ||
ref: master | ||
ref: GH00839-Use-two-web-job-instances-to-create-messages-to-Azure-service-bus-based-on-content-in-busevent-table | ||
|
||
variables: | ||
- template: src/variables/global-variables.yml@templates | ||
|
@@ -59,7 +59,19 @@ stages: | |
rootFolderOrFile: '$(Build.BinariesDirectory)/build' | ||
solution: '**/*.sln' | ||
sourceFolder: '$(System.DefaultWorkingDirectory)/src/$(projectName)/bin/$(buildConfiguration)/net6.0' | ||
targetFolder: '$(Build.BinariesDirectory)/build/App_Data/jobs/continuous/$(projectName)' | ||
searchFolder: '$(System.DefaultWorkingDirectory)' | ||
targetFolder: '$(Build.BinariesDirectory)/build/App_Data/jobs/continuous' | ||
testSelector: 'testAssemblies' | ||
testAssemblyVer2: | | ||
**/*.Specs/bin/$(buildConfiguration)/*.Specs.dll | ||
testArtifactName: 'test' | ||
testInstances: | ||
|
||
- ServiceA | ||
- ServiceB | ||
prodArtifactName: 'prod' | ||
prodInstances: | ||
- ServiceA | ||
- ServiceB | ||
versionSpec: '6.7.0' | ||
projectName: '$(projectName)' | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,10 @@ | |
</PropertyGroup> | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are all of these changes needed? Double check that we need these dependencies. |
||
<ItemGroup> | ||
<PackageReference Include="Azure.Data.AppConfiguration" Version="1.2.0" /> | ||
<PackageReference Include="Azure.Identity" Version="1.11.3" /> | ||
<PackageReference Include="Microsoft.ApplicationInsights" Version="2.21.0" /> | ||
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.1.1" /> | ||
<PackageReference Include="Oracle.EntityFrameworkCore" Version="7.21.12" /> | ||
<PackageReference Include="System.Text.Json" Version="7.0.3" /> | ||
</ItemGroup> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
|
||
namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; | ||
|
||
public interface IPlantRepository | ||
{ | ||
List<string> GetAllPlants(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
using System.Collections.Generic; | ||
using Equinor.ProCoSys.BusSenderWorker.Core.Models; | ||
using Microsoft.Extensions.Configuration; | ||
|
||
namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; | ||
public interface IPlantService | ||
{ | ||
List<string> GetPlantsHandledByInstance(List<PlantsByInstance>? plantsByInstances, List<string> allPlants, | ||
string instanceName); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
#pragma warning disable CS8618 | ||
|
||
namespace Equinor.ProCoSys.BusSenderWorker.Core.Models; | ||
|
||
public class Plant | ||
{ | ||
public string ProjectSchema { get; set; } | ||
public string IsVoided { get; set; } | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.ComponentModel.DataAnnotations; | ||
using System.Linq; | ||
using System.Text; | ||
using System.Text.Json.Serialization; | ||
|
||
using System.Threading.Tasks; | ||
|
||
namespace Equinor.ProCoSys.BusSenderWorker.Core.Models; | ||
public class PlantsByInstance | ||
{ | ||
[Required] | ||
public string InstanceName { get; set; } = string.Empty; | ||
|
||
[Required] | ||
public string Value { get; set; } = string.Empty; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,9 +11,12 @@ | |
using Equinor.ProCoSys.BusSenderWorker.Core.Mappers; | ||
using Equinor.ProCoSys.BusSenderWorker.Core.Models; | ||
using Equinor.ProCoSys.BusSenderWorker.Core.Telemetry; | ||
using Equinor.ProCoSys.PcsServiceBus; | ||
using Equinor.ProCoSys.PcsServiceBus.Sender.Interfaces; | ||
using Equinor.ProCoSys.PcsServiceBus.Topics; | ||
using Microsoft.Extensions.Configuration; | ||
using Microsoft.Extensions.Logging; | ||
using Microsoft.Extensions.Options; | ||
|
||
namespace Equinor.ProCoSys.BusSenderWorker.Core.Services; | ||
|
||
|
@@ -27,14 +30,17 @@ public class BusSenderService : IBusSenderService | |
private readonly Stopwatch _sw; | ||
private readonly ITelemetryClient _telemetryClient; | ||
private readonly IUnitOfWork _unitOfWork; | ||
private readonly string _instanceName; | ||
|
||
public BusSenderService(IPcsBusSender pcsBusSender, | ||
IBusEventRepository busEventRepository, | ||
IUnitOfWork unitOfWork, | ||
ILogger<BusSenderService> logger, | ||
ITelemetryClient telemetryClient, | ||
IBusEventService service, | ||
IQueueMonitorService queueMonitor) | ||
IQueueMonitorService queueMonitor, | ||
IConfiguration configuration, | ||
IOptions<InstanceOptions> instanceOptions) | ||
{ | ||
_pcsBusSender = pcsBusSender; | ||
_busEventRepository = busEventRepository; | ||
|
@@ -44,6 +50,7 @@ public BusSenderService(IPcsBusSender pcsBusSender, | |
_service = service; | ||
_queueMonitor = queueMonitor; | ||
_sw = new Stopwatch(); | ||
_instanceName = instanceOptions.Value.InstanceName; | ||
} | ||
|
||
public async Task CloseConnections() | ||
|
@@ -62,11 +69,11 @@ public async Task HandleBusEvents() | |
var events = await _busEventRepository.GetEarliestUnProcessedEventChunk(); | ||
if (events.Any()) | ||
{ | ||
_logger.LogInformation("BusSenderService found {Count} messages to process after {Sw} ms", events.Count, | ||
_logger.LogInformation("[{InstanceName}] BusSenderService found {Count} messages to process after {Sw} ms", _instanceName, events.Count, | ||
_sw.ElapsedMilliseconds); | ||
_telemetryClient.TrackMetric("BusSender Chunk", events.Count); | ||
await ProcessBusEvents(events); | ||
_logger.LogInformation("BusSenderService ProcessBusEvents used {Sw} ms", _sw.ElapsedMilliseconds); | ||
_logger.LogInformation("[{InstanceName}] BusSenderService ProcessBusEvents used {Sw} ms", _instanceName,_sw.ElapsedMilliseconds); | ||
} | ||
|
||
_sw.Reset(); | ||
|
@@ -180,15 +187,15 @@ private async Task ProcessBusEvents(List<BusEvent> events) | |
var dsw = Stopwatch.StartNew(); | ||
|
||
var unProcessedEvents = events.Where(busEvent => busEvent.Status == Status.UnProcessed).ToList(); | ||
_logger.LogInformation("Amount of messages to process: {Count} ", unProcessedEvents.Count); | ||
_logger.LogInformation("[{InstanceName}] Amount of messages to process: {Count} ", _instanceName, unProcessedEvents.Count); | ||
|
||
foreach (var simpleUnprocessedBusEvent in unProcessedEvents.Where(e => | ||
IsSimpleMessage(e) || e.Event == TagTopic.TopicName)) | ||
|
||
{ | ||
await UpdateEventBasedOnTopic(simpleUnprocessedBusEvent); | ||
} | ||
|
||
_logger.LogInformation("Update loop finished at at {Sw} ms", dsw.ElapsedMilliseconds); | ||
_logger.LogInformation("[{InstanceName}] Update loop finished at at {Sw} ms", _instanceName, dsw.ElapsedMilliseconds); | ||
await _unitOfWork.SaveChangesAsync(); | ||
|
||
|
||
|
@@ -243,6 +250,7 @@ private void TrackMessage(BusEvent busEvent, string busMessageMessageId, string | |
{"ProjectName", message?.ProjectName ?? "NoProject"}, | ||
{"Plant", message?.Plant ?? "NoPlant"}, | ||
{"MessageId", busMessageMessageId ?? "NoID" }, | ||
{"InstanceName", _instanceName}, | ||
//Remove these after debugging | ||
{"BusEventMessageToSend", string.IsNullOrEmpty(message?.ProCoSysGuid) ? "MessageToSend: ( " + busEvent.MessageToSend + " )" : "N/A" }, | ||
{"BusEventMessage", string.IsNullOrEmpty(message?.ProCoSysGuid) ? busEvent.Message : "N/A" }, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; | ||
using Equinor.ProCoSys.BusSenderWorker.Core.Models; | ||
using Equinor.ProCoSys.PcsServiceBus; | ||
using Microsoft.Extensions.Logging; | ||
|
||
|
||
namespace Equinor.ProCoSys.BusSenderWorker.Core.Services; | ||
public class PlantService : IPlantService | ||
{ | ||
private readonly ILogger<PlantService> _logger; | ||
private List<string> _plantsHandledByCurrentInstance = new(); | ||
|
||
public PlantService(ILogger<PlantService> logger) => _logger = logger; | ||
|
||
public List<string> GetPlantsHandledByInstance(List<PlantsByInstance>? plantsByInstances, List<string> allPlants, string instanceName) | ||
{ | ||
_plantsHandledByCurrentInstance = GetPlantsForInstance(plantsByInstances, instanceName); | ||
|
||
if (_plantsHandledByCurrentInstance.Any()) | ||
{ | ||
_logger.LogInformation($"[{instanceName}] Plants configured for this instance: {string.Join(",", _plantsHandledByCurrentInstance)}"); | ||
|
||
var containsRemainingPlants = _plantsHandledByCurrentInstance.Contains(PcsServiceBusInstanceConstants.RemainingPlants); | ||
if (containsRemainingPlants) | ||
{ | ||
// We are also handling cases where RemainingPlants constant is used in combination with actual plants. E.g. PCS$TROLL_A, PCS$OSEBERG_C, REMAININGPLANTS. | ||
AddPlantLeftovers(plantsByInstances, allPlants, instanceName); | ||
} | ||
RemoveInvalidPlants(allPlants, instanceName); | ||
} | ||
|
||
if (!_plantsHandledByCurrentInstance.Any()) | ||
{ | ||
var message = $"[{instanceName}] No plants configured for this instance. Hence it will not start."; | ||
_logger.LogError(message); | ||
throw new Exception(message); | ||
} | ||
|
||
return _plantsHandledByCurrentInstance; | ||
} | ||
|
||
|
||
private void AddPlantLeftovers(IEnumerable<PlantsByInstance> plantsByInstances, IEnumerable<string> allPlants, string instanceName) | ||
{ | ||
var plantsForAllExceptInstance = GetPlantsForAllExceptInstance(plantsByInstances, instanceName); | ||
var plantLeftovers = GetPlantLeftovers(plantsForAllExceptInstance, allPlants); | ||
_plantsHandledByCurrentInstance = _plantsHandledByCurrentInstance.Union(plantLeftovers).ToList(); | ||
} | ||
|
||
private static IEnumerable<string> GetPlantsForAllExceptInstance(IEnumerable<PlantsByInstance> plantsByInstances, string instanceName) => | ||
plantsByInstances | ||
.Where(x => x.InstanceName != instanceName) | ||
.SelectMany(x => x.Value.Split(',')) | ||
|
||
.ToList(); | ||
|
||
private static List<string> GetPlantsForInstance(IEnumerable<PlantsByInstance> plantsByInstances, string instanceName) => | ||
plantsByInstances | ||
.First(x => x.InstanceName == instanceName).Value.Split(',') | ||
|
||
.ToList(); | ||
|
||
private void RemoveInvalidPlants(IEnumerable<string> allPlants, string instanceName) | ||
{ | ||
_plantsHandledByCurrentInstance = | ||
_plantsHandledByCurrentInstance.Except(PcsServiceBusInstanceConstants.AllPlantReplacementConstants).ToList(); | ||
|
||
var plantsNotPresent = _plantsHandledByCurrentInstance.Except(PcsServiceBusInstanceConstants.AllPlantConstants).Except(allPlants).ToList(); | ||
if (plantsNotPresent.Any()) | ||
{ | ||
_logger.LogWarning( | ||
$"[{instanceName}] The following plant(s) is/ are not valid: {string.Join(",", plantsNotPresent)}. Removing them/ it from plants being processed."); | ||
_plantsHandledByCurrentInstance = _plantsHandledByCurrentInstance.Except(plantsNotPresent).ToList(); | ||
} | ||
else | ||
{ | ||
_logger.LogInformation($"[{instanceName}] Plants validated: {string.Join(",", _plantsHandledByCurrentInstance)}"); | ||
} | ||
} | ||
|
||
private static IEnumerable<string> GetPlantLeftovers(IEnumerable<string> handledPlants, IEnumerable<string> allNonVoidedPlants) => | ||
allNonVoidedPlants.Except(handledPlants); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,11 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
using Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; | ||
using Equinor.ProCoSys.BusSenderWorker.Core.Telemetry; | ||
using Equinor.ProCoSys.PcsServiceBus; | ||
using Microsoft.Extensions.Configuration; | ||
using Microsoft.Extensions.Options; | ||
|
||
namespace Equinor.ProCoSys.BusSenderWorker.Core.Services; | ||
|
||
|
@@ -13,15 +16,17 @@ public class QueueMonitorService : IQueueMonitorService | |
private readonly IConfiguration _configuration; | ||
private readonly ISystemClock _systemClock; | ||
private readonly int _queueWriteIntervalMinutes; | ||
private readonly string _instanceName; | ||
|
||
|
||
public QueueMonitorService(ITelemetryClient telemetryClient, IBusEventRepository busEventRepository, IConfiguration configuration, ISystemClock systemClock) | ||
public QueueMonitorService(ITelemetryClient telemetryClient, IBusEventRepository busEventRepository, IConfiguration configuration, ISystemClock systemClock, IOptions<InstanceOptions> instanceOptions) | ||
{ | ||
_telemetryClient = telemetryClient; | ||
_busEventRepository = busEventRepository; | ||
_configuration = configuration; | ||
_systemClock = systemClock; | ||
_queueWriteIntervalMinutes = string.IsNullOrWhiteSpace(configuration["MonitorQueueIntervalMinutes"]) ? 15 : int.Parse(configuration["MonitorQueueIntervalMinutes"]); | ||
|
||
_instanceName = instanceOptions.Value.InstanceName; | ||
} | ||
|
||
public async Task WriteQueueMetrics() | ||
|
@@ -45,18 +50,26 @@ private async Task WriteQueueAge() | |
} | ||
|
||
var waitTime = _systemClock.UtcNow - queueOldestEvent; | ||
_telemetryClient.TrackMetric("QueueAge", waitTime.TotalMinutes); | ||
var properties = new Dictionary<string, string> | ||
{ | ||
{ "InstanceName", _instanceName } | ||
}; | ||
_telemetryClient.TrackMetric("QueueAge", waitTime.TotalMinutes, properties); | ||
} | ||
|
||
private async Task WriteQueueLength() | ||
{ | ||
var queueLength = await _busEventRepository.GetUnProcessedCount(); | ||
_telemetryClient.TrackMetric("QueueLength", queueLength); | ||
var properties = new Dictionary<string, string> | ||
{ | ||
{ "InstanceName", _instanceName } | ||
}; | ||
_telemetryClient.TrackMetric("QueueLength", queueLength, properties); | ||
} | ||
|
||
private bool IsTimeToWriteQueueMetric(DateTime lastQueueWrite) => | ||
_systemClock.UtcNow >= lastQueueWrite.ToUniversalTime().AddMinutes(_queueWriteIntervalMinutes); | ||
|
||
private bool NoEventFound(DateTime oldestEvent) => | ||
private static bool NoEventFound(DateTime oldestEvent) => | ||
oldestEvent.Equals(default); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,32 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using Equinor.ProCoSys.PcsServiceBus; | ||
using Microsoft.ApplicationInsights; | ||
using Microsoft.ApplicationInsights.Channel; | ||
using Microsoft.ApplicationInsights.Extensibility; | ||
using Microsoft.Extensions.Configuration; | ||
using Microsoft.Extensions.Options; | ||
|
||
namespace Equinor.ProCoSys.BusSenderWorker.Core.Telemetry; | ||
|
||
public class RunningExecutableTelemetryInitializer : ITelemetryInitializer | ||
{ | ||
private readonly string _instanceName; | ||
|
||
public RunningExecutableTelemetryInitializer(string instanceName) => _instanceName = instanceName; | ||
|
||
public void Initialize(ITelemetry telemetry) | ||
{ | ||
telemetry.Context.Cloud.RoleName = AppDomain.CurrentDomain.FriendlyName; | ||
telemetry.Context.GlobalProperties["InstanceName"] = _instanceName; | ||
} | ||
} | ||
|
||
public class ApplicationInsightsTelemetryClient : ITelemetryClient | ||
{ | ||
private readonly TelemetryClient _ai; | ||
|
||
public ApplicationInsightsTelemetryClient(TelemetryConfiguration telemetryConfiguration) | ||
public ApplicationInsightsTelemetryClient(TelemetryConfiguration telemetryConfiguration, IConfiguration configuration, IOptions<InstanceOptions> instanceOptions) | ||
|
||
{ | ||
if (telemetryConfiguration == null) | ||
{ | ||
|
@@ -21,6 +38,7 @@ public ApplicationInsightsTelemetryClient(TelemetryConfiguration telemetryConfig | |
// The InstrumentationKey isn't set through the configuration object. Setting it explicitly works. | ||
TelemetryConfiguration = { ConnectionString = telemetryConfiguration.ConnectionString } | ||
}; | ||
_ai.TelemetryConfiguration.TelemetryInitializers.Add(new RunningExecutableTelemetryInitializer(instanceOptions.Value.InstanceName)); | ||
} | ||
|
||
public void Flush() => _ai.Flush(); | ||
|
@@ -33,5 +51,8 @@ public void TrackMetric(string name, double metric) => | |
_ai | ||
.GetMetric(name) | ||
.TrackValue(metric); | ||
|
||
|
||
public void TrackMetric(string name, double metric, Dictionary<string, string> properties) => | ||
_ai | ||
.TrackMetric(name, metric, properties); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be changed before merge I think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It will.