Skip to content
Open
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
00d9c43
GH00839: Initial support for multiple instances of pcs service bus se…
trondlindanger Dec 4, 2024
3d2762b
GH00839: Referring to dedicated infra branch.
trondlindanger Dec 5, 2024
fa62865
GH00839: Remove project name from directory.
trondlindanger Dec 17, 2024
0ad81c7
GH00839: Reintroduced projectName.
trondlindanger Dec 17, 2024
14d1c5b
GH00839: Proper filtering/ loading of configuration from Azure. Inclu…
trondlindanger Dec 17, 2024
6bb56ff
GH00839: InstanceName mock for unit tests.
trondlindanger Dec 18, 2024
99ed30e
GH00839: Locking mechanism for shared wallet resource accessed during…
trondlindanger Dec 18, 2024
a0d5012
GH00839: Logging instance name and assigned plants.
trondlindanger Dec 18, 2024
440e121
GH00839: appsettings.json formatting.
trondlindanger Dec 19, 2024
0bb8143
GH00839: Using separate wallet files for the different service instan…
trondlindanger Dec 19, 2024
0ed9d2f
GH00839: Console logging for wallet registration.
trondlindanger Dec 19, 2024
633c18c
GH00839: Introduction of remainder logic. Handling remainder of plant…
trondlindanger Dec 27, 2024
c9f7942
GH00839: Fixed failing unit test.
trondlindanger Dec 30, 2024
b3b72dc
GH00839: Unit test.
trondlindanger Dec 30, 2024
a14514a
GH00839: Updated unit tests.
trondlindanger Jan 2, 2025
a121b8d
GH00839: Log configuration.
trondlindanger Jan 2, 2025
bd6a93b
GH00839: Additional logging for issue tracing.
trondlindanger Jan 2, 2025
301a4c1
GH00839: Issue tracing.
trondlindanger Jan 2, 2025
ec7afc7
GH00839: Issue tracing.
trondlindanger Jan 2, 2025
0cf7900
GH00839: Issue tracing.
trondlindanger Jan 2, 2025
b0ffd32
GH00839: Issue tracing.
trondlindanger Jan 2, 2025
2958521
GH00839: Adjustments before code review.
trondlindanger Jan 2, 2025
ca2c315
GH00839: PR feedback; changed Azure config attribute name. Added conf…
trondlindanger Jan 6, 2025
70c585e
GH00839: Updates related to feedback from code review. Improved metho…
trondlindanger Jan 14, 2025
59e53cf
GH00839: Updates related to feedback from code review. Improved metho…
trondlindanger Jan 14, 2025
5418c66
GH00839: Configuration of plants for instances is provided as a json …
trondlindanger Jan 21, 2025
a8e7f33
GH00839: Removed unused code.
trondlindanger Jan 22, 2025
2656ff4
GH00839: Using explicit json config for plants handled by distinct in…
trondlindanger Jan 27, 2025
1c8612b
GH00839: Ensure that GetAllPlants is called during startup.
trondlindanger Jan 28, 2025
a7ef7d8
GH00839: Validation added for message chunk size > 0.
trondlindanger Jan 28, 2025
293c268
GH00839: Lazy loading GetAllPlants. We only need this to be called once.
trondlindanger Jan 28, 2025
46dd952
GH00839: Adjustments during code review.
trondlindanger Jan 29, 2025
c2a6bf1
GH00839: Moved calculation of plants for given instance from startup …
trondlindanger Feb 5, 2025
1336ef2
GH00839: Refactored to support multi instance Azure web app instead o…
trondlindanger Feb 14, 2025
c1d056e
Merge branch 'master' into feature/GH00839-Use-two-web-job-instances-…
trondlindanger Feb 14, 2025
ee9fa10
GH00839: Additional logging for issue related to Oracle connections p…
trondlindanger Feb 17, 2025
e32647d
Merge branch 'master' into feature/GH00839-Use-two-web-job-instances-…
trondlindanger Feb 17, 2025
5d54c08
GH00839: Handled two warnings.
trondlindanger Feb 17, 2025
09bfe04
GH00839: Handled some warnings.
trondlindanger Feb 17, 2025
7c58c2b
GH00839: Changed log level related to GetAllPlants.
trondlindanger Feb 17, 2025
f9e8770
GH00839: Commented in temp removed topic.
trondlindanger Feb 18, 2025
9090baa
GH00839: Removed debug logging.
trondlindanger Feb 18, 2025
26296c5
GH00839: Caching of plant lease accross instance loops.
trondlindanger Feb 20, 2025
53ed59d
Moved BlobClient initialization to constructor of BlobLeaseService fo…
trondlindanger Feb 27, 2025
7715f73
Allowing shorter delay between attempts when releasing plant lease.
trondlindanger Feb 27, 2025
f1207b2
Releasing blob lease when there are not plants to lease.
trondlindanger Feb 28, 2025
e358186
Adjustment og logging level.
trondlindanger Mar 3, 2025
c3b8a7a
GH839: Using standard datetime serializer instead of custom.
trondlindanger Mar 11, 2025
cffaff0
GH839: Support for multiple plants per blob item. Feature flag as an …
trondlindanger Mar 12, 2025
2e974b3
GH839: Start immediately on next execution if we did not complete in …
trondlindanger Mar 12, 2025
68b24c7
GH00839: Including tests for when to stick to same plant.
trondlindanger Mar 14, 2025
15fe7c1
GH00839: GetPropertiesAsync is called upfront of AcquireAsync in an a…
trondlindanger Mar 14, 2025
a30b71a
GH00839: Not acquiring lease when reading blob.
trondlindanger Mar 17, 2025
82376c0
GH00839: Jitter included in Polly retry to avoid race conditions. Tim…
trondlindanger Mar 18, 2025
59d7b9a
GH00839: Using CancellationToken.None when releasing blob lease. We w…
trondlindanger Mar 18, 2025
ef6d0a7
GH00839: Include metrics by plant.
trondlindanger Apr 2, 2025
1fbf0b3
GH00839: Also release plant lease if cancellation has been requested.
trondlindanger Apr 3, 2025
4e9baf5
GH00839: Plant lease is provided in seconds. Not millieseconds.
trondlindanger Apr 8, 2025
8ec9f12
GH00839: Properly resetting CancellationToken.
trondlindanger Apr 9, 2025
9c2314f
Upgrade to EF 8.23.80.
trondlindanger Apr 9, 2025
7433c6a
Changed log level.
trondlindanger Apr 10, 2025
2547e65
Merge from master. Conflict handling.
trondlindanger May 7, 2025
1d1c787
GH00839: Fixed failing test.
trondlindanger May 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion azure-pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ resources:
name: equinor/procosys-infra
endpoint: 'equinor'
ref: master

