Skip to content

Commit ac0b284

Browse files
MongoDB polling provider (#36)
1 parent 4dc5e65 commit ac0b284

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2635
-47
lines changed

.editorconfig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ csharp_style_expression_bodied_local_functions = true:suggestion
6363
# misc
6464
csharp_prefer_static_local_function = true:suggestion
6565
csharp_using_directive_placement = outside_namespace:suggestion
66-
csharp_prefer_braces = true:suggestion
66+
csharp_prefer_braces = true:none
6767
csharp_prefer_static_local_function = false:suggestion
6868
csharp_style_prefer_switch_expression = true:suggestion
6969
csharp_style_namespace_declarations = file_scoped

Directory.Packages.props

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,23 @@
33
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
44
</PropertyGroup>
55
<ItemGroup>
6-
<PackageVersion Include="Bogus" Version="35.6.1" />
6+
<PackageVersion Include="Bogus" Version="35.6.3" />
77
<PackageVersion Include="Cake.Frosting" Version="5.0.0" />
8-
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
9-
<PackageVersion Include="Dapper" Version="2.1.35" />
8+
<PackageVersion Include="coverlet.collector" Version="6.0.4">
9+
<PrivateAssets>all</PrivateAssets>
10+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
11+
</PackageVersion>
12+
<PackageVersion Include="Dapper" Version="2.1.66" />
1013
<PackageVersion Include="FluentAssertions" Version="6.12.2" />
11-
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="8.0.10" />
14+
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="8.0.15" />
1215
<PackageVersion Include="Microsoft.Extensions.Diagnostics" Version="8.0.1" />
1316
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.1" />
1417
<PackageVersion Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="8.0.0" />
1518
<PackageVersion Include="Microsoft.Extensions.TimeProvider.Testing" Version="8.10.0" />
16-
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
19+
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
1720
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
1821
<PackageVersion Include="MinVer" Version="6.0.0" />
19-
<PackageVersion Include="MongoDB.Driver" Version="3.0.0" />
22+
<PackageVersion Include="MongoDB.Driver" Version="3.3.0" />
2023
<PackageVersion Include="MySqlConnector" Version="2.4.0" />
2124
<PackageVersion Include="MySqlConnector.DependencyInjection" Version="2.4.0" />
2225
<PackageVersion Include="Nito.AsyncEx.Coordination" Version="5.1.2" />
@@ -25,11 +28,15 @@
2528
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0" />
2629
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0" />
2730
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.9.0" />
28-
<PackageVersion Include="Pomelo.EntityFrameworkCore.MySql" Version="8.0.2" />
31+
<PackageVersion Include="Pomelo.EntityFrameworkCore.MySql" Version="8.0.3" />
2932
<PackageVersion Include="RabbitMQ.Client" Version="6.8.1" />
30-
<PackageVersion Include="Testcontainers" Version="4.0.0" />
31-
<PackageVersion Include="Testcontainers.MySql" Version="4.0.0" />
32-
<PackageVersion Include="xunit" Version="2.9.2" />
33-
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
33+
<PackageVersion Include="Testcontainers" Version="4.4.0" />
34+
<PackageVersion Include="Testcontainers.MySql" Version="4.4.0" />
35+
<PackageVersion Include="Testcontainers.MongoDb" Version="4.4.0" />
36+
<PackageVersion Include="xunit" Version="2.9.3" />
37+
<PackageVersion Include="xunit.runner.visualstudio" Version="3.0.2">
38+
<PrivateAssets>all</PrivateAssets>
39+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
40+
</PackageVersion>
3441
</ItemGroup>
3542
</Project>

OutboxKit.sln

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OutOfProcessProducer", "sam
3939
EndProject
4040
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ProducerShared", "samples\mysql\MySqlEndToEndPollingSample\ProducerShared\ProducerShared.csproj", "{13B80164-E50C-448E-9A38-C6D7A5313DC9}"
4141
EndProject
42+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MongoDb", "src\MongoDb\MongoDb.csproj", "{62F7D497-5170-4B0D-80B6-38C597FB979C}"
43+
EndProject
44+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MongoDb.Tests", "tests\MongoDb.Tests\MongoDb.Tests.csproj", "{2B7C9828-7A30-4B91-A0AC-5D82820B224D}"
45+
EndProject
46+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "mongodb", "mongodb", "{114820B8-6912-4F74-85F6-EB25CAE48BF9}"
47+
EndProject
48+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MongoDbPollingSample", "samples\mongodb\MongoDbPollingSample\MongoDbPollingSample.csproj", "{C097DEE3-EDF3-4CD4-A7F2-DD4796A41874}"
49+
EndProject
50+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MongoDbMultiDbPollingSample", "samples\mongodb\MongoDbMultiDbPollingSample\MongoDbMultiDbPollingSample.csproj", "{F5C31DA0-FE7B-4471-884E-E8BEE095ED6D}"
51+
EndProject
4252
Global
4353
GlobalSection(SolutionConfigurationPlatforms) = preSolution
4454
Debug|Any CPU = Debug|Any CPU
@@ -96,6 +106,22 @@ Global
96106
{13B80164-E50C-448E-9A38-C6D7A5313DC9}.Debug|Any CPU.Build.0 = Debug|Any CPU
97107
{13B80164-E50C-448E-9A38-C6D7A5313DC9}.Release|Any CPU.ActiveCfg = Release|Any CPU
98108
{13B80164-E50C-448E-9A38-C6D7A5313DC9}.Release|Any CPU.Build.0 = Release|Any CPU
109+
{62F7D497-5170-4B0D-80B6-38C597FB979C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
110+
{62F7D497-5170-4B0D-80B6-38C597FB979C}.Debug|Any CPU.Build.0 = Debug|Any CPU
111+
{62F7D497-5170-4B0D-80B6-38C597FB979C}.Release|Any CPU.ActiveCfg = Release|Any CPU
112+
{62F7D497-5170-4B0D-80B6-38C597FB979C}.Release|Any CPU.Build.0 = Release|Any CPU
113+
{2B7C9828-7A30-4B91-A0AC-5D82820B224D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
114+
{2B7C9828-7A30-4B91-A0AC-5D82820B224D}.Debug|Any CPU.Build.0 = Debug|Any CPU
115+
{2B7C9828-7A30-4B91-A0AC-5D82820B224D}.Release|Any CPU.ActiveCfg = Release|Any CPU
116+
{2B7C9828-7A30-4B91-A0AC-5D82820B224D}.Release|Any CPU.Build.0 = Release|Any CPU
117+
{C097DEE3-EDF3-4CD4-A7F2-DD4796A41874}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
118+
{C097DEE3-EDF3-4CD4-A7F2-DD4796A41874}.Debug|Any CPU.Build.0 = Debug|Any CPU
119+
{C097DEE3-EDF3-4CD4-A7F2-DD4796A41874}.Release|Any CPU.ActiveCfg = Release|Any CPU
120+
{C097DEE3-EDF3-4CD4-A7F2-DD4796A41874}.Release|Any CPU.Build.0 = Release|Any CPU
121+
{F5C31DA0-FE7B-4471-884E-E8BEE095ED6D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
122+
{F5C31DA0-FE7B-4471-884E-E8BEE095ED6D}.Debug|Any CPU.Build.0 = Debug|Any CPU
123+
{F5C31DA0-FE7B-4471-884E-E8BEE095ED6D}.Release|Any CPU.ActiveCfg = Release|Any CPU
124+
{F5C31DA0-FE7B-4471-884E-E8BEE095ED6D}.Release|Any CPU.Build.0 = Release|Any CPU
99125
EndGlobalSection
100126
GlobalSection(NestedProjects) = preSolution
101127
{9E17490D-8407-441F-AF33-620EAD39C9AD} = {E1748C96-94CD-479F-BF71-585EB56C9FBF}
@@ -112,5 +138,10 @@ Global
112138
{68B9A32E-A1BF-4E8B-9C7A-8EE7BF968FC0} = {846CDA36-9F33-4870-8C3B-0BC8DC3EC11B}
113139
{EFEE7AA2-19CC-4029-BCEE-2C79E148C8EC} = {846CDA36-9F33-4870-8C3B-0BC8DC3EC11B}
114140
{13B80164-E50C-448E-9A38-C6D7A5313DC9} = {846CDA36-9F33-4870-8C3B-0BC8DC3EC11B}
141+
{62F7D497-5170-4B0D-80B6-38C597FB979C} = {E1748C96-94CD-479F-BF71-585EB56C9FBF}
142+
{2B7C9828-7A30-4B91-A0AC-5D82820B224D} = {7DDB9CD3-11D6-4036-B0F5-19315D4FA1DD}
143+
{114820B8-6912-4F74-85F6-EB25CAE48BF9} = {2C240B41-C6C9-4921-9139-ECB2AA468316}
144+
{C097DEE3-EDF3-4CD4-A7F2-DD4796A41874} = {114820B8-6912-4F74-85F6-EB25CAE48BF9}
145+
{F5C31DA0-FE7B-4471-884E-E8BEE095ED6D} = {114820B8-6912-4F74-85F6-EB25CAE48BF9}
115146
EndGlobalSection
116147
EndGlobal

docs/.vitepress/config.mts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ export default defineConfig({
8585
{ text: "Polling", link: "/mysql/polling" },
8686
],
8787
},
88+
{
89+
text: "MongoDB",
90+
collapsed: true,
91+
items: [
92+
{ text: "MongoDB provider overview", link: "/mongodb/overview" },
93+
{ text: "Polling", link: "/mongodb/polling" },
94+
],
95+
},
8896
{
8997
text: "Observability",
9098
collapsed: true,

docs/mongodb/overview.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# MongoDB provider overview
6+
7+
Installing:
8+
9+
```sh
10+
dotnet add package YakShaveFx.OutboxKit.MongoDb
11+
```
12+
13+
The MongoDB provider is, as the name implies, a provider to have OutboxKit work with MongoDB databases. It uses [MongoDB.Driver](https://www.mongodb.com/docs/drivers/csharp/current/) to interact with the database, with no other dependencies.
14+
15+
The provider currently implements only the polling approach.

docs/mongodb/polling.md

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# Polling
6+
7+
To use OutboxKit with the MongoDB polling provider (and others as well), you'll need to choose between two paths: accept the library defaults, making your infra match them, or make use of the library's flexibility to adapt to your existing infrastructure.
8+
9+
::: tip
10+
Don't skip the [important notes](#important-notes) section at the end of the page.
11+
:::
12+
13+
## Using the defaults
14+
15+
In the box you'll find the `Message` record, which looks something like this:
16+
17+
```csharp
18+
public sealed record Message : IMessage
19+
{
20+
public ObjectId Id { get; init; }
21+
public required string Type { get; init; }
22+
public required byte[] Payload { get; init; }
23+
public required DateTime CreatedAt { get; init; }
24+
public byte[]? TraceContext { get; init; }
25+
}
26+
```
27+
28+
By default, these messages will be stored in a collection named `outbox_messages`. Additionally, with these defaults, the `Id` property will be used to order the messages.
29+
30+
Because MongoDB, unlike relational databases, doesn't support traditional locking mechanisms (e.g. `SELECT ... FOR UPDATE`), OutboxKit implements its own locking mechanism. This locking mechanism requires the use of an auxiliary collection, which by default is named `outbox_locks`.
31+
32+
Assuming you're happy with the out of the box defaults, setting up the provider with DI would look something like this:
33+
34+
```csharp
35+
services.AddOutboxKit(kit =>
36+
kit.WithMongoDbPolling(p =>
37+
p.WithDatabaseFactory((_, s) => s.GetRequiredService<IMongoDatabase>()));
38+
```
39+
40+
Note that this assumes a singleton `IMongoDatabase` is registered in the DI container, but you can do whatever you want in `WithDatabaseFactory`, as long as it returns an `IMongoDatabase` instance.
41+
42+
## Making it your own
43+
44+
Now, while the defaults are nice, one of the motivations for building OutboxKit in the first place, is to make it possible to adapt to specific applications and their infrastructure, which means there's a bunch of things that can be configured.
45+
46+
Let's start with a snippet that shows all the things you can configure:
47+
48+
```csharp
49+
services.AddOutboxKit(kit =>
50+
kit
51+
.WithMongoDbPolling(p =>
52+
p
53+
.WithDatabaseFactory((_, s) => s.GetRequiredService<IMongoDatabase>())
54+
.WithBatchSize(100)
55+
.WithPollingInterval(TimeSpan.FromMinutes(5))
56+
.WithCollection<OutboxMessage, ObjectId>(c => c
57+
.WithName("OutboxMessages")
58+
.WithIdSelector(m => m.Id)
59+
.WithSort(new SortDefinitionBuilder<OutboxMessage>().Ascending(m => m.Id))
60+
.WithProcessedAtSelector(m => m.ProcessedAt))
61+
.WithUpdateProcessed(u => u
62+
.WithCleanUpInterval(TimeSpan.FromHours(1))
63+
.WithMaxAge(TimeSpan.FromDays(1)))
64+
.WithDistributedLock(l => l
65+
.WithCollectionName("OutboxLocks")
66+
.WithId("OutboxLock")
67+
.WithOwner(Environment.MachineName)
68+
.WithChangeStreamsEnabled(true))));
69+
```
70+
71+
So, it's not massive, but there still are a few options available.
72+
73+
Note that not everything is always mandatory, but there are some things that are dependent on each other, so if you set one, you'll need to set some others.
74+
75+
`WithDatabaseFactory` is the way to get a `IMongoDatabase` instance for OutboxKit to interact with the database. It is the only configuration that is always required.
76+
77+
`WithBatchSize` allows you to set the maximum number of messages that will be made available to the `IBatchProducer` in one go.
78+
79+
`WithPollingInterval` allows you to customize how often polling should happen.
80+
81+
`WithCollection` is where you can configure the collection you want OutboxKit to use. If you're fine with OutboxKit's defaults, no need to call `WithCollection`, but if you want to customize something, then you need to configure everything, using all the exposed builder methods (minus `WithProcessedAtSelector`, but we'll look at that later).
82+
83+
`WithName` allows configuring the outbox message collection name.
84+
85+
`WithIdSelector` allows you to configure the id selector for the outbox message, which will be used when acknowledging the messages produced.
86+
87+
`WithSort` is the way to configure how to sort the messages as they're fetched from the outbox collection and made available to produce.
88+
89+
Let's talk about `WithUpdateProcessed`, then come back to `WithProcessedAtSelector`.
90+
91+
By default, OutboxKit will immediately delete the messages that have been produced. However, if you want to keep them around for a while, you can change the strategy to mark them as processed instead. To do this, you use `WithUpdateProcessed`.
92+
93+
When using `WithUpdateProcessed`, you can configure how often the messages should be cleaned up using `WithCleanUpInterval`, and how old the messages should be before they are cleaned up using `WithMaxAge`.
94+
95+
Note that, if you use `WithUpdateProcessed`, you must use `WithProcessedAtSelector`, in order for the library to do its magic. When marking the messages as processed, OutboxKit will set the column to a `DateTime` in UTC, obtained from a [`TimeProvider`](https://learn.microsoft.com/en-us/dotnet/api/system.timeprovider) it gets from DI.
96+
97+
As mentioned earlier, because MongoDB doesn't support traditional locking mechanisms, OutboxKit implements a distributed locking mechanism itself, making use of an auxiliary collection for it. You can tweak some aspects of this, by using `WithDistributedLock`. You can tweak the collection name with `WithCollectionName`, the id associated with the lock with `WithId` (should be the same in all instances of the application interacting with the outbox), the owner of the lock with `WithOwner` (should be different per instance of the application interacting with the outbox), and whether or not to use [MongoDB change streams](https://www.mongodb.com/docs/manual/changeStreams/) with `WithChangeStreamsEnabled`. The default values for these are `outbox_locks`, `outbox_lock`, [`Environment.MachineName`](https://learn.microsoft.com/en-us/dotnet/api/system.environment.machinename) and `false`, respectively.
98+
99+
## Multi-database
100+
101+
If your application uses multiple MongoDB databases, and you need an outbox for each of them (for example, you have a multi-tenant application, where each tenant uses a different database), everything we discussed so far still applies, you just need to tweak things very slightly.
102+
103+
`WithMongoDbPolling` has an overload that takes a `string` as the first argument, allowing you to identify the outbox.
104+
105+
Taking the defaults approach as an example, you could set up two outboxes like this:
106+
107+
```csharp
108+
services.AddOutboxKit(kit =>
109+
kit
110+
.WithMongoDbPolling(
111+
tenantOne,
112+
p => p.WithDatabaseFactory((k, s) => s.GetRequiredKeyedService<IMongoDatabase>(tenantOne)))
113+
.WithMongoDbPolling(
114+
tenantTwo,
115+
p => p.WithDatabaseFactory((k, s) => s.GetRequiredKeyedService<IMongoDatabase>(tenantTwo))));
116+
```
117+
118+
As you can infer, this means you can not only have multiple databases, but you can also configure them differently (not sure it's the most relevant thing ever, but hey, it works).
119+
120+
You'll notice we're using `GetRequiredKeyedService` instead of `GetRequiredService`. You don't have to do it like this, but it's a simple way to have different dependencies resolved for different contexts, like a multi-tenant application.
121+
122+
As discussed in [Core/Producing messages](/core/producing-messages), the `IBatchProducer` `ProduceAsync` method receives an `OutboxKey`, composed by a provider key (`"mongodb_polling"` in this case) and a client key, which is what you passed to `WithMongoDbPolling`. If you only have one outbox and don't set the key, you'll get the `string` `"default"`.
123+
124+
The `k` parameter shown in the example above in the `WithDatabaseFactory` method is also the aforementioned `OutboxKey`, and it's passed in case it's useful to resolve the `IMongoDatabase` instance. If you don't need it, you can just ignore it.
125+
126+
## Important notes
127+
128+
- Due to the need to implement a distributed locking mechanism, not just using something provided by the database itself, makes the likelihood of bugs in this provider higher than in others. Hopefully it's implemented well, but if you find any issues, please report them.
129+
- For simplicity and testability, the distributed locking mechanism uses the date/time of the machine running the application to determine lock expiration. For this reason, when running multiple instances of the application, it's important that the clocks are in sync, otherwise there might be unexpected behavior. Other approaches might be considered, but at this point, it seemed an acceptable approach.
130+
- This implementation of the MongoDB polling provider was designed exclusively to be used with a primary database instance. It wasn't thought or tested to be used with sharded clusters.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# `YakShaveFx.OutboxKit.MongoDb`
2+
3+
MongoDB specific library on top of which to implement the transactional outbox pattern.
4+
5+
Docs available at [https://outboxkit.yakshavefx.dev](https://outboxkit.yakshavefx.dev)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<Project Sdk="Microsoft.NET.Sdk.Web">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<Nullable>enable</Nullable>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<RootNamespace>MongoDbMultiDbPollingSample</RootNamespace>
8+
<IsPackable>false</IsPackable>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<PackageReference Include="Bogus" />
13+
<PackageReference Include="MongoDB.Driver" />
14+
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" />
15+
<PackageReference Include="OpenTelemetry.Extensions.Hosting" />
16+
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" />
17+
</ItemGroup>
18+
19+
<ItemGroup>
20+
<ProjectReference Include="..\..\..\src\Core.OpenTelemetry\Core.OpenTelemetry.csproj" />
21+
<ProjectReference Include="..\..\..\src\Core\Core.csproj" />
22+
<ProjectReference Include="..\..\..\src\MongoDb\MongoDb.csproj" />
23+
</ItemGroup>
24+
25+
</Project>

0 commit comments

Comments
 (0)