diff --git a/azure-pipeline.yml b/azure-pipeline.yml index 01152666..abe9d8f1 100644 --- a/azure-pipeline.yml +++ b/azure-pipeline.yml @@ -12,7 +12,6 @@ resources: name: equinor/procosys-infra endpoint: 'equinor' ref: master - variables: - template: src/variables/global-variables.yml@templates - template: src/resources/bus-sender/bus-sender-variables.yml@templates diff --git a/src/Equinor.ProCoSys.BusSender.Core/Equinor.ProCoSys.BusSenderWorker.Core.csproj b/src/Equinor.ProCoSys.BusSender.Core/Equinor.ProCoSys.BusSenderWorker.Core.csproj index f2186184..9323d3ce 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Equinor.ProCoSys.BusSenderWorker.Core.csproj +++ b/src/Equinor.ProCoSys.BusSender.Core/Equinor.ProCoSys.BusSenderWorker.Core.csproj @@ -7,9 +7,18 @@ + + + - + + + + + + + diff --git a/src/Equinor.ProCoSys.BusSender.Core/Extensions/PlantLeaseExtensions.cs b/src/Equinor.ProCoSys.BusSender.Core/Extensions/PlantLeaseExtensions.cs new file mode 100644 index 00000000..1d536455 --- /dev/null +++ b/src/Equinor.ProCoSys.BusSender.Core/Extensions/PlantLeaseExtensions.cs @@ -0,0 +1,8 @@ +using System; +using Equinor.ProCoSys.BusSenderWorker.Core.Models; + +namespace Equinor.ProCoSys.BusSenderWorker.Core.Extensions; +public static class PlantLeaseExtensions +{ + public static bool IsExpired(this PlantLease plantLease) => plantLease.LeaseExpiry.HasValue && DateTime.UtcNow >= plantLease.LeaseExpiry.Value; +} diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBlobLeaseService.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBlobLeaseService.cs new file mode 100644 index 00000000..152e1b46 --- /dev/null +++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBlobLeaseService.cs @@ -0,0 +1,14 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Equinor.ProCoSys.BusSenderWorker.Core.Models; +using Microsoft.Extensions.Caching.Memory; + +namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; +public interface IBlobLeaseService +{ + Task ReleasePlantLease(PlantLease? plantLease); + Task?> ClaimPlantLease(); + CancellationToken CancellationToken { get; } + IMemoryCache GetCache(); +} diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusEventRepository.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusEventRepository.cs index c42ff13b..2bc305e9 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusEventRepository.cs +++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusEventRepository.cs @@ -7,7 +7,8 @@ namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; public interface IBusEventRepository { - Task> GetEarliestUnProcessedEventChunk(); - Task GetUnProcessedCount(); - Task GetOldestEvent(); + Task> GetEarliestUnProcessedEventChunk(bool ignoreFilter = false); + Task GetUnProcessedCount(bool ignoreFilter = false); + Task GetOldestEvent(bool ignoreFilter = false); + void SetPlants(List plants); } diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusSenderService.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusSenderService.cs index 614139d1..196fd2d2 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusSenderService.cs +++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusSenderService.cs @@ -6,4 +6,6 @@ public interface IBusSenderService { Task CloseConnections(); Task HandleBusEvents(); + Task HandleBusEventsSingleInstance(); + bool HasPendingEventsForCurrentPlant(); } diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IEntryPointService.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IEntryPointService.cs index 033bda03..238cf1cd 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IEntryPointService.cs +++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IEntryPointService.cs @@ -4,6 +4,6 @@ namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; public interface IEntryPointService { - Task DoWorkerJob(); + Task DoWorkerJob(); Task StopService(); } diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantRepository.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantRepository.cs new file mode 100644 index 00000000..bbe2f958 --- /dev/null +++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantRepository.cs @@ -0,0 +1,9 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; + +public interface IPlantRepository +{ + List GetAllPlants(); +} diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantService.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantService.cs new file mode 100644 index 00000000..c2ecd053 --- /dev/null +++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantService.cs @@ -0,0 +1,10 @@ +using System.Collections.Generic; +using Equinor.ProCoSys.BusSenderWorker.Core.Models; +using Microsoft.Extensions.Configuration; + +namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; +public interface IPlantService +{ + List GetPlantsForCurrent(List plantLeases); + List? GetAllPlants(); +} diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IQueueMonitorService.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IQueueMonitorService.cs index 74ccf7eb..7d05087c 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IQueueMonitorService.cs +++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IQueueMonitorService.cs @@ -4,5 +4,5 @@ namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; public interface IQueueMonitorService { - Task WriteQueueMetrics(); -} \ No newline at end of file + Task WriteQueueMetrics(string? plant = null); +} diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IUnitOfWork.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IUnitOfWork.cs index b44620f2..fcf9a67a 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IUnitOfWork.cs +++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IUnitOfWork.cs @@ -1,8 +1,10 @@ -using System.Threading.Tasks; +using System.Threading; +using System.Threading.Tasks; namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; public interface IUnitOfWork { Task SaveChangesAsync(); + Task SaveChangesAsync(CancellationToken cancellationToken); } diff --git a/src/Equinor.ProCoSys.BusSender.Core/Models/Plant.cs b/src/Equinor.ProCoSys.BusSender.Core/Models/Plant.cs new file mode 100644 index 00000000..5a84062b --- /dev/null +++ b/src/Equinor.ProCoSys.BusSender.Core/Models/Plant.cs @@ -0,0 +1,10 @@ +#pragma warning disable CS8618 + +namespace Equinor.ProCoSys.BusSenderWorker.Core.Models; + +public class Plant +{ + public string ProjectSchema { get; set; } + public string IsVoided { get; set; } +} + diff --git a/src/Equinor.ProCoSys.BusSender.Core/Models/PlantLease.cs b/src/Equinor.ProCoSys.BusSender.Core/Models/PlantLease.cs new file mode 100644 index 00000000..add12265 --- /dev/null +++ b/src/Equinor.ProCoSys.BusSender.Core/Models/PlantLease.cs @@ -0,0 +1,15 @@ +using System; +using System.Text.Json.Serialization; + +namespace Equinor.ProCoSys.BusSenderWorker.Core.Models; +public class PlantLease +{ + public DateTime? LeaseExpiry { get; set; } + public required string Plant { get; set; } + public DateTime? LastProcessed { get; set; } + + [JsonIgnore] + // Flag to tell if this plant is the one being handled by the current instance. + // This is transient and not stored in the blob. + public bool IsCurrent { get; set; } = false; +} diff --git a/src/Equinor.ProCoSys.BusSender.Core/Services/BlobLeaseService.cs b/src/Equinor.ProCoSys.BusSender.Core/Services/BlobLeaseService.cs new file mode 100644 index 00000000..231f41eb --- /dev/null +++ b/src/Equinor.ProCoSys.BusSender.Core/Services/BlobLeaseService.cs @@ -0,0 +1,313 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using Azure.Storage.Blobs.Models; +using Azure.Storage.Blobs; +using Azure; +using Azure.Storage.Blobs.Specialized; +using Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; +using Equinor.ProCoSys.BusSenderWorker.Core.Models; +using Polly; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Configuration; +using System.Threading; +using Microsoft.Extensions.Caching.Memory; +using System.Diagnostics; + +namespace Equinor.ProCoSys.BusSenderWorker.Core.Services; + +public class BlobLeaseService : IBlobLeaseService +{ + private readonly ILogger _logger; + private readonly IConfiguration _configuration; + private CancellationTokenSource? _cancellationTokenSource; + private readonly IMemoryCache _cache; + private readonly BlobClient _blobClient; + private static readonly TimeSpan blobLeaseDuration = TimeSpan.FromSeconds(15); + + public CancellationToken CancellationToken => _cancellationTokenSource?.Token ?? CancellationToken.None; + + public BlobLeaseService(ILogger logger, IConfiguration configuration, IMemoryCache cache) + { + _logger = logger; + _configuration = configuration; + _cache = cache; + var sw = new Stopwatch(); + sw.Start(); + _blobClient = GetBlobClient(); + } + public virtual IMemoryCache GetCache() => _cache; + + public virtual async Task?> ClaimPlantLease() + { + if (!int.TryParse(_configuration["PlantLeaseExpiryTime"], out var plantLeaseExpiryTime)) + { + _logger.LogError("Invalid PlantLeaseExpiryTime configuration value."); + return null; + } + if (!GetCache().TryGetValue("CurrentPlantLeases", out List? plantLeases)) + { + var sw = new Stopwatch(); + sw.Start(); + + plantLeases = await GetPlantLeases(); + if (plantLeases == null) + { + _logger.LogDebug("No blob lease available. Awaiting next loop."); + return null; + } + + var plantLease = GetOldestUnprocessedPlantLeaseInfo(plantLeases); + if (plantLease == null) + { + // Nothing to do for now. + _logger.LogDebug("No available plants to lease. Awaiting next loop."); + return null; + } + + var leaseId = Guid.NewGuid().ToString(); + + plantLeases.Where(x => x.Plant.Equals(plantLease.Plant)).ToList().ForEach(x => + { + x.IsCurrent = true; + x.LeaseExpiry = DateTime.UtcNow.AddSeconds(plantLeaseExpiryTime); + }); + + var didUpdatePlantLeases = await UpdatePlantLeases(plantLeases, leaseId, 0); + if (!didUpdatePlantLeases) + { + return null; + } + GetCache().Set("CurrentPlantLeases", plantLeases, TimeSpan.FromSeconds(plantLeaseExpiryTime * 0.95)); + _logger.LogDebug($"Claim used {sw.ElapsedMilliseconds}"); + } + else + { + plantLeaseExpiryTime = GetSecondsUntilLeaseExpiry(plantLeases?.First(x => x.IsCurrent).LeaseExpiry); + _logger.LogDebug($"Plant leases retrieved from cache. {plantLeaseExpiryTime} until expired."); + } + + if ((_cancellationTokenSource == null || _cancellationTokenSource.Token == CancellationToken.None) && (plantLeaseExpiryTime > 0)) + { + // Multiplying by a factor lower than 1 to ensure that message processing for this instance is cancelled shortly before the lease actually expires. + _cancellationTokenSource = new CancellationTokenSource((int)(plantLeaseExpiryTime * 1000 * 0.95)); + _logger.LogDebug($"A new cancellation token is initialized. {plantLeaseExpiryTime} until expired."); + } + + return plantLeases; + } + + public async Task ReleasePlantLease(PlantLease? plantLease) + { + if (plantLease == null) + { + _logger.LogWarning("Cannot release lease."); + return false; + } + + if (!int.TryParse(_configuration["MaxBlobReleaseLeaseAttempts"], out var maxRetryAttempts)) + { + _logger.LogError("Invalid MaxBlobReleaseLeaseAttempts configuration value."); + return false; + } + + var leaseId = Guid.NewGuid().ToString(); + var plantLeases = await GetPlantLeases(); + if (plantLeases == null) + { + _logger.LogWarning("Cannot release plant lease."); + return false; + } + + plantLeases.Where(x => x.Plant.Equals(plantLease?.Plant)).ToList().ForEach(x => + { + x.LeaseExpiry = null; + x.LastProcessed = DateTime.UtcNow; + }); + var didReleasePlantLeases = await UpdatePlantLeases(plantLeases, leaseId, maxRetryAttempts); + if (!didReleasePlantLeases) + { + _logger.LogWarning("Failed to update plant lease blob. Hence plant will not be handled until expired."); + } + GetCache().Remove("CurrentPlantLeases"); + _cancellationTokenSource = null; + return didReleasePlantLeases; + } + + public virtual BlobLeaseClient GetBlobLeaseClient(BlobClient blobClient, string leaseId) => blobClient.GetBlobLeaseClient(leaseId); + + private TimeSpan GetJitter(TimeSpan delayBetweenAttempts) => TimeSpan.FromMilliseconds(new Random().Next(0, (int)(delayBetweenAttempts.TotalMilliseconds * 0.25))); // 25% jitter + + protected async Task TryAcquireBlobLeaseAsync(BlobClient blobClient, string leaseId, TimeSpan leaseDuration, int maxRetryAttempts = 0, TimeSpan? delayBetweenAttempts = null) + { + if (!int.TryParse(_configuration["BlobReleaseLeaseDelay"], out var blobReleaseLeaseDelay)) + { + _logger.LogError("Invalid BlobReleaseLeaseDelay configuration value."); + } + delayBetweenAttempts ??= TimeSpan.FromMilliseconds(blobReleaseLeaseDelay); + var leaseClient = GetBlobLeaseClient(blobClient, leaseId); + + var retryPolicy = Policy + .Handle(ex => ex.ErrorCode == BlobErrorCode.LeaseAlreadyPresent) + .WaitAndRetryAsync(maxRetryAttempts, retryAttempt => + { + _logger.LogDebug($"Attempt {retryAttempt} to acquire lease for blob: {blobClient.Name}"); + var jitter = GetJitter(delayBetweenAttempts.Value); + return delayBetweenAttempts.Value + jitter; + }); + + try + { + await retryPolicy.ExecuteAsync(async () => + { + // GetPropertiesAsync is called upfront of AcquireAsync in an attempt to reduce the number of calls to the latter method. + // In general, GetPropertiesAsync is expected to be quicker than AcquireAsync because it is a read-only operation, + // whereas AcquireAsync involves state changes and additional checks. + var properties = await blobClient.GetPropertiesAsync(); + var leaseStateUnlocked = properties.Value.LeaseStatus == LeaseStatus.Unlocked; + var leaseStateAvailableOrExpired = properties.Value.LeaseState == LeaseState.Available || properties.Value.LeaseState == LeaseState.Expired; + if (leaseStateUnlocked && leaseStateAvailableOrExpired) + { + await leaseClient.AcquireAsync(leaseDuration, cancellationToken: CancellationToken.None); + } + else + { + throw new RequestFailedException(409, "Lease already present", BlobErrorCode.LeaseAlreadyPresent.ToString(), + new InvalidOperationException("The lease is already present and cannot be acquired.")); + } + }); + return true; + } + catch (RequestFailedException rfe) + { + if (rfe.ErrorCode == BlobErrorCode.LeaseAlreadyPresent) + { + // We are only interested in warning when this method has been called with acceptance for multiple retry attempts (e.g. when releasing plant lease) + // indicating higher importance of acquiring the lease. + // We do not want to spam the logs with warnings when successful blob lease is of lower importance. (e.g. for Claim) + if (maxRetryAttempts > 0) + { + _logger.LogWarning(rfe, + $"Failed to acquire lease for blob: {blobClient.Name} after {maxRetryAttempts} attempts. ErrorCode: {rfe.ErrorCode} Message: {rfe.Message}"); + } + return false; + } + + _logger.LogError(rfe, $"Failed to acquire lease for blob: {blobClient.Name} ErrorCode: {rfe.ErrorCode} Message: {rfe.Message}"); + throw; + } + } + + public virtual async Task GetBlobContentAsync(BlobClient blobClient) + { + try + { + var response = await blobClient.DownloadStreamingAsync(cancellationToken: CancellationToken.None); + using var streamReader = new StreamReader(response.Value.Content); + return await streamReader.ReadToEndAsync(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to download blob content: {BlobName}", blobClient.Name); + throw; + } + } + private async Task?> GetPlantLeases() + { + var plantLease = await GetPlantLeases(_blobClient); + if (plantLease!=null && plantLease.Any()) + { + return plantLease; + } + + _logger.LogWarning("Could not read blob containing plant lease."); + return null; + } + + private async Task?> GetPlantLeases(BlobClient blobClient) + { + var options = new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true + }; + + var json = await GetBlobContentAsync(blobClient); + return JsonSerializer.Deserialize>(json, options); + } + + public virtual async Task UpdatePlantLeases(List plantLeases, string leaseId, int maxRetryAttempts = 0) + { + var options = new JsonSerializerOptions + { + WriteIndented = true, + PropertyNameCaseInsensitive = true + }; + + var json = JsonSerializer.Serialize(plantLeases, options); + using var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(json)); + var uploadOptions = new BlobUploadOptions + { + Conditions = new BlobRequestConditions + { + LeaseId = leaseId + } + }; + var newLeaseAcquired = await TryAcquireBlobLeaseAsync(_blobClient, leaseId, blobLeaseDuration, maxRetryAttempts); + if (!newLeaseAcquired) + { + return false; + } + await _blobClient.UploadAsync(memoryStream, uploadOptions, CancellationToken.None); + await ReleaseBlobLeaseAsync(_blobClient, leaseId); + return true; + } + + public virtual BlobClient GetBlobClient() + { + var connectionString = _configuration["BlobStorage:ConnectionString"]; + var containerName = _configuration["BlobStorage:BusSenderContainerName"]; + var plantLeaseFileName = _configuration["PlantLeaseFileName"]; + var blobContainerClient = new BlobContainerClient(connectionString, containerName); + return blobContainerClient.GetBlobClient(plantLeaseFileName); + } + + private PlantLease? GetOldestUnprocessedPlantLeaseInfo(List plantLeases) + { + var unprocessedLeaseInfos = plantLeases + .Where(p => p.LeaseExpiry == null || p.LeaseExpiry < DateTime.UtcNow) + .OrderBy(p => p.LastProcessed) + .ToList(); + + return unprocessedLeaseInfos.FirstOrDefault(); // Return first if none is taken. + } + public virtual async Task ReleaseBlobLeaseAsync(BlobClient blobClient, string leaseId) + { + try + { + var leaseClient = GetBlobLeaseClient(blobClient, leaseId); + await leaseClient.ReleaseAsync(cancellationToken: CancellationToken.None); + } + catch (RequestFailedException ex) + { + _logger.LogError(ex, "Failed to release lease for blob: {BlobName}", blobClient.Name); + } + catch (Exception ex) + { + _logger.LogError(ex, "Unexpected error occurred while releasing lease for blob: {BlobName}", blobClient.Name); + } + } + public static int GetSecondsUntilLeaseExpiry(DateTime? leaseExpiry) + { + if (leaseExpiry == null) + { + return 0; + } + + var timeSpan = leaseExpiry.Value - DateTime.UtcNow; + return (int)(timeSpan.TotalSeconds > 0 ? timeSpan.TotalSeconds : 0); + } +} diff --git a/src/Equinor.ProCoSys.BusSender.Core/Services/BusSenderService.cs b/src/Equinor.ProCoSys.BusSender.Core/Services/BusSenderService.cs index ffd47cd3..3e161ac7 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Services/BusSenderService.cs +++ b/src/Equinor.ProCoSys.BusSender.Core/Services/BusSenderService.cs @@ -27,6 +27,9 @@ public class BusSenderService : IBusSenderService private readonly Stopwatch _sw; private readonly ITelemetryClient _telemetryClient; private readonly IUnitOfWork _unitOfWork; + private readonly IBlobLeaseService _blobLeaseService; + private readonly IPlantService _plantService; + private bool _hasPendingEventsForCurrentPlant = false; public BusSenderService(IPcsBusSender pcsBusSender, IBusEventRepository busEventRepository, @@ -34,7 +37,9 @@ public BusSenderService(IPcsBusSender pcsBusSender, ILogger logger, ITelemetryClient telemetryClient, IBusEventService service, - IQueueMonitorService queueMonitor) + IQueueMonitorService queueMonitor, + IBlobLeaseService blobLeaseService, + IPlantService plantService) { _pcsBusSender = pcsBusSender; _busEventRepository = busEventRepository; @@ -44,6 +49,8 @@ public BusSenderService(IPcsBusSender pcsBusSender, _service = service; _queueMonitor = queueMonitor; _sw = new Stopwatch(); + _blobLeaseService = blobLeaseService; + _plantService = plantService; } public async Task CloseConnections() @@ -52,7 +59,77 @@ public async Task CloseConnections() await _pcsBusSender.CloseAllAsync(); } + public bool HasPendingEventsForCurrentPlant() => _hasPendingEventsForCurrentPlant; + public async Task HandleBusEvents() + { + PlantLease? plantLease = null; + try + { + _sw.Start(); + _hasPendingEventsForCurrentPlant = false; + var plantLeases = await _blobLeaseService.ClaimPlantLease(); + if (plantLeases == null) + { + _logger.LogDebug("No plant lease available exiting."); + return; + } + + plantLease = plantLeases?.FirstOrDefault(x => x.IsCurrent); + + if (plantLeases == null || plantLease?.Plant == null) + { + _logger.LogDebug("No plant leases available exiting."); + return; + } + + if (ReleasePlantLeaseIfExpired(plantLease)) + { + return; + } + + var plants = _plantService.GetPlantsForCurrent(plantLeases); + var plant = plantLease.Plant; + _logger.LogInformation($"Handling messages for plant: {plant} with lease expiry time: {plantLease.LeaseExpiry}"); + + _busEventRepository.SetPlants(plants); + + await _queueMonitor.WriteQueueMetrics(plant); + + var events = await _busEventRepository.GetEarliestUnProcessedEventChunk(); + if (events.Any()) + { + _logger.LogInformation( + "[{Plant}] BusSenderService found {Count} messages to process after {Sw} ms", plant, + events.Count, + _sw.ElapsedMilliseconds); + _telemetryClient.TrackMetric("BusSender Chunk", events.Count); + await ProcessBusEvents(events, plant); + _logger.LogInformation("[{Plant}] BusSenderService ProcessBusEvents used {Sw} ms", plant, + _sw.ElapsedMilliseconds); + } + + // Release plant lease if it has expired. + if (!ReleasePlantLeaseIfExpired(plantLease)) + { + // ... or if there are no more unprocessed events for this plant. + await ReleasePlantLeaseIfProcessingCompleted(plantLease); + } + + _sw.Reset(); + } + catch (Exception exception) + { + _logger.LogError(exception, "BusSenderService execute send failed"); + await _blobLeaseService.ReleasePlantLease(plantLease); + throw; + } + + _logger.LogDebug("BusSenderService DoWorkerJob finished"); + _telemetryClient.Flush(); + } + + public async Task HandleBusEventsSingleInstance() { try { @@ -81,6 +158,35 @@ public async Task HandleBusEvents() _telemetryClient.Flush(); } + + private bool ReleasePlantLeaseIfExpired(PlantLease? plantLease) + { + if (plantLease != null && (plantLease.IsExpired() || _blobLeaseService.CancellationToken.IsCancellationRequested)) + { + _logger.LogDebug("Lease has expired for plant: {Plant}. Releasing it.", plantLease.Plant); + _blobLeaseService.ReleasePlantLease(plantLease); + return true; + } + + return false; + } + + private async Task ReleasePlantLeaseIfProcessingCompleted(PlantLease plantLease) + { + var remainingEvents = await _busEventRepository.GetEarliestUnProcessedEventChunk(); + if (remainingEvents.Any()) + { + _logger.LogDebug("[{Plant}] More unprocessed events are handled in the next loop by this instance. Keeping blob lease for this plant.", plantLease.Plant); + _hasPendingEventsForCurrentPlant = true; + } + else + { + _logger.LogDebug("[{Plant}] No more unprocessed events for this plant. Releasing blob lease.", plantLease.Plant); + await _blobLeaseService.ReleasePlantLease(plantLease); + _hasPendingEventsForCurrentPlant = false; + } + } + private async Task BatchAndSendPerTopic(List<(string Key, Queue messages)> eventGroups) { foreach (var (topic, messages) in eventGroups) @@ -174,13 +280,13 @@ private static bool IsSimpleMessage(BusEvent busEvent) || Guid.TryParse(busEvent.Message, out _) || BusEventService.CanGetTwoIdsFromMessage(busEvent.Message.Split(","), out _, out _); - private async Task ProcessBusEvents(List events) + private async Task ProcessBusEvents(List events, string plant = "ALL") { events = SetDuplicatesToSkipped(events); var dsw = Stopwatch.StartNew(); var unProcessedEvents = events.Where(busEvent => busEvent.Status == Status.UnProcessed).ToList(); - _logger.LogInformation("Amount of messages to process: {Count} ", unProcessedEvents.Count); + _logger.LogInformation("[{Plant}] Amount of messages to process: {Count} ", plant, unProcessedEvents.Count); var unProcessedTagEvents = unProcessedEvents.Where(e => e.Event == TagTopic.TopicName).ToList(); var isOverInOperatorLimit = unProcessedTagEvents.Count > 1000; @@ -195,24 +301,36 @@ private async Task ProcessBusEvents(List events) foreach (var simpleUnprocessedBusEvent in unProcessedEvents.Where(e => IsSimpleMessage(e) || e.Event == TagTopic.TopicName)) { + if (_blobLeaseService.CancellationToken.IsCancellationRequested) + { + _logger.LogDebug($"[{plant}] Cancellation requested, exiting the loop."); + break; + } await UpdateEventBasedOnTopic(simpleUnprocessedBusEvent); } - _logger.LogInformation("Update loop finished at {Sw} ms", dsw.ElapsedMilliseconds); - await _unitOfWork.SaveChangesAsync(); + _logger.LogInformation("[{Plant}] Update loop finished at {Sw} ms", plant, dsw.ElapsedMilliseconds); - /*** - * Group by topic and then create a queue of messages per topic - */ - var topicQueues = events.Where(busEvent => busEvent.Status == Status.UnProcessed) - .GroupBy(e => BusEventToTopicMapper.Map(e.Event)).Select(group => - { - Queue messages = new(); - group.ToList().ForEach(be => messages.Enqueue(be)); - return (group.Key, messages); - }).ToList(); + if (!_blobLeaseService.CancellationToken.IsCancellationRequested) + { + await _unitOfWork.SaveChangesAsync(); + /*** + * Group by topic and then create a queue of messages per topic + */ + var topicQueues = events.Where(busEvent => busEvent.Status == Status.UnProcessed) + .GroupBy(e => BusEventToTopicMapper.Map(e.Event)).Select(group => + { + Queue messages = new(); + group.ToList().ForEach(be => messages.Enqueue(be)); + return (group.Key, messages); + }).ToList(); - await BatchAndSendPerTopic(topicQueues); + await BatchAndSendPerTopic(topicQueues); + } + else + { + _logger.LogWarning($"[{plant}] SaveChangesAsync skipped due to IsCancellationRequested."); + } } private static void SetAllButOneEventToSkipped(IEnumerable group) @@ -237,6 +355,7 @@ private static List SetDuplicatesToSkipped(List events) private void TrackMessage(BusEvent busEvent, string busMessageMessageId, string busMessageBody) { var busEventMessageToSend = busEvent.MessageToSend ?? busEvent.Message; + var message = JsonSerializer.Deserialize(_service.WashString(busEventMessageToSend)!, DefaultSerializerHelper.SerializerOptions); @@ -251,11 +370,7 @@ private void TrackMessage(BusEvent busEvent, string busMessageMessageId, string {"Created", busEvent.Created.ToString(CultureInfo.InvariantCulture)}, {"ProjectName", message?.ProjectName ?? "NoProject"}, {"Plant", message?.Plant ?? "NoPlant"}, - {"MessageId", busMessageMessageId ?? "NoID" }, - //Remove these after debugging - {"BusEventMessageToSend", string.IsNullOrEmpty(message?.ProCoSysGuid) ? "MessageToSend: ( " + busEvent.MessageToSend + " )" : "N/A" }, - {"BusEventMessage", string.IsNullOrEmpty(message?.ProCoSysGuid) ? busEvent.Message : "N/A" }, - {"MessageBody", string.IsNullOrEmpty(message?.ProCoSysGuid) ? busMessageBody : "N/A" } + {"MessageId", busMessageMessageId ?? "NoID" } }); } diff --git a/src/Equinor.ProCoSys.BusSender.Core/Services/EntryPointService.cs b/src/Equinor.ProCoSys.BusSender.Core/Services/EntryPointService.cs index d528ada3..65f65f1a 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Services/EntryPointService.cs +++ b/src/Equinor.ProCoSys.BusSender.Core/Services/EntryPointService.cs @@ -1,6 +1,7 @@ using System; using System.Threading.Tasks; using Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; namespace Equinor.ProCoSys.BusSenderWorker.Core.Services; @@ -8,14 +9,28 @@ namespace Equinor.ProCoSys.BusSenderWorker.Core.Services; public class EntryPointService : IEntryPointService { private readonly IServiceProvider _services; + private readonly IConfiguration _configuration; - public EntryPointService(IServiceProvider services) => _services = services; + public EntryPointService(IServiceProvider services, IConfiguration configuration) + { + _services = services; + _configuration = configuration; + } - public async Task DoWorkerJob() + public async Task DoWorkerJob() { using var scope = _services.CreateScope(); var service = scope.ServiceProvider.GetRequiredService(); - await service.HandleBusEvents(); + var multiInstanceSupport = bool.Parse(_configuration["MultiInstanceSupport"]??"false"); + if (multiInstanceSupport) + { + await service.HandleBusEvents(); + } + else + { + await service.HandleBusEventsSingleInstance(); + } + return service.HasPendingEventsForCurrentPlant(); } public async Task StopService() diff --git a/src/Equinor.ProCoSys.BusSender.Core/Services/PlantService.cs b/src/Equinor.ProCoSys.BusSender.Core/Services/PlantService.cs new file mode 100644 index 00000000..765d61bb --- /dev/null +++ b/src/Equinor.ProCoSys.BusSender.Core/Services/PlantService.cs @@ -0,0 +1,131 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; +using Equinor.ProCoSys.BusSenderWorker.Core.Models; +using Equinor.ProCoSys.PcsServiceBus; +using Microsoft.Extensions.Caching.Memory; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; + +namespace Equinor.ProCoSys.BusSenderWorker.Core.Services; +public class PlantService : IPlantService +{ + private readonly IPlantRepository _plantRepository; + private readonly ILogger _logger; + private readonly IConfiguration _config; + private readonly IMemoryCache _cache; + + public PlantService(ILogger logger, + IPlantRepository plantRepository, + IConfiguration config, + IMemoryCache cache) + { + _logger = logger; + _plantRepository = plantRepository; + _config = config; + _cache = cache; + } + + public virtual List? GetAllPlants() + { + if (!_cache.TryGetValue("AllPlants", out List? allPlants)) + { + _logger.LogDebug("Retrieving plants from memory cache."); + allPlants = _plantRepository.GetAllPlants(); + if (allPlants == null || !allPlants.Any()) + { + var message = "No plants found in database."; + _logger.LogError(message); + throw new Exception(message); + } + _cache.Set("AllPlants", allPlants, new MemoryCacheEntryOptions + { + // Have to restart instance to reload plants configuration. + Priority = CacheItemPriority.NeverRemove + }); + _logger.LogDebug("Plants read from database and added to memory cache."); + } + else + { + _logger.LogDebug("Plants retrieved from memory cache."); + } + + return allPlants; + } + + public List GetPlantsForCurrent(List plantLeases) + { + var plantsHandledByCurrentInstance = new List(); + var allPlants = GetAllPlants(); + + if (allPlants == null || !allPlants.Any()) + { + throw new Exception("No plants found in database."); + } + + var plant = plantLeases.First(x => x.IsCurrent).Plant; + var plants = plantLeases.Where(x => x.IsCurrent) + .SelectMany(x => x.Plant.Split(',', StringSplitOptions.RemoveEmptyEntries)) + .ToList(); + if (!string.IsNullOrEmpty(plant)) + { + var otherDefinedPlants = plantLeases.Where(x => !x.IsCurrent) + .SelectMany(x => x.Plant.Split(',', StringSplitOptions.RemoveEmptyEntries)) + .ToList(); + if (plants.Contains(PcsServiceBusInstanceConstants.RemainingPlants)) + { + // We are also handling cases where RemainingPlants constant is used in combination with actual plants. E.g. PCS$TROLL_A, PCS$OSEBERG_C, REMAININGPLANTS. + var plantLeftovers = GetPlantLeftovers(otherDefinedPlants, allPlants); + plantsHandledByCurrentInstance = plants.Union(plantLeftovers).ToList(); + RemovePlantReplacement(plantsHandledByCurrentInstance); + } + else + { + plantsHandledByCurrentInstance.AddRange(plants); + } + + if (otherDefinedPlants.Intersect(plants).Any()) + { + var message = "One or more plants are defined for multiple items. Check plantslease blob."; + _logger.LogError(message); + throw new Exception(message); + } + RemoveInvalidPlants(plantsHandledByCurrentInstance, allPlants); + } + + if (!plantsHandledByCurrentInstance.Any()) + { + var message = "No valid plants to handle for this configuration item. Check plantslease blob. E.g. has non valid plants been included?"; + _logger.LogError(message); + throw new Exception(message); + } + + return plantsHandledByCurrentInstance; + } + + private void RemoveInvalidPlants(List plantsHandledByCurrentInstance, IEnumerable allPlants) + { + var invalidPlants = plantsHandledByCurrentInstance + .Except(PcsServiceBusInstanceConstants.AllPlantReplacementConstants) + .Except(PcsServiceBusInstanceConstants.AllPlantConstants) + .Except(allPlants) + .ToList(); + + foreach (var plant in invalidPlants) + { + plantsHandledByCurrentInstance.Remove(plant); + } + + if (invalidPlants.Any()) + { + _logger.LogWarning($"These plants are not valid: {string.Join(", ", invalidPlants)}"); + } + } + + private void RemovePlantReplacement(List plants) => plants.RemoveAll(x => PcsServiceBusInstanceConstants.AllPlantReplacementConstants.Contains(x)); + + private static IEnumerable GetPlantLeftovers(IEnumerable handledPlants, IEnumerable allNonVoidedPlants) => + allNonVoidedPlants.Except(handledPlants); + +} diff --git a/src/Equinor.ProCoSys.BusSender.Core/Services/QueueMonitorService.cs b/src/Equinor.ProCoSys.BusSender.Core/Services/QueueMonitorService.cs index 083b3fe1..4f4f38e2 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Services/QueueMonitorService.cs +++ b/src/Equinor.ProCoSys.BusSender.Core/Services/QueueMonitorService.cs @@ -1,8 +1,11 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using Equinor.ProCoSys.BusSenderWorker.Core.Interfaces; using Equinor.ProCoSys.BusSenderWorker.Core.Telemetry; +using Equinor.ProCoSys.PcsServiceBus; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Options; namespace Equinor.ProCoSys.BusSenderWorker.Core.Services; @@ -24,39 +27,56 @@ public QueueMonitorService(ITelemetryClient telemetryClient, IBusEventRepository _queueWriteIntervalMinutes = string.IsNullOrWhiteSpace(configuration["MonitorQueueIntervalMinutes"]) ? 15 : int.Parse(configuration["MonitorQueueIntervalMinutes"]!); } - public async Task WriteQueueMetrics() + public async Task WriteQueueMetrics(string? plant = null) { var lastQueueWrite = string.IsNullOrWhiteSpace(_configuration["LastQueueWrite"]) ? default : DateTime.Parse(_configuration["LastQueueWrite"]!); if (IsTimeToWriteQueueMetric(lastQueueWrite)) { - await WriteQueueLength(); - await WriteQueueAge(); + await WriteQueueLength(plant); + await WriteQueueAge(plant); _configuration["LastQueueWrite"] = _systemClock.UtcNow.ToString("O"); } } - private async Task WriteQueueAge() + private async Task WriteQueueAge(string? plant = null) { - var queueOldestEvent = await _busEventRepository.GetOldestEvent(); - - if(NoEventFound(queueOldestEvent)) + var queueOldestEvent = await _busEventRepository.GetOldestEvent(ignoreFilter: true); + + if (NoEventFound(queueOldestEvent)) { queueOldestEvent = _systemClock.UtcNow; - } + } var waitTime = _systemClock.UtcNow - queueOldestEvent; _telemetryClient.TrackMetric("QueueAge", waitTime.TotalMinutes); + if (plant != null) + { + queueOldestEvent = await _busEventRepository.GetOldestEvent(ignoreFilter: false); + + if (NoEventFound(queueOldestEvent)) + { + queueOldestEvent = _systemClock.UtcNow; + } + + waitTime = _systemClock.UtcNow - queueOldestEvent; + _telemetryClient.TrackMetric($"{plant ?? ""}QueueAge", waitTime.TotalMinutes); + } } - private async Task WriteQueueLength() + private async Task WriteQueueLength(string? plant = null) { - var queueLength = await _busEventRepository.GetUnProcessedCount(); + var queueLength = await _busEventRepository.GetUnProcessedCount(ignoreFilter: true); _telemetryClient.TrackMetric("QueueLength", queueLength); + if (plant != null) + { + queueLength = await _busEventRepository.GetUnProcessedCount(ignoreFilter: false); + _telemetryClient.TrackMetric($"{plant ?? ""}QueueLength", queueLength); + } } private bool IsTimeToWriteQueueMetric(DateTime lastQueueWrite) => _systemClock.UtcNow >= lastQueueWrite.ToUniversalTime().AddMinutes(_queueWriteIntervalMinutes); - private bool NoEventFound(DateTime oldestEvent) => + private static bool NoEventFound(DateTime oldestEvent) => oldestEvent.Equals(default); } diff --git a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ApplicationInsightsTelemetryClient.cs b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ApplicationInsightsTelemetryClient.cs index 7bcd1e33..a24b9328 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ApplicationInsightsTelemetryClient.cs +++ b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ApplicationInsightsTelemetryClient.cs @@ -1,10 +1,23 @@ using System; using System.Collections.Generic; using Microsoft.ApplicationInsights; +using Microsoft.ApplicationInsights.Channel; using Microsoft.ApplicationInsights.Extensibility; namespace Equinor.ProCoSys.BusSenderWorker.Core.Telemetry; +public class RunningExecutableTelemetryInitializer : ITelemetryInitializer +{ + private readonly string _instanceName; + + public RunningExecutableTelemetryInitializer(string instanceName) => _instanceName = instanceName; + + public void Initialize(ITelemetry telemetry) + { + telemetry.Context.Cloud.RoleName = AppDomain.CurrentDomain.FriendlyName; + } +} + public class ApplicationInsightsTelemetryClient : ITelemetryClient { private readonly TelemetryClient _ai; @@ -33,5 +46,8 @@ public void TrackMetric(string name, double metric) => _ai .GetMetric(name) .TrackValue(metric); - + + public void TrackMetric(string name, double metric, Dictionary properties) => + _ai + .TrackMetric(name, metric, properties); } diff --git a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ConsoleTelemetryClient.cs b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ConsoleTelemetryClient.cs index 751afc2b..4c939dc5 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ConsoleTelemetryClient.cs +++ b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ConsoleTelemetryClient.cs @@ -39,4 +39,7 @@ public void TrackMetric(string name, double metric, string dimension1Name, strin public void TrackMetric(string name, double metric, string dimension1Name, string dimension1Value) => Console.WriteLine( $"Metric:\t{name}:{Environment.NewLine}\t{metric}{Environment.NewLine}\t{dimension1Name}: {dimension1Value}"); + public void TrackMetric(string name, double metric, Dictionary properties) => + Console.WriteLine( + $"Metric:\t{name}:{Environment.NewLine}\t{string.Join(Environment.NewLine, properties.Select(p => $"\t{p.Key}: {p.Value}"))}{Environment.NewLine}{metric}"); } diff --git a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ITelemetryClient.cs b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ITelemetryClient.cs index 45e08af5..e94148f9 100644 --- a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ITelemetryClient.cs +++ b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ITelemetryClient.cs @@ -7,6 +7,7 @@ public interface ITelemetryClient void Flush(); void TrackMetric(string name, double metric); - void TrackEvent(string name, Dictionary properties); - + void TrackEvent(string name, Dictionary properties); + void TrackMetric(string name, double metric, Dictionary properties); + } diff --git a/src/Equinor.ProCoSys.BusSender.Infrastructure/Data/BusSenderServiceContext.cs b/src/Equinor.ProCoSys.BusSender.Infrastructure/Data/BusSenderServiceContext.cs index 724935d4..7712b89e 100644 --- a/src/Equinor.ProCoSys.BusSender.Infrastructure/Data/BusSenderServiceContext.cs +++ b/src/Equinor.ProCoSys.BusSender.Infrastructure/Data/BusSenderServiceContext.cs @@ -21,9 +21,13 @@ public BusSenderServiceContext(DbContextOptions options public Task SaveChangesAsync() => base.SaveChangesAsync(); + public virtual DbSet Plants { get; set; } = null!; public virtual DbSet BusEvents { get; set; } = null!; - protected override void OnModelCreating(ModelBuilder modelBuilder) => + protected override void OnModelCreating(ModelBuilder modelBuilder) + { modelBuilder.ApplyConfiguration(new BusEventConfiguration()); + modelBuilder.ApplyConfiguration(new PlantConfiguration()); + } } diff --git a/src/Equinor.ProCoSys.BusSender.Infrastructure/EntityConfiguration/PlantConfiguration.cs b/src/Equinor.ProCoSys.BusSender.Infrastructure/EntityConfiguration/PlantConfiguration.cs new file mode 100644 index 00000000..28272772 --- /dev/null +++ b/src/Equinor.ProCoSys.BusSender.Infrastructure/EntityConfiguration/PlantConfiguration.cs @@ -0,0 +1,16 @@ +using Equinor.ProCoSys.BusSenderWorker.Core.Models; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; + +namespace Equinor.ProCoSys.BusSenderWorker.Infrastructure.EntityConfiguration; + +public class PlantConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.ToTable("PROJECTSCHEMA"); + builder.HasKey(p => p.ProjectSchema); + builder.Property(p => p.ProjectSchema).HasColumnName("PROJECTSCHEMA"); + builder.Property(p => p.IsVoided).HasColumnName("ISVOIDED"); + } +} diff --git a/src/Equinor.ProCoSys.BusSender.Infrastructure/Equinor.ProCoSys.BusSenderWorker.Infrastructure.csproj b/src/Equinor.ProCoSys.BusSender.Infrastructure/Equinor.ProCoSys.BusSenderWorker.Infrastructure.csproj index e711e90f..dd69ad5a 100644 --- a/src/Equinor.ProCoSys.BusSender.Infrastructure/Equinor.ProCoSys.BusSenderWorker.Infrastructure.csproj +++ b/src/Equinor.ProCoSys.BusSender.Infrastructure/Equinor.ProCoSys.BusSenderWorker.Infrastructure.csproj @@ -10,8 +10,10 @@ + +