Skip to content

Commit de553d7

Browse files
Postgres polling provider (#42)
1 parent ce8ebf8 commit de553d7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+2913
-31
lines changed

.github/workflows/pr.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ jobs:
2121
dotnet-version: |
2222
8.0.x
2323
9.0.x
24+
10.0.x
2425
2526
- name: Build & Test
2627
run: ./build/build.sh --target=BuildAndTest

.github/workflows/push-nightly.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ jobs:
2121
dotnet-version: |
2222
8.0.x
2323
9.0.x
24+
10.0.x
2425
2526
- name: Build & Test
2627
run: ./build/build.sh --target=BuildAndTest

.github/workflows/push-release.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ jobs:
2121
dotnet-version: |
2222
8.0.x
2323
9.0.x
24+
10.0.x
2425
2526
- name: Build & Test
2627
run: ./build/build.sh --target=BuildAndTest

Directory.Packages.props

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
<PackageVersion Include="MySqlConnector" Version="2.4.0" />
2424
<PackageVersion Include="MySqlConnector.DependencyInjection" Version="2.4.0" />
2525
<PackageVersion Include="Nito.AsyncEx.Coordination" Version="5.1.2" />
26+
<PackageVersion Include="Npgsql" Version="8.0.7" />
27+
<PackageVersion Include="Npgsql.DependencyInjection" Version="8.0.7" />
28+
<PackageVersion Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.11" />
29+
<PackageVersion Include="Npgsql.OpenTelemetry" Version="8.0.7" />
2630
<PackageVersion Include="NSubstitute" Version="5.3.0" />
2731
<PackageVersion Include="OpenTelemetry" Version="1.9.0" />
2832
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0" />
@@ -33,6 +37,7 @@
3337
<PackageVersion Include="Testcontainers" Version="4.6.0" />
3438
<PackageVersion Include="Testcontainers.MySql" Version="4.6.0" />
3539
<PackageVersion Include="Testcontainers.MongoDb" Version="4.6.0" />
40+
<PackageVersion Include="Testcontainers.PostgreSql" Version="4.6.0" />
3641
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.1" />
3742
<PackageVersion Include="xunit.v3" Version="2.0.3" />
3843
</ItemGroup>

OutboxKit.sln

Lines changed: 198 additions & 3 deletions
Large diffs are not rendered by default.

docker-compose.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@ services:
2424
- "3306:3306"
2525
environment:
2626
MYSQL_ROOT_PASSWORD: root
27+
28+
postgres:
29+
image: "postgres:17-alpine"
30+
container_name: postgres
31+
ports:
32+
- "5432:5432"
33+
environment:
34+
POSTGRES_USER: "user"
35+
POSTGRES_PASSWORD: "pass"
2736

2837
lgtm:
2938
image: grafana/otel-lgtm

docs/.vitepress/config.mts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ export default defineConfig({
8585
{ text: "Polling", link: "/mysql/polling" },
8686
],
8787
},
88+
{
89+
text: "PostgreSQL",
90+
collapsed: true,
91+
items: [
92+
{ text: "PostgreSQL provider overview", link: "/postgresql/overview" },
93+
{ text: "Polling", link: "/postgresql/polling" },
94+
],
95+
},
8896
{
8997
text: "MongoDB",
9098
collapsed: true,

docs/postgresql/overview.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# PostgreSQL provider overview
6+
7+
Installing:
8+
9+
```sh
10+
dotnet add package YakShaveFx.OutboxKit.PostgreSql
11+
```
12+
13+
The PostgreSQL provider is, as the name implies, a provider to have OutboxKit work with PostgreSQL databases. It uses [Npgsql](https://www.npgsql.org) to interact with the database, with not other dependencies (i.e. no EF Core, no Dapper, etc).
14+
15+
The provider currently implements only the polling approach.

docs/postgresql/polling.md

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# Polling
6+
7+
To use OutboxKit with the PostgreSQL polling provider (and others as well), you'll need to choose between two paths: accept the library defaults, making your infra match them, or make use of the library's flexibility to adapt to your existing infrastructure.
8+
9+
## Using the defaults
10+
11+
If you choose to use the defaults, you'll need to create a table that matches the schema that OutboxKit expects.
12+
13+
In the box you'll find the `Message` record, which looks something like this:
14+
15+
```csharp
16+
public sealed record Message : IMessage
17+
{
18+
public long Id { get; init; }
19+
public required string Type { get; init; }
20+
public required byte[] Payload { get; init; }
21+
public required DateTime CreatedAt { get; init; }
22+
public byte[]? TraceContext { get; init; }
23+
}
24+
```
25+
26+
The corresponding table is expected to be called `outbox_messages`, while its columns are expected to use PostgreSQL's `snake_case` naming convention, so `id`, `type`, `payload`, `created_at`, and `trace_context`.
27+
28+
Additionally, with these defaults, the `id` column will be used to order the messages.
29+
30+
Assuming these defaults, setting up the provider with DI would look something like this:
31+
32+
```csharp
33+
services.AddOutboxKit(kit =>
34+
kit.WithPostgreSqlPolling(p =>
35+
p.WithConnectionString(connectionString)));
36+
```
37+
38+
## Making it your own
39+
40+
Now, while the defaults are nice, one of the motivations for building OutboxKit in the first place, is to make it possible to adapt to specific applications and their infrastructure, which means there's a bunch of things that can be configured.
41+
42+
Let's start with a snippet that shows all the things you can configure:
43+
44+
```csharp
45+
services.AddOutboxKit(kit =>
46+
kit
47+
.WithPostgreSqlPolling(p =>
48+
p
49+
.WithConnectionString(connectionString)
50+
.WithBatchSize(100)
51+
.WithPollingInterval(TimeSpan.FromMinutes(5))
52+
.WithTable(t => t
53+
.WithName("OutboxMessages")
54+
.WithColumnSelection(
55+
[
56+
"Id",
57+
"Type",
58+
"Payload",
59+
"CreatedAt",
60+
"TraceContext"
61+
])
62+
.WithIdColumn("Id")
63+
.WithSorting([new SortExpression("Id")])
64+
.WithIdGetter(m => ((OutboxMessage)m).Id)
65+
.WithMessageFactory(static r => new OutboxMessage
66+
{
67+
Id = r.GetInt64(0),
68+
Type = r.GetString(1),
69+
Payload = r.GetFieldValue<byte[]>(2),
70+
CreatedAt = r.GetDateTime(3),
71+
TraceContext = r.IsDBNull(4)
72+
? null
73+
: r.GetFieldValue<byte[]>(4)
74+
})
75+
.WithProcessedAtColumn("ProcessedAt"))
76+
.WithUpdateProcessed(u => u
77+
.WithCleanUpInterval(TimeSpan.FromHours(1))
78+
.WithMaxAge(TimeSpan.FromDays(1)))
79+
.WithSelectForUpdateConcurrencyControl()
80+
.WithAdvisoryLockConcurrencyControl()
81+
));
82+
```
83+
84+
So, it's not massive, but there still are a few options available.
85+
86+
Note that not everything is always mandatory, but there are some things that are dependent on each other, so if you set one, you'll need to set some others.
87+
88+
`WithConnectionString` is rather self-explanatory, and is also the only configuration that is, of course, always required.
89+
90+
`WithBatchSize` allows you to set the maximum number of messages that will made available to the `IBatchProducer` in one go.
91+
92+
`WithPollingInterval` allows you to customize how often polling should happen.
93+
94+
`WithTable` is where you can configure the table you want OutboxKit to use. If you want to use the defaults, minus the table name, you can simply use `WithName` and be done with it. However, if you want to customize something else in the schema, then you need to use all the other methods (minus `WithProcessedAtColumn`, but we'll look at that later).
95+
96+
`WithColumnSelection` is where you specify the names of the columns that should be fetched from the table. No need to set all of them, just the ones you need for producing messages, plus the column corresponding to the id, as it will be needed to acknowledge the messages produced.
97+
98+
The name passed to `WithIdColumn` will be used when acknowledging the messages produced.
99+
100+
`WithSorting` receives a collection of column names, as well as a sort direction, which are used to sort the rows when fetching them from the outbox.
101+
102+
When acknowledging the messages, the function passed to `WithIdGetter` will be used to get the id from the message instance, which will then be used for message completion.
103+
104+
Because of all the schema customization, the library has no idea how to construct a message instance. For this reason, you need to provide your own implementation using `WithMessageFactory`. You get a `PostgreSqlDataReader` as an argument, and you need to return an instance of something that implements `IMessage`. The order in which you configure the columns in `WithColumnSelection` is important, as it matches the indexes in the `PostgreSqlDataReader`.
105+
106+
Let's talk about `WithUpdateProcessed`, then come back to `WithProcessedAtColumn`.
107+
108+
By default, OutboxKit will immediately delete the messages that have been produced. However, if you want to keep them around for a while, you can change the strategy to mark them as processed instead. To do this, you use `WithUpdateProcessed`.
109+
110+
When using `WithUpdateProcessed`, you can configure how often the messages should be cleaned up using `WithCleanUpInterval`, and how old the messages should be before they are cleaned up using `WithMaxAge`.
111+
112+
Note that, if you use `WithUpdateProcessed`, you must use `WithProcessedAtColumn`, in order for the library to do its magic. When marking the messages as processed, OutboxKit will set the column to a `DateTime` in UTC, obtained from a [`TimeProvider`](https://learn.microsoft.com/en-us/dotnet/api/system.timeprovider) it gets from DI.
113+
114+
`WithSelectForUpdateConcurrencyControl` and `WithAdvisoryLockConcurrencyControl` are two available options to handle concurrency control. In some scenarios, using advisory locks might provide performance benefits when compared to "SELECT ... FOR UPDATE", given it avoids locking the actual rows in the outbox table as they're being produced.
115+
116+
## Multi-database
117+
118+
If your application uses multiple PostgreSQL databases, and you need an outbox for each of them (for example you have a multi-tenant application, where each tenant uses a different database), everything we discussed so far still applies, you just need to tweak things very slightly.
119+
120+
`WithPostgreSqlPolling` has an overload that takes a `string` as the first argument, allowing you to identify the outbox.
121+
122+
Taking the defaults approach as an example, you could set up two outboxes like this:
123+
124+
```csharp
125+
services.AddOutboxKit(kit =>
126+
kit
127+
.WithPostgreSqlPolling(
128+
tenantOne,
129+
p => p.WithConnectionString(connectionStringOne))
130+
.WithPostgreSqlPolling(
131+
tenantTwo,
132+
p => p.WithConnectionString(connectionStringTwo)));
133+
```
134+
135+
As you can infer, this means you can not only have multiple databases, but you can also configure them differently (not sure it's the most relevant thing ever, but hey, it works).
136+
137+
As discussed in [Core/Producing messages](/core/producing-messages), the `IBatchProducer` `ProduceAsync` method receives an `OutboxKey`, composed by a provider key (`"mysql_polling"` in this case) and a client key, which is what you passed to `WithPostgreSqlPolling`. If you only have one outbox and don't set the key, you'll get the `string` `"default"`.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# `YakShaveFx.OutboxKit.PostgreSql`
2+
3+
PostgreSQL specific library on top of which to implement the transactional outbox pattern.
4+
5+
Docs available at [https://outboxkit.yakshavefx.dev](https://outboxkit.yakshavefx.dev)

0 commit comments

Comments
 (0)