variables:
- template: src/variables/global-variables.yml@templates
- template: src/resources/bus-sender/bus-sender-variables.yml@templates
Expand Down
39 changes: 39 additions & 0 deletions src/Equinor.ProCoSys.BusSender.Core/DateTimeConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Equinor.ProCoSys.BusSenderWorker.Core;
public class DateTimeConverter : JsonConverter<DateTime?>
Copy link
Contributor

@TordJoranger TordJoranger Feb 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ser ikke helt hvorfor vi trenger denne klassen.
antar det er for å støtte datofromat slik det er satt opp i plantslease.json?
ie: "LastProcessed": "2025-02-17T14:01:40"

Hadde kanskje forventet at vi i stedet oppdaterte plantslease.json til å bruke ett standardformat som gjorde at vi ikke trengte en custom converter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Døme på tidsstempling standardformat: 2025-03-04T11:46:13.0961214Z viss me vil bruka det og unngå custom converter.

{
private const string DateFormat = "yyyy-MM-ddTHH:mm:ss";

public override DateTime? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType == JsonTokenType.Null)
{
return null;
}

if (reader.TokenType == JsonTokenType.String)
{
if (DateTime.TryParseExact(reader.GetString(), DateFormat, null, System.Globalization.DateTimeStyles.None, out DateTime dateTime))
{
return dateTime;
}
}

