Skip to content

Commit 87cd205

Browse files
ailtonguitarfilipeesch
authored andcommitted
feat: add event notification feature
1 parent 356865e commit 87cd205

20 files changed

+359
-203
lines changed
Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
namespace KafkaFlow
22
{
3-
using System;
4-
using KafkaFlow.Observer;
5-
63
/// <summary>
74
/// Represents the interface of a internal worker
85
/// </summary>
@@ -14,19 +11,18 @@ public interface IWorker
1411
int Id { get; }
1512

1613
/// <summary>
17-
/// This handler is called immediately after a worker completes the consumption of a message
14+
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
1815
/// </summary>
19-
/// <param name="handler"><see cref="Action"/> to be executed</param>
20-
void OnTaskCompleted(Action handler);
16+
IEvent WorkerStopping { get; }
2117

2218
/// <summary>
23-
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
19+
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
2420
/// </summary>
25-
ISubject<WorkerStoppingSubject, VoidObject> WorkerStopping { get; }
21+
IEvent WorkerStopped { get; }
2622

2723
/// <summary>
28-
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
24+
/// Gets the subject for worker consumption completed events where observers can subscribe to receive notifications.
2925
/// </summary>
30-
ISubject<WorkerStoppedSubject, VoidObject> WorkerStopped { get; }
26+
IEvent<IMessageContext> WorkerProcessingEnded { get; }
3127
}
3228
}

src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs

Lines changed: 0 additions & 19 deletions
This file was deleted.

src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs

Lines changed: 0 additions & 19 deletions
This file was deleted.

src/KafkaFlow.Abstractions/IEvent.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
namespace KafkaFlow
2+
{
3+
using System;
4+
using System.Threading.Tasks;
5+
6+
/// <summary>
7+
/// Represents an Event to be subscribed.
8+
/// </summary>
9+
public interface IEvent
10+
{
11+
/// <summary>
12+
/// Subscribes to the event.
13+
/// </summary>
14+
/// <param name="handler">The handler to be called when the event is fired.</param>
15+
/// <returns>Event subscription reference</returns>
16+
IEventSubscription Subscribe(Func<Task> handler);
17+
}
18+
19+
/// <summary>
20+
/// Represents an Event to be subscribed.
21+
/// </summary>
22+
/// <typeparam name="TArg">The argument expected by the event.</typeparam>
23+
public interface IEvent<TArg>
24+
{
25+
/// <summary>
26+
/// Subscribes to the event.
27+
/// </summary>
28+
/// <param name="handler">The handler to be called when the event is fired.</param>
29+
/// <returns>Event subscription reference</returns>
30+
IEventSubscription Subscribe(Func<TArg, Task> handler);
31+
}
32+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace KafkaFlow;
2+
3+
/// <summary>
4+
/// Represents an Event subscription.
5+
/// </summary>
6+
public interface IEventSubscription
7+
{
8+
/// <summary>
9+
/// Cancels the subscription to the event.
10+
/// </summary>
11+
void Cancel();
12+
}

src/KafkaFlow.Abstractions/Observer/ISubject.cs

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/KafkaFlow.Abstractions/Observer/Subject.cs

Lines changed: 0 additions & 53 deletions
This file was deleted.

src/KafkaFlow.Abstractions/VoidObject.cs

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,8 @@
77
using System.Threading.Tasks;
88
using KafkaFlow.Configuration;
99
using KafkaFlow.Consumers;
10-
using KafkaFlow.Observer;
1110

12-
internal class BatchConsumeMiddleware
13-
: IMessageMiddleware,
14-
ISubjectObserver<WorkerStoppedSubject, VoidObject>,
15-
IDisposable
11+
internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable
1612
{
1713
private readonly SemaphoreSlim dispatchSemaphore = new(1, 1);
1814

@@ -37,7 +33,7 @@ public BatchConsumeMiddleware(
3733
this.batch = new(batchSize);
3834
this.consumerConfiguration = middlewareContext.Consumer.Configuration;
3935

40-
middlewareContext.Worker.WorkerStopped.Subscribe(this);
36+
middlewareContext.Worker.WorkerStopped.Subscribe(() => this.TriggerDispatchAndWaitAsync());
4137
}
4238

4339
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
@@ -67,8 +63,6 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
6763
}
6864
}
6965

70-
public async Task OnNotification(WorkerStoppedSubject subject, VoidObject arg) => await this.TriggerDispatchAndWaitAsync();
71-
7266
public void Dispose()
7367
{
7468
this.dispatchTask?.Dispose();

src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void Setup()
5151

5252
workerMock
5353
.SetupGet(x => x.WorkerStopped)
54-
.Returns(new WorkerStoppedSubject(this.logHandlerMock.Object));
54+
.Returns(new Event(this.logHandlerMock.Object));
5555

5656
consumerConfigurationMock
5757
.SetupGet(x => x.AutoMessageCompletion)

0 commit comments

Comments
 (0)