diff --git a/azure-pipeline.yml b/azure-pipeline.yml
index 01152666..abe9d8f1 100644
--- a/azure-pipeline.yml
+++ b/azure-pipeline.yml
@@ -12,7 +12,6 @@ resources:
name: equinor/procosys-infra
endpoint: 'equinor'
ref: master
-
variables:
- template: src/variables/global-variables.yml@templates
- template: src/resources/bus-sender/bus-sender-variables.yml@templates
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Equinor.ProCoSys.BusSenderWorker.Core.csproj b/src/Equinor.ProCoSys.BusSender.Core/Equinor.ProCoSys.BusSenderWorker.Core.csproj
index f2186184..9323d3ce 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Equinor.ProCoSys.BusSenderWorker.Core.csproj
+++ b/src/Equinor.ProCoSys.BusSender.Core/Equinor.ProCoSys.BusSenderWorker.Core.csproj
@@ -7,9 +7,18 @@
+
+
+
-
+
+
+
+
+
+
+
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Extensions/PlantLeaseExtensions.cs b/src/Equinor.ProCoSys.BusSender.Core/Extensions/PlantLeaseExtensions.cs
new file mode 100644
index 00000000..1d536455
--- /dev/null
+++ b/src/Equinor.ProCoSys.BusSender.Core/Extensions/PlantLeaseExtensions.cs
@@ -0,0 +1,8 @@
+using System;
+using Equinor.ProCoSys.BusSenderWorker.Core.Models;
+
+namespace Equinor.ProCoSys.BusSenderWorker.Core.Extensions;
+public static class PlantLeaseExtensions
+{
+ public static bool IsExpired(this PlantLease plantLease) => plantLease.LeaseExpiry.HasValue && DateTime.UtcNow >= plantLease.LeaseExpiry.Value;
+}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBlobLeaseService.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBlobLeaseService.cs
new file mode 100644
index 00000000..152e1b46
--- /dev/null
+++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBlobLeaseService.cs
@@ -0,0 +1,14 @@
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Equinor.ProCoSys.BusSenderWorker.Core.Models;
+using Microsoft.Extensions.Caching.Memory;
+
+namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
+public interface IBlobLeaseService
+{
+ Task ReleasePlantLease(PlantLease? plantLease);
+ Task?> ClaimPlantLease();
+ CancellationToken CancellationToken { get; }
+ IMemoryCache GetCache();
+}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusEventRepository.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusEventRepository.cs
index c42ff13b..2bc305e9 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusEventRepository.cs
+++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusEventRepository.cs
@@ -7,7 +7,8 @@ namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
public interface IBusEventRepository
{
- Task> GetEarliestUnProcessedEventChunk();
- Task GetUnProcessedCount();
- Task GetOldestEvent();
+ Task> GetEarliestUnProcessedEventChunk(bool ignoreFilter = false);
+ Task GetUnProcessedCount(bool ignoreFilter = false);
+ Task GetOldestEvent(bool ignoreFilter = false);
+ void SetPlants(List plants);
}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusSenderService.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusSenderService.cs
index 614139d1..196fd2d2 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusSenderService.cs
+++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IBusSenderService.cs
@@ -6,4 +6,6 @@ public interface IBusSenderService
{
Task CloseConnections();
Task HandleBusEvents();
+ Task HandleBusEventsSingleInstance();
+ bool HasPendingEventsForCurrentPlant();
}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IEntryPointService.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IEntryPointService.cs
index 033bda03..238cf1cd 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IEntryPointService.cs
+++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IEntryPointService.cs
@@ -4,6 +4,6 @@ namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
public interface IEntryPointService
{
- Task DoWorkerJob();
+ Task DoWorkerJob();
Task StopService();
}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantRepository.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantRepository.cs
new file mode 100644
index 00000000..bbe2f958
--- /dev/null
+++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantRepository.cs
@@ -0,0 +1,9 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
+
+public interface IPlantRepository
+{
+ List GetAllPlants();
+}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantService.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantService.cs
new file mode 100644
index 00000000..c2ecd053
--- /dev/null
+++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantService.cs
@@ -0,0 +1,10 @@
+using System.Collections.Generic;
+using Equinor.ProCoSys.BusSenderWorker.Core.Models;
+using Microsoft.Extensions.Configuration;
+
+namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
+public interface IPlantService
+{
+ List GetPlantsForCurrent(List plantLeases);
+ List? GetAllPlants();
+}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IQueueMonitorService.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IQueueMonitorService.cs
index 74ccf7eb..7d05087c 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IQueueMonitorService.cs
+++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IQueueMonitorService.cs
@@ -4,5 +4,5 @@ namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
public interface IQueueMonitorService
{
- Task WriteQueueMetrics();
-}
\ No newline at end of file
+ Task WriteQueueMetrics(string? plant = null);
+}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IUnitOfWork.cs b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IUnitOfWork.cs
index b44620f2..fcf9a67a 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IUnitOfWork.cs
+++ b/src/Equinor.ProCoSys.BusSender.Core/Interfaces/IUnitOfWork.cs
@@ -1,8 +1,10 @@
-using System.Threading.Tasks;
+using System.Threading;
+using System.Threading.Tasks;
namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
public interface IUnitOfWork
{
Task SaveChangesAsync();
+ Task SaveChangesAsync(CancellationToken cancellationToken);
}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Models/Plant.cs b/src/Equinor.ProCoSys.BusSender.Core/Models/Plant.cs
new file mode 100644
index 00000000..5a84062b
--- /dev/null
+++ b/src/Equinor.ProCoSys.BusSender.Core/Models/Plant.cs
@@ -0,0 +1,10 @@
+#pragma warning disable CS8618
+
+namespace Equinor.ProCoSys.BusSenderWorker.Core.Models;
+
+public class Plant
+{
+ public string ProjectSchema { get; set; }
+ public string IsVoided { get; set; }
+}
+
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Models/PlantLease.cs b/src/Equinor.ProCoSys.BusSender.Core/Models/PlantLease.cs
new file mode 100644
index 00000000..add12265
--- /dev/null
+++ b/src/Equinor.ProCoSys.BusSender.Core/Models/PlantLease.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Text.Json.Serialization;
+
+namespace Equinor.ProCoSys.BusSenderWorker.Core.Models;
+public class PlantLease
+{
+ public DateTime? LeaseExpiry { get; set; }
+ public required string Plant { get; set; }
+ public DateTime? LastProcessed { get; set; }
+
+ [JsonIgnore]
+ // Flag to tell if this plant is the one being handled by the current instance.
+ // This is transient and not stored in the blob.
+ public bool IsCurrent { get; set; } = false;
+}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Services/BlobLeaseService.cs b/src/Equinor.ProCoSys.BusSender.Core/Services/BlobLeaseService.cs
new file mode 100644
index 00000000..231f41eb
--- /dev/null
+++ b/src/Equinor.ProCoSys.BusSender.Core/Services/BlobLeaseService.cs
@@ -0,0 +1,313 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Text.Json;
+using System.Threading.Tasks;
+using Azure.Storage.Blobs.Models;
+using Azure.Storage.Blobs;
+using Azure;
+using Azure.Storage.Blobs.Specialized;
+using Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
+using Equinor.ProCoSys.BusSenderWorker.Core.Models;
+using Polly;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Configuration;
+using System.Threading;
+using Microsoft.Extensions.Caching.Memory;
+using System.Diagnostics;
+
+namespace Equinor.ProCoSys.BusSenderWorker.Core.Services;
+
+public class BlobLeaseService : IBlobLeaseService
+{
+ private readonly ILogger _logger;
+ private readonly IConfiguration _configuration;
+ private CancellationTokenSource? _cancellationTokenSource;
+ private readonly IMemoryCache _cache;
+ private readonly BlobClient _blobClient;
+ private static readonly TimeSpan blobLeaseDuration = TimeSpan.FromSeconds(15);
+
+ public CancellationToken CancellationToken => _cancellationTokenSource?.Token ?? CancellationToken.None;
+
+ public BlobLeaseService(ILogger logger, IConfiguration configuration, IMemoryCache cache)
+ {
+ _logger = logger;
+ _configuration = configuration;
+ _cache = cache;
+ var sw = new Stopwatch();
+ sw.Start();
+ _blobClient = GetBlobClient();
+ }
+ public virtual IMemoryCache GetCache() => _cache;
+
+ public virtual async Task?> ClaimPlantLease()
+ {
+ if (!int.TryParse(_configuration["PlantLeaseExpiryTime"], out var plantLeaseExpiryTime))
+ {
+ _logger.LogError("Invalid PlantLeaseExpiryTime configuration value.");
+ return null;
+ }
+ if (!GetCache().TryGetValue("CurrentPlantLeases", out List? plantLeases))
+ {
+ var sw = new Stopwatch();
+ sw.Start();
+
+ plantLeases = await GetPlantLeases();
+ if (plantLeases == null)
+ {
+ _logger.LogDebug("No blob lease available. Awaiting next loop.");
+ return null;
+ }
+
+ var plantLease = GetOldestUnprocessedPlantLeaseInfo(plantLeases);
+ if (plantLease == null)
+ {
+ // Nothing to do for now.
+ _logger.LogDebug("No available plants to lease. Awaiting next loop.");
+ return null;
+ }
+
+ var leaseId = Guid.NewGuid().ToString();
+
+ plantLeases.Where(x => x.Plant.Equals(plantLease.Plant)).ToList().ForEach(x =>
+ {
+ x.IsCurrent = true;
+ x.LeaseExpiry = DateTime.UtcNow.AddSeconds(plantLeaseExpiryTime);
+ });
+
+ var didUpdatePlantLeases = await UpdatePlantLeases(plantLeases, leaseId, 0);
+ if (!didUpdatePlantLeases)
+ {
+ return null;
+ }
+ GetCache().Set("CurrentPlantLeases", plantLeases, TimeSpan.FromSeconds(plantLeaseExpiryTime * 0.95));
+ _logger.LogDebug($"Claim used {sw.ElapsedMilliseconds}");
+ }
+ else
+ {
+ plantLeaseExpiryTime = GetSecondsUntilLeaseExpiry(plantLeases?.First(x => x.IsCurrent).LeaseExpiry);
+ _logger.LogDebug($"Plant leases retrieved from cache. {plantLeaseExpiryTime} until expired.");
+ }
+
+ if ((_cancellationTokenSource == null || _cancellationTokenSource.Token == CancellationToken.None) && (plantLeaseExpiryTime > 0))
+ {
+ // Multiplying by a factor lower than 1 to ensure that message processing for this instance is cancelled shortly before the lease actually expires.
+ _cancellationTokenSource = new CancellationTokenSource((int)(plantLeaseExpiryTime * 1000 * 0.95));
+ _logger.LogDebug($"A new cancellation token is initialized. {plantLeaseExpiryTime} until expired.");
+ }
+
+ return plantLeases;
+ }
+
+ public async Task ReleasePlantLease(PlantLease? plantLease)
+ {
+ if (plantLease == null)
+ {
+ _logger.LogWarning("Cannot release lease.");
+ return false;
+ }
+
+ if (!int.TryParse(_configuration["MaxBlobReleaseLeaseAttempts"], out var maxRetryAttempts))
+ {
+ _logger.LogError("Invalid MaxBlobReleaseLeaseAttempts configuration value.");
+ return false;
+ }
+
+ var leaseId = Guid.NewGuid().ToString();
+ var plantLeases = await GetPlantLeases();
+ if (plantLeases == null)
+ {
+ _logger.LogWarning("Cannot release plant lease.");
+ return false;
+ }
+
+ plantLeases.Where(x => x.Plant.Equals(plantLease?.Plant)).ToList().ForEach(x =>
+ {
+ x.LeaseExpiry = null;
+ x.LastProcessed = DateTime.UtcNow;
+ });
+ var didReleasePlantLeases = await UpdatePlantLeases(plantLeases, leaseId, maxRetryAttempts);
+ if (!didReleasePlantLeases)
+ {
+ _logger.LogWarning("Failed to update plant lease blob. Hence plant will not be handled until expired.");
+ }
+ GetCache().Remove("CurrentPlantLeases");
+ _cancellationTokenSource = null;
+ return didReleasePlantLeases;
+ }
+
+ public virtual BlobLeaseClient GetBlobLeaseClient(BlobClient blobClient, string leaseId) => blobClient.GetBlobLeaseClient(leaseId);
+
+ private TimeSpan GetJitter(TimeSpan delayBetweenAttempts) => TimeSpan.FromMilliseconds(new Random().Next(0, (int)(delayBetweenAttempts.TotalMilliseconds * 0.25))); // 25% jitter
+
+ protected async Task TryAcquireBlobLeaseAsync(BlobClient blobClient, string leaseId, TimeSpan leaseDuration, int maxRetryAttempts = 0, TimeSpan? delayBetweenAttempts = null)
+ {
+ if (!int.TryParse(_configuration["BlobReleaseLeaseDelay"], out var blobReleaseLeaseDelay))
+ {
+ _logger.LogError("Invalid BlobReleaseLeaseDelay configuration value.");
+ }
+ delayBetweenAttempts ??= TimeSpan.FromMilliseconds(blobReleaseLeaseDelay);
+ var leaseClient = GetBlobLeaseClient(blobClient, leaseId);
+
+ var retryPolicy = Policy
+ .Handle(ex => ex.ErrorCode == BlobErrorCode.LeaseAlreadyPresent)
+ .WaitAndRetryAsync(maxRetryAttempts, retryAttempt =>
+ {
+ _logger.LogDebug($"Attempt {retryAttempt} to acquire lease for blob: {blobClient.Name}");
+ var jitter = GetJitter(delayBetweenAttempts.Value);
+ return delayBetweenAttempts.Value + jitter;
+ });
+
+ try
+ {
+ await retryPolicy.ExecuteAsync(async () =>
+ {
+ // GetPropertiesAsync is called upfront of AcquireAsync in an attempt to reduce the number of calls to the latter method.
+ // In general, GetPropertiesAsync is expected to be quicker than AcquireAsync because it is a read-only operation,
+ // whereas AcquireAsync involves state changes and additional checks.
+ var properties = await blobClient.GetPropertiesAsync();
+ var leaseStateUnlocked = properties.Value.LeaseStatus == LeaseStatus.Unlocked;
+ var leaseStateAvailableOrExpired = properties.Value.LeaseState == LeaseState.Available || properties.Value.LeaseState == LeaseState.Expired;
+ if (leaseStateUnlocked && leaseStateAvailableOrExpired)
+ {
+ await leaseClient.AcquireAsync(leaseDuration, cancellationToken: CancellationToken.None);
+ }
+ else
+ {
+ throw new RequestFailedException(409, "Lease already present", BlobErrorCode.LeaseAlreadyPresent.ToString(),
+ new InvalidOperationException("The lease is already present and cannot be acquired."));
+ }
+ });
+ return true;
+ }
+ catch (RequestFailedException rfe)
+ {
+ if (rfe.ErrorCode == BlobErrorCode.LeaseAlreadyPresent)
+ {
+ // We are only interested in warning when this method has been called with acceptance for multiple retry attempts (e.g. when releasing plant lease)
+ // indicating higher importance of acquiring the lease.
+ // We do not want to spam the logs with warnings when successful blob lease is of lower importance. (e.g. for Claim)
+ if (maxRetryAttempts > 0)
+ {
+ _logger.LogWarning(rfe,
+ $"Failed to acquire lease for blob: {blobClient.Name} after {maxRetryAttempts} attempts. ErrorCode: {rfe.ErrorCode} Message: {rfe.Message}");
+ }
+ return false;
+ }
+
+ _logger.LogError(rfe, $"Failed to acquire lease for blob: {blobClient.Name} ErrorCode: {rfe.ErrorCode} Message: {rfe.Message}");
+ throw;
+ }
+ }
+
+ public virtual async Task GetBlobContentAsync(BlobClient blobClient)
+ {
+ try
+ {
+ var response = await blobClient.DownloadStreamingAsync(cancellationToken: CancellationToken.None);
+ using var streamReader = new StreamReader(response.Value.Content);
+ return await streamReader.ReadToEndAsync();
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Failed to download blob content: {BlobName}", blobClient.Name);
+ throw;
+ }
+ }
+ private async Task?> GetPlantLeases()
+ {
+ var plantLease = await GetPlantLeases(_blobClient);
+ if (plantLease!=null && plantLease.Any())
+ {
+ return plantLease;
+ }
+
+ _logger.LogWarning("Could not read blob containing plant lease.");
+ return null;
+ }
+
+ private async Task?> GetPlantLeases(BlobClient blobClient)
+ {
+ var options = new JsonSerializerOptions
+ {
+ PropertyNameCaseInsensitive = true
+ };
+
+ var json = await GetBlobContentAsync(blobClient);
+ return JsonSerializer.Deserialize>(json, options);
+ }
+
+ public virtual async Task UpdatePlantLeases(List plantLeases, string leaseId, int maxRetryAttempts = 0)
+ {
+ var options = new JsonSerializerOptions
+ {
+ WriteIndented = true,
+ PropertyNameCaseInsensitive = true
+ };
+
+ var json = JsonSerializer.Serialize(plantLeases, options);
+ using var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(json));
+ var uploadOptions = new BlobUploadOptions
+ {
+ Conditions = new BlobRequestConditions
+ {
+ LeaseId = leaseId
+ }
+ };
+ var newLeaseAcquired = await TryAcquireBlobLeaseAsync(_blobClient, leaseId, blobLeaseDuration, maxRetryAttempts);
+ if (!newLeaseAcquired)
+ {
+ return false;
+ }
+ await _blobClient.UploadAsync(memoryStream, uploadOptions, CancellationToken.None);
+ await ReleaseBlobLeaseAsync(_blobClient, leaseId);
+ return true;
+ }
+
+ public virtual BlobClient GetBlobClient()
+ {
+ var connectionString = _configuration["BlobStorage:ConnectionString"];
+ var containerName = _configuration["BlobStorage:BusSenderContainerName"];
+ var plantLeaseFileName = _configuration["PlantLeaseFileName"];
+ var blobContainerClient = new BlobContainerClient(connectionString, containerName);
+ return blobContainerClient.GetBlobClient(plantLeaseFileName);
+ }
+
+ private PlantLease? GetOldestUnprocessedPlantLeaseInfo(List plantLeases)
+ {
+ var unprocessedLeaseInfos = plantLeases
+ .Where(p => p.LeaseExpiry == null || p.LeaseExpiry < DateTime.UtcNow)
+ .OrderBy(p => p.LastProcessed)
+ .ToList();
+
+ return unprocessedLeaseInfos.FirstOrDefault(); // Return first if none is taken.
+ }
+ public virtual async Task ReleaseBlobLeaseAsync(BlobClient blobClient, string leaseId)
+ {
+ try
+ {
+ var leaseClient = GetBlobLeaseClient(blobClient, leaseId);
+ await leaseClient.ReleaseAsync(cancellationToken: CancellationToken.None);
+ }
+ catch (RequestFailedException ex)
+ {
+ _logger.LogError(ex, "Failed to release lease for blob: {BlobName}", blobClient.Name);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Unexpected error occurred while releasing lease for blob: {BlobName}", blobClient.Name);
+ }
+ }
+ public static int GetSecondsUntilLeaseExpiry(DateTime? leaseExpiry)
+ {
+ if (leaseExpiry == null)
+ {
+ return 0;
+ }
+
+ var timeSpan = leaseExpiry.Value - DateTime.UtcNow;
+ return (int)(timeSpan.TotalSeconds > 0 ? timeSpan.TotalSeconds : 0);
+ }
+}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Services/BusSenderService.cs b/src/Equinor.ProCoSys.BusSender.Core/Services/BusSenderService.cs
index ffd47cd3..3e161ac7 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Services/BusSenderService.cs
+++ b/src/Equinor.ProCoSys.BusSender.Core/Services/BusSenderService.cs
@@ -27,6 +27,9 @@ public class BusSenderService : IBusSenderService
private readonly Stopwatch _sw;
private readonly ITelemetryClient _telemetryClient;
private readonly IUnitOfWork _unitOfWork;
+ private readonly IBlobLeaseService _blobLeaseService;
+ private readonly IPlantService _plantService;
+ private bool _hasPendingEventsForCurrentPlant = false;
public BusSenderService(IPcsBusSender pcsBusSender,
IBusEventRepository busEventRepository,
@@ -34,7 +37,9 @@ public BusSenderService(IPcsBusSender pcsBusSender,
ILogger logger,
ITelemetryClient telemetryClient,
IBusEventService service,
- IQueueMonitorService queueMonitor)
+ IQueueMonitorService queueMonitor,
+ IBlobLeaseService blobLeaseService,
+ IPlantService plantService)
{
_pcsBusSender = pcsBusSender;
_busEventRepository = busEventRepository;
@@ -44,6 +49,8 @@ public BusSenderService(IPcsBusSender pcsBusSender,
_service = service;
_queueMonitor = queueMonitor;
_sw = new Stopwatch();
+ _blobLeaseService = blobLeaseService;
+ _plantService = plantService;
}
public async Task CloseConnections()
@@ -52,7 +59,77 @@ public async Task CloseConnections()
await _pcsBusSender.CloseAllAsync();
}
+ public bool HasPendingEventsForCurrentPlant() => _hasPendingEventsForCurrentPlant;
+
public async Task HandleBusEvents()
+ {
+ PlantLease? plantLease = null;
+ try
+ {
+ _sw.Start();
+ _hasPendingEventsForCurrentPlant = false;
+ var plantLeases = await _blobLeaseService.ClaimPlantLease();
+ if (plantLeases == null)
+ {
+ _logger.LogDebug("No plant lease available exiting.");
+ return;
+ }
+
+ plantLease = plantLeases?.FirstOrDefault(x => x.IsCurrent);
+
+ if (plantLeases == null || plantLease?.Plant == null)
+ {
+ _logger.LogDebug("No plant leases available exiting.");
+ return;
+ }
+
+ if (ReleasePlantLeaseIfExpired(plantLease))
+ {
+ return;
+ }
+
+ var plants = _plantService.GetPlantsForCurrent(plantLeases);
+ var plant = plantLease.Plant;
+ _logger.LogInformation($"Handling messages for plant: {plant} with lease expiry time: {plantLease.LeaseExpiry}");
+
+ _busEventRepository.SetPlants(plants);
+
+ await _queueMonitor.WriteQueueMetrics(plant);
+
+ var events = await _busEventRepository.GetEarliestUnProcessedEventChunk();
+ if (events.Any())
+ {
+ _logger.LogInformation(
+ "[{Plant}] BusSenderService found {Count} messages to process after {Sw} ms", plant,
+ events.Count,
+ _sw.ElapsedMilliseconds);
+ _telemetryClient.TrackMetric("BusSender Chunk", events.Count);
+ await ProcessBusEvents(events, plant);
+ _logger.LogInformation("[{Plant}] BusSenderService ProcessBusEvents used {Sw} ms", plant,
+ _sw.ElapsedMilliseconds);
+ }
+
+ // Release plant lease if it has expired.
+ if (!ReleasePlantLeaseIfExpired(plantLease))
+ {
+ // ... or if there are no more unprocessed events for this plant.
+ await ReleasePlantLeaseIfProcessingCompleted(plantLease);
+ }
+
+ _sw.Reset();
+ }
+ catch (Exception exception)
+ {
+ _logger.LogError(exception, "BusSenderService execute send failed");
+ await _blobLeaseService.ReleasePlantLease(plantLease);
+ throw;
+ }
+
+ _logger.LogDebug("BusSenderService DoWorkerJob finished");
+ _telemetryClient.Flush();
+ }
+
+ public async Task HandleBusEventsSingleInstance()
{
try
{
@@ -81,6 +158,35 @@ public async Task HandleBusEvents()
_telemetryClient.Flush();
}
+
+ private bool ReleasePlantLeaseIfExpired(PlantLease? plantLease)
+ {
+ if (plantLease != null && (plantLease.IsExpired() || _blobLeaseService.CancellationToken.IsCancellationRequested))
+ {
+ _logger.LogDebug("Lease has expired for plant: {Plant}. Releasing it.", plantLease.Plant);
+ _blobLeaseService.ReleasePlantLease(plantLease);
+ return true;
+ }
+
+ return false;
+ }
+
+ private async Task ReleasePlantLeaseIfProcessingCompleted(PlantLease plantLease)
+ {
+ var remainingEvents = await _busEventRepository.GetEarliestUnProcessedEventChunk();
+ if (remainingEvents.Any())
+ {
+ _logger.LogDebug("[{Plant}] More unprocessed events are handled in the next loop by this instance. Keeping blob lease for this plant.", plantLease.Plant);
+ _hasPendingEventsForCurrentPlant = true;
+ }
+ else
+ {
+ _logger.LogDebug("[{Plant}] No more unprocessed events for this plant. Releasing blob lease.", plantLease.Plant);
+ await _blobLeaseService.ReleasePlantLease(plantLease);
+ _hasPendingEventsForCurrentPlant = false;
+ }
+ }
+
private async Task BatchAndSendPerTopic(List<(string Key, Queue messages)> eventGroups)
{
foreach (var (topic, messages) in eventGroups)
@@ -174,13 +280,13 @@ private static bool IsSimpleMessage(BusEvent busEvent)
|| Guid.TryParse(busEvent.Message, out _)
|| BusEventService.CanGetTwoIdsFromMessage(busEvent.Message.Split(","), out _, out _);
- private async Task ProcessBusEvents(List events)
+ private async Task ProcessBusEvents(List events, string plant = "ALL")
{
events = SetDuplicatesToSkipped(events);
var dsw = Stopwatch.StartNew();
var unProcessedEvents = events.Where(busEvent => busEvent.Status == Status.UnProcessed).ToList();
- _logger.LogInformation("Amount of messages to process: {Count} ", unProcessedEvents.Count);
+ _logger.LogInformation("[{Plant}] Amount of messages to process: {Count} ", plant, unProcessedEvents.Count);
var unProcessedTagEvents = unProcessedEvents.Where(e => e.Event == TagTopic.TopicName).ToList();
var isOverInOperatorLimit = unProcessedTagEvents.Count > 1000;
@@ -195,24 +301,36 @@ private async Task ProcessBusEvents(List events)
foreach (var simpleUnprocessedBusEvent in unProcessedEvents.Where(e =>
IsSimpleMessage(e) || e.Event == TagTopic.TopicName))
{
+ if (_blobLeaseService.CancellationToken.IsCancellationRequested)
+ {
+ _logger.LogDebug($"[{plant}] Cancellation requested, exiting the loop.");
+ break;
+ }
await UpdateEventBasedOnTopic(simpleUnprocessedBusEvent);
}
- _logger.LogInformation("Update loop finished at {Sw} ms", dsw.ElapsedMilliseconds);
- await _unitOfWork.SaveChangesAsync();
+ _logger.LogInformation("[{Plant}] Update loop finished at {Sw} ms", plant, dsw.ElapsedMilliseconds);
- /***
- * Group by topic and then create a queue of messages per topic
- */
- var topicQueues = events.Where(busEvent => busEvent.Status == Status.UnProcessed)
- .GroupBy(e => BusEventToTopicMapper.Map(e.Event)).Select(group =>
- {
- Queue messages = new();
- group.ToList().ForEach(be => messages.Enqueue(be));
- return (group.Key, messages);
- }).ToList();
+ if (!_blobLeaseService.CancellationToken.IsCancellationRequested)
+ {
+ await _unitOfWork.SaveChangesAsync();
+ /***
+ * Group by topic and then create a queue of messages per topic
+ */
+ var topicQueues = events.Where(busEvent => busEvent.Status == Status.UnProcessed)
+ .GroupBy(e => BusEventToTopicMapper.Map(e.Event)).Select(group =>
+ {
+ Queue messages = new();
+ group.ToList().ForEach(be => messages.Enqueue(be));
+ return (group.Key, messages);
+ }).ToList();
- await BatchAndSendPerTopic(topicQueues);
+ await BatchAndSendPerTopic(topicQueues);
+ }
+ else
+ {
+ _logger.LogWarning($"[{plant}] SaveChangesAsync skipped due to IsCancellationRequested.");
+ }
}
private static void SetAllButOneEventToSkipped(IEnumerable group)
@@ -237,6 +355,7 @@ private static List SetDuplicatesToSkipped(List events)
private void TrackMessage(BusEvent busEvent, string busMessageMessageId, string busMessageBody)
{
var busEventMessageToSend = busEvent.MessageToSend ?? busEvent.Message;
+
var message = JsonSerializer.Deserialize(_service.WashString(busEventMessageToSend)!,
DefaultSerializerHelper.SerializerOptions);
@@ -251,11 +370,7 @@ private void TrackMessage(BusEvent busEvent, string busMessageMessageId, string
{"Created", busEvent.Created.ToString(CultureInfo.InvariantCulture)},
{"ProjectName", message?.ProjectName ?? "NoProject"},
{"Plant", message?.Plant ?? "NoPlant"},
- {"MessageId", busMessageMessageId ?? "NoID" },
- //Remove these after debugging
- {"BusEventMessageToSend", string.IsNullOrEmpty(message?.ProCoSysGuid) ? "MessageToSend: ( " + busEvent.MessageToSend + " )" : "N/A" },
- {"BusEventMessage", string.IsNullOrEmpty(message?.ProCoSysGuid) ? busEvent.Message : "N/A" },
- {"MessageBody", string.IsNullOrEmpty(message?.ProCoSysGuid) ? busMessageBody : "N/A" }
+ {"MessageId", busMessageMessageId ?? "NoID" }
});
}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Services/EntryPointService.cs b/src/Equinor.ProCoSys.BusSender.Core/Services/EntryPointService.cs
index d528ada3..65f65f1a 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Services/EntryPointService.cs
+++ b/src/Equinor.ProCoSys.BusSender.Core/Services/EntryPointService.cs
@@ -1,6 +1,7 @@
using System;
using System.Threading.Tasks;
using Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
+using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace Equinor.ProCoSys.BusSenderWorker.Core.Services;
@@ -8,14 +9,28 @@ namespace Equinor.ProCoSys.BusSenderWorker.Core.Services;
public class EntryPointService : IEntryPointService
{
private readonly IServiceProvider _services;
+ private readonly IConfiguration _configuration;
- public EntryPointService(IServiceProvider services) => _services = services;
+ public EntryPointService(IServiceProvider services, IConfiguration configuration)
+ {
+ _services = services;
+ _configuration = configuration;
+ }
- public async Task DoWorkerJob()
+ public async Task DoWorkerJob()
{
using var scope = _services.CreateScope();
var service = scope.ServiceProvider.GetRequiredService();
- await service.HandleBusEvents();
+ var multiInstanceSupport = bool.Parse(_configuration["MultiInstanceSupport"]??"false");
+ if (multiInstanceSupport)
+ {
+ await service.HandleBusEvents();
+ }
+ else
+ {
+ await service.HandleBusEventsSingleInstance();
+ }
+ return service.HasPendingEventsForCurrentPlant();
}
public async Task StopService()
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Services/PlantService.cs b/src/Equinor.ProCoSys.BusSender.Core/Services/PlantService.cs
new file mode 100644
index 00000000..765d61bb
--- /dev/null
+++ b/src/Equinor.ProCoSys.BusSender.Core/Services/PlantService.cs
@@ -0,0 +1,131 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
+using Equinor.ProCoSys.BusSenderWorker.Core.Models;
+using Equinor.ProCoSys.PcsServiceBus;
+using Microsoft.Extensions.Caching.Memory;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Logging;
+
+namespace Equinor.ProCoSys.BusSenderWorker.Core.Services;
+public class PlantService : IPlantService
+{
+ private readonly IPlantRepository _plantRepository;
+ private readonly ILogger _logger;
+ private readonly IConfiguration _config;
+ private readonly IMemoryCache _cache;
+
+ public PlantService(ILogger logger,
+ IPlantRepository plantRepository,
+ IConfiguration config,
+ IMemoryCache cache)
+ {
+ _logger = logger;
+ _plantRepository = plantRepository;
+ _config = config;
+ _cache = cache;
+ }
+
+ public virtual List? GetAllPlants()
+ {
+ if (!_cache.TryGetValue("AllPlants", out List? allPlants))
+ {
+ _logger.LogDebug("Retrieving plants from memory cache.");
+ allPlants = _plantRepository.GetAllPlants();
+ if (allPlants == null || !allPlants.Any())
+ {
+ var message = "No plants found in database.";
+ _logger.LogError(message);
+ throw new Exception(message);
+ }
+ _cache.Set("AllPlants", allPlants, new MemoryCacheEntryOptions
+ {
+ // Have to restart instance to reload plants configuration.
+ Priority = CacheItemPriority.NeverRemove
+ });
+ _logger.LogDebug("Plants read from database and added to memory cache.");
+ }
+ else
+ {
+ _logger.LogDebug("Plants retrieved from memory cache.");
+ }
+
+ return allPlants;
+ }
+
+ public List GetPlantsForCurrent(List plantLeases)
+ {
+ var plantsHandledByCurrentInstance = new List();
+ var allPlants = GetAllPlants();
+
+ if (allPlants == null || !allPlants.Any())
+ {
+ throw new Exception("No plants found in database.");
+ }
+
+ var plant = plantLeases.First(x => x.IsCurrent).Plant;
+ var plants = plantLeases.Where(x => x.IsCurrent)
+ .SelectMany(x => x.Plant.Split(',', StringSplitOptions.RemoveEmptyEntries))
+ .ToList();
+ if (!string.IsNullOrEmpty(plant))
+ {
+ var otherDefinedPlants = plantLeases.Where(x => !x.IsCurrent)
+ .SelectMany(x => x.Plant.Split(',', StringSplitOptions.RemoveEmptyEntries))
+ .ToList();
+ if (plants.Contains(PcsServiceBusInstanceConstants.RemainingPlants))
+ {
+ // We are also handling cases where RemainingPlants constant is used in combination with actual plants. E.g. PCS$TROLL_A, PCS$OSEBERG_C, REMAININGPLANTS.
+ var plantLeftovers = GetPlantLeftovers(otherDefinedPlants, allPlants);
+ plantsHandledByCurrentInstance = plants.Union(plantLeftovers).ToList();
+ RemovePlantReplacement(plantsHandledByCurrentInstance);
+ }
+ else
+ {
+ plantsHandledByCurrentInstance.AddRange(plants);
+ }
+
+ if (otherDefinedPlants.Intersect(plants).Any())
+ {
+ var message = "One or more plants are defined for multiple items. Check plantslease blob.";
+ _logger.LogError(message);
+ throw new Exception(message);
+ }
+ RemoveInvalidPlants(plantsHandledByCurrentInstance, allPlants);
+ }
+
+ if (!plantsHandledByCurrentInstance.Any())
+ {
+ var message = "No valid plants to handle for this configuration item. Check plantslease blob. E.g. has non valid plants been included?";
+ _logger.LogError(message);
+ throw new Exception(message);
+ }
+
+ return plantsHandledByCurrentInstance;
+ }
+
+ private void RemoveInvalidPlants(List plantsHandledByCurrentInstance, IEnumerable allPlants)
+ {
+ var invalidPlants = plantsHandledByCurrentInstance
+ .Except(PcsServiceBusInstanceConstants.AllPlantReplacementConstants)
+ .Except(PcsServiceBusInstanceConstants.AllPlantConstants)
+ .Except(allPlants)
+ .ToList();
+
+ foreach (var plant in invalidPlants)
+ {
+ plantsHandledByCurrentInstance.Remove(plant);
+ }
+
+ if (invalidPlants.Any())
+ {
+ _logger.LogWarning($"These plants are not valid: {string.Join(", ", invalidPlants)}");
+ }
+ }
+
+ private void RemovePlantReplacement(List plants) => plants.RemoveAll(x => PcsServiceBusInstanceConstants.AllPlantReplacementConstants.Contains(x));
+
+ private static IEnumerable GetPlantLeftovers(IEnumerable handledPlants, IEnumerable allNonVoidedPlants) =>
+ allNonVoidedPlants.Except(handledPlants);
+
+}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Services/QueueMonitorService.cs b/src/Equinor.ProCoSys.BusSender.Core/Services/QueueMonitorService.cs
index 083b3fe1..4f4f38e2 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Services/QueueMonitorService.cs
+++ b/src/Equinor.ProCoSys.BusSender.Core/Services/QueueMonitorService.cs
@@ -1,8 +1,11 @@
using System;
+using System.Collections.Generic;
using System.Threading.Tasks;
using Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
using Equinor.ProCoSys.BusSenderWorker.Core.Telemetry;
+using Equinor.ProCoSys.PcsServiceBus;
using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Options;
namespace Equinor.ProCoSys.BusSenderWorker.Core.Services;
@@ -24,39 +27,56 @@ public QueueMonitorService(ITelemetryClient telemetryClient, IBusEventRepository
_queueWriteIntervalMinutes = string.IsNullOrWhiteSpace(configuration["MonitorQueueIntervalMinutes"]) ? 15 : int.Parse(configuration["MonitorQueueIntervalMinutes"]!);
}
- public async Task WriteQueueMetrics()
+ public async Task WriteQueueMetrics(string? plant = null)
{
var lastQueueWrite = string.IsNullOrWhiteSpace(_configuration["LastQueueWrite"]) ? default : DateTime.Parse(_configuration["LastQueueWrite"]!);
if (IsTimeToWriteQueueMetric(lastQueueWrite))
{
- await WriteQueueLength();
- await WriteQueueAge();
+ await WriteQueueLength(plant);
+ await WriteQueueAge(plant);
_configuration["LastQueueWrite"] = _systemClock.UtcNow.ToString("O");
}
}
- private async Task WriteQueueAge()
+ private async Task WriteQueueAge(string? plant = null)
{
- var queueOldestEvent = await _busEventRepository.GetOldestEvent();
-
- if(NoEventFound(queueOldestEvent))
+ var queueOldestEvent = await _busEventRepository.GetOldestEvent(ignoreFilter: true);
+
+ if (NoEventFound(queueOldestEvent))
{
queueOldestEvent = _systemClock.UtcNow;
- }
+ }
var waitTime = _systemClock.UtcNow - queueOldestEvent;
_telemetryClient.TrackMetric("QueueAge", waitTime.TotalMinutes);
+ if (plant != null)
+ {
+ queueOldestEvent = await _busEventRepository.GetOldestEvent(ignoreFilter: false);
+
+ if (NoEventFound(queueOldestEvent))
+ {
+ queueOldestEvent = _systemClock.UtcNow;
+ }
+
+ waitTime = _systemClock.UtcNow - queueOldestEvent;
+ _telemetryClient.TrackMetric($"{plant ?? ""}QueueAge", waitTime.TotalMinutes);
+ }
}
- private async Task WriteQueueLength()
+ private async Task WriteQueueLength(string? plant = null)
{
- var queueLength = await _busEventRepository.GetUnProcessedCount();
+ var queueLength = await _busEventRepository.GetUnProcessedCount(ignoreFilter: true);
_telemetryClient.TrackMetric("QueueLength", queueLength);
+ if (plant != null)
+ {
+ queueLength = await _busEventRepository.GetUnProcessedCount(ignoreFilter: false);
+ _telemetryClient.TrackMetric($"{plant ?? ""}QueueLength", queueLength);
+ }
}
private bool IsTimeToWriteQueueMetric(DateTime lastQueueWrite) =>
_systemClock.UtcNow >= lastQueueWrite.ToUniversalTime().AddMinutes(_queueWriteIntervalMinutes);
- private bool NoEventFound(DateTime oldestEvent) =>
+ private static bool NoEventFound(DateTime oldestEvent) =>
oldestEvent.Equals(default);
}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ApplicationInsightsTelemetryClient.cs b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ApplicationInsightsTelemetryClient.cs
index 7bcd1e33..a24b9328 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ApplicationInsightsTelemetryClient.cs
+++ b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ApplicationInsightsTelemetryClient.cs
@@ -1,10 +1,23 @@
using System;
using System.Collections.Generic;
using Microsoft.ApplicationInsights;
+using Microsoft.ApplicationInsights.Channel;
using Microsoft.ApplicationInsights.Extensibility;
namespace Equinor.ProCoSys.BusSenderWorker.Core.Telemetry;
+public class RunningExecutableTelemetryInitializer : ITelemetryInitializer
+{
+ private readonly string _instanceName;
+
+ public RunningExecutableTelemetryInitializer(string instanceName) => _instanceName = instanceName;
+
+ public void Initialize(ITelemetry telemetry)
+ {
+ telemetry.Context.Cloud.RoleName = AppDomain.CurrentDomain.FriendlyName;
+ }
+}
+
public class ApplicationInsightsTelemetryClient : ITelemetryClient
{
private readonly TelemetryClient _ai;
@@ -33,5 +46,8 @@ public void TrackMetric(string name, double metric) =>
_ai
.GetMetric(name)
.TrackValue(metric);
-
+
+ public void TrackMetric(string name, double metric, Dictionary properties) =>
+ _ai
+ .TrackMetric(name, metric, properties);
}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ConsoleTelemetryClient.cs b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ConsoleTelemetryClient.cs
index 751afc2b..4c939dc5 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ConsoleTelemetryClient.cs
+++ b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ConsoleTelemetryClient.cs
@@ -39,4 +39,7 @@ public void TrackMetric(string name, double metric, string dimension1Name, strin
public void TrackMetric(string name, double metric, string dimension1Name, string dimension1Value) =>
Console.WriteLine(
$"Metric:\t{name}:{Environment.NewLine}\t{metric}{Environment.NewLine}\t{dimension1Name}: {dimension1Value}");
+ public void TrackMetric(string name, double metric, Dictionary properties) =>
+ Console.WriteLine(
+ $"Metric:\t{name}:{Environment.NewLine}\t{string.Join(Environment.NewLine, properties.Select(p => $"\t{p.Key}: {p.Value}"))}{Environment.NewLine}{metric}");
}
diff --git a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ITelemetryClient.cs b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ITelemetryClient.cs
index 45e08af5..e94148f9 100644
--- a/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ITelemetryClient.cs
+++ b/src/Equinor.ProCoSys.BusSender.Core/Telemetry/ITelemetryClient.cs
@@ -7,6 +7,7 @@ public interface ITelemetryClient
void Flush();
void TrackMetric(string name, double metric);
- void TrackEvent(string name, Dictionary properties);
-
+ void TrackEvent(string name, Dictionary properties);
+ void TrackMetric(string name, double metric, Dictionary properties);
+
}
diff --git a/src/Equinor.ProCoSys.BusSender.Infrastructure/Data/BusSenderServiceContext.cs b/src/Equinor.ProCoSys.BusSender.Infrastructure/Data/BusSenderServiceContext.cs
index 724935d4..7712b89e 100644
--- a/src/Equinor.ProCoSys.BusSender.Infrastructure/Data/BusSenderServiceContext.cs
+++ b/src/Equinor.ProCoSys.BusSender.Infrastructure/Data/BusSenderServiceContext.cs
@@ -21,9 +21,13 @@ public BusSenderServiceContext(DbContextOptions options
public Task SaveChangesAsync() => base.SaveChangesAsync();
+ public virtual DbSet Plants { get; set; } = null!;
public virtual DbSet BusEvents { get; set; } = null!;
- protected override void OnModelCreating(ModelBuilder modelBuilder) =>
+ protected override void OnModelCreating(ModelBuilder modelBuilder)
+ {
modelBuilder.ApplyConfiguration(new BusEventConfiguration());
+ modelBuilder.ApplyConfiguration(new PlantConfiguration());
+ }
}
diff --git a/src/Equinor.ProCoSys.BusSender.Infrastructure/EntityConfiguration/PlantConfiguration.cs b/src/Equinor.ProCoSys.BusSender.Infrastructure/EntityConfiguration/PlantConfiguration.cs
new file mode 100644
index 00000000..28272772
--- /dev/null
+++ b/src/Equinor.ProCoSys.BusSender.Infrastructure/EntityConfiguration/PlantConfiguration.cs
@@ -0,0 +1,16 @@
+using Equinor.ProCoSys.BusSenderWorker.Core.Models;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+namespace Equinor.ProCoSys.BusSenderWorker.Infrastructure.EntityConfiguration;
+
+public class PlantConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("PROJECTSCHEMA");
+ builder.HasKey(p => p.ProjectSchema);
+ builder.Property(p => p.ProjectSchema).HasColumnName("PROJECTSCHEMA");
+ builder.Property(p => p.IsVoided).HasColumnName("ISVOIDED");
+ }
+}
diff --git a/src/Equinor.ProCoSys.BusSender.Infrastructure/Equinor.ProCoSys.BusSenderWorker.Infrastructure.csproj b/src/Equinor.ProCoSys.BusSender.Infrastructure/Equinor.ProCoSys.BusSenderWorker.Infrastructure.csproj
index e711e90f..dd69ad5a 100644
--- a/src/Equinor.ProCoSys.BusSender.Infrastructure/Equinor.ProCoSys.BusSenderWorker.Infrastructure.csproj
+++ b/src/Equinor.ProCoSys.BusSender.Infrastructure/Equinor.ProCoSys.BusSenderWorker.Infrastructure.csproj
@@ -10,8 +10,10 @@
+
+