throw new JsonException();
}

public override void Write(Utf8JsonWriter writer, DateTime? value, JsonSerializerOptions options)
{
if (value.HasValue)
{
writer.WriteStringValue(value.Value.ToString(DateFormat));
}
else
{
writer.WriteNullValue();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,17 @@
</PropertyGroup>

Copy link
Contributor

@TordJoranger TordJoranger Feb 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are all of these changes needed? Double check that we need these dependencies.

<ItemGroup>
<PackageReference Include="Azure.Data.AppConfiguration" Version="1.2.0" />
<PackageReference Include="Azure.Identity" Version="1.13.2" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.18.0" />
<PackageReference Include="Microsoft.ApplicationInsights" Version="2.21.0" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="9.0.1" />
<PackageReference Include="Oracle.EntityFrameworkCore" Version="7.21.12" />

<PackageReference Include="Polly" Version="8.5.2" />
<PackageReference Include="System.Text.Json" Version="8.0.5" />

</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Equinor.ProCoSys.BusSenderWorker.Core.Models;

namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
public interface IBlobLeaseService
{
void ReleasePlantLease(PlantLease? plantLease, int maxRetryAttempts = 0);
Task<List<PlantLease>?> ClaimPlantLease();
CancellationToken CancellationToken { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ public interface IBusEventRepository
Task<List<BusEvent>> GetEarliestUnProcessedEventChunk();
Task<long> GetUnProcessedCount();
Task<DateTime> GetOldestEvent();
void SetPlants(List<string> plants);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;

public interface IPlantRepository
{
List<string> GetAllPlants();
}
10 changes: 10 additions & 0 deletions src/Equinor.ProCoSys.BusSender.Core/Interfaces/IPlantService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Collections.Generic;
using Equinor.ProCoSys.BusSenderWorker.Core.Models;
using Microsoft.Extensions.Configuration;

namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
public interface IPlantService
{
List<string> GetPlantsHandledByInstance(List<PlantLease> plantLeases);
List<string>? GetAllPlants();
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;

public interface IUnitOfWork
{
Task<int> SaveChangesAsync();
Task<int> SaveChangesAsync(CancellationToken cancellationToken);
}
10 changes: 10 additions & 0 deletions src/Equinor.ProCoSys.BusSender.Core/Models/Plant.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#pragma warning disable CS8618

namespace Equinor.ProCoSys.BusSenderWorker.Core.Models;

public class Plant
{
public string ProjectSchema { get; set; }
public string IsVoided { get; set; }
}

17 changes: 17 additions & 0 deletions src/Equinor.ProCoSys.BusSender.Core/Models/PlantLease.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;
using System.Text.Json.Serialization;

namespace Equinor.ProCoSys.BusSenderWorker.Core.Models;
public class PlantLease
{
[JsonConverter(typeof(DateTimeConverter))]
public DateTime? LeaseExpiry { get; set; }
public required string Plant { get; set; }
[JsonConverter(typeof(DateTimeConverter))]
public DateTime? LastProcessed { get; set; }

[JsonIgnore]
// Flag to tell if this plant is the one being handled by the current instance.
// This is transient and not stored in the blob.
public bool IsCurrent { get; set; } = false;
}
240 changes: 240 additions & 0 deletions src/Equinor.ProCoSys.BusSender.Core/Services/BlobLeaseService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs;
using Azure;
using Azure.Storage.Blobs.Specialized;
using Equinor.ProCoSys.BusSenderWorker.Core.Interfaces;
using Equinor.ProCoSys.BusSenderWorker.Core.Models;
using Polly;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Configuration;
using System.Threading;

namespace Equinor.ProCoSys.BusSenderWorker.Core.Services;

public class BlobLeaseService : IBlobLeaseService
{
private readonly ILogger<BlobLeaseService> _logger;
private readonly IConfiguration _configuration;
private CancellationTokenSource? _cancellationTokenSource;

public CancellationToken CancellationToken => _cancellationTokenSource?.Token ?? CancellationToken.None;

public BlobLeaseService(ILogger<BlobLeaseService> logger, IConfiguration configuration)
{
_logger = logger;
_configuration = configuration;
}

public async Task<List<PlantLease>?> ClaimPlantLease()
{
if (!int.TryParse(_configuration["BlobLeaseExpiryTime"], out var blobLeaseExpiryTime))
{
_logger.LogError("Invalid BlobLeaseExpiryTime configuration value.");
return null;
}

if (_cancellationTokenSource == null || _cancellationTokenSource.Token == CancellationToken.None)
{
// Multiplying by a factor lower than 1 to ensure that message processing for this instance is cancelled shortly before the lease expires.
_cancellationTokenSource = new CancellationTokenSource((int)(blobLeaseExpiryTime * 1000 * 0.95));
}

var leaseId = Guid.NewGuid().ToString();
var plantLeases = await GetPlantLeases(leaseId);
if (plantLeases == null)
{
// Nothing to do for now.
return null;
}

var plantLease = GetOldestUnprocessedPlantLeaseInfo(plantLeases);
if (plantLease == null)
{
// Nothing to do for now.
return null;
}

plantLeases.Where(x => x.Plant.Equals(plantLease.Plant)).ToList().ForEach(x =>
{
x.IsCurrent = true;
x.LeaseExpiry = DateTime.UtcNow.AddSeconds(blobLeaseExpiryTime);
});

UpdatePlantLeases(plantLeases, leaseId);
return plantLeases;
}

public async void ReleasePlantLease(PlantLease? plantLease, int maxRetryAttempts = 0)
{
if (plantLease == null)
{
_logger.LogWarning("Plant lease is null. Cannot release lease.");
return;
}

var leaseId = Guid.NewGuid().ToString();
var plantLeases = await GetPlantLeases(leaseId,maxRetryAttempts);
if (plantLeases == null)
{
_logger.LogWarning("Plant leases is null. Cannot release lease.");
return;
}

plantLeases.Where(x => x.Plant.Equals(plantLease?.Plant)).ToList().ForEach(x =>
{
x.LeaseExpiry = null;
x.LastProcessed = DateTime.UtcNow;
});
UpdatePlantLeases(plantLeases, leaseId);
}

public virtual BlobLeaseClient GetBlobLeaseClient(BlobClient blobClient, string leaseId) => blobClient.GetBlobLeaseClient(leaseId);

protected async Task<bool> TryAcquireBlobLeaseAsync(BlobClient blobClient, string leaseId, TimeSpan leaseDuration, int maxRetryAttempts = 0, TimeSpan? delayBetweenAttempts = null)
{
if (!int.TryParse(_configuration["BlobReleaseLeaseDelay"], out var blobReleaseLeaseDelay))
{
_logger.LogError("Invalid BlobReleaseLeaseDelay configuration value.");
}
delayBetweenAttempts ??= TimeSpan.FromSeconds(blobReleaseLeaseDelay);

var retryPolicy = Policy
.Handle<RequestFailedException>(ex => ex.ErrorCode == BlobErrorCode.LeaseAlreadyPresent)
.WaitAndRetryAsync(maxRetryAttempts, retryAttempt =>
{
_logger.LogInformation($"Attempt {retryAttempt} to acquire lease for blob: {blobClient.Name}");
return delayBetweenAttempts.Value;
});

try
{
await retryPolicy.ExecuteAsync(async () =>
{
var leaseClient = GetBlobLeaseClient(blobClient, leaseId);
await leaseClient.AcquireAsync(leaseDuration, cancellationToken: CancellationToken.None);
});
return true;
}
catch (RequestFailedException rfe)
{
if (rfe.ErrorCode == BlobErrorCode.LeaseAlreadyPresent)
{
return false;
}

_logger.LogError(rfe, $"Failed to acquire lease for blob: {blobClient.Name} ErrorCode: {rfe.ErrorCode} Message: {rfe.Message}");
throw;
}
}

public virtual async Task<string> GetBlobContentAsync(BlobClient blobClient)
{
try
{
var response = await blobClient.DownloadStreamingAsync(cancellationToken: CancellationToken.None);
using var streamReader = new StreamReader(response.Value.Content);
return await streamReader.ReadToEndAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to download blob content: {BlobName}", blobClient.Name);
throw;
}
}

private async Task<List<PlantLease>?> GetPlantLeases(string leaseId, int maxRetryAttempts = 0)
{
var blobClient = GetBlobClient();
var newLeaseAcquired = await TryAcquireBlobLeaseAsync(blobClient, leaseId, TimeSpan.FromSeconds(15), maxRetryAttempts);
if (newLeaseAcquired)
{
var plantLease = await GetPlantLeases(blobClient);
if (plantLease == null || !plantLease.Any())
{
_logger.LogWarning("Could not read blob containing plant lease.");
return null;
}

return plantLease;
}

return null;
}

private async Task<List<PlantLease>?> GetPlantLeases(BlobClient blobClient)
{
var options = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
Converters = { new DateTimeConverter() }
};

var json = await GetBlobContentAsync(blobClient);
return JsonSerializer.Deserialize<List<PlantLease>>(json, options);
}

public virtual async void UpdatePlantLeases(List<PlantLease> plantLeases, string leaseId)
{
var blobClient = GetBlobClient();
var options = new JsonSerializerOptions
{
WriteIndented = true,
PropertyNameCaseInsensitive = true,
Converters = { new DateTimeConverter() }
};

var json = JsonSerializer.Serialize(plantLeases, options);
using var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(json));
var uploadOptions = new BlobUploadOptions
{
Conditions = new BlobRequestConditions
{
LeaseId = leaseId
}
};
await blobClient.UploadAsync(memoryStream, uploadOptions, CancellationToken.None);
await ReleaseBlobLeaseAsync(blobClient, leaseId);
}

public virtual BlobClient GetBlobClient()
{
var connectionString = _configuration["BlobStorage:ConnectionString"];
var containerName = _configuration["BlobStorage:BusSenderContainerName"];
var blobLeaseFileName = _configuration["BlobLeaseFileName"];
var blobContainerClient = new BlobContainerClient(connectionString, containerName);
return blobContainerClient.GetBlobClient(blobLeaseFileName);
}

private PlantLease? GetOldestUnprocessedPlantLeaseInfo(List<PlantLease> plantLeases)
{
var unprocessedLeaseInfos = plantLeases
.Where(p => p.LeaseExpiry == null || p.LeaseExpiry < DateTime.UtcNow)
.OrderBy(p => p.LastProcessed)
.ToList();

return unprocessedLeaseInfos.FirstOrDefault(); // Return first if none is taken.
}
public virtual async Task ReleaseBlobLeaseAsync(BlobClient blobClient, string leaseId)
{
try
{
var leaseClient = GetBlobLeaseClient(blobClient, leaseId);
await leaseClient.ReleaseAsync(cancellationToken: CancellationToken);
}
catch (RequestFailedException ex)
{
_logger.LogError(ex, "Failed to release lease for blob: {BlobName}", blobClient.Name);
}
catch (Exception ex)
{
_logger.LogError(ex, "Unexpected error occurred while releasing lease for blob: {BlobName}", blobClient.Name);
}
}
}
Loading