Skip to content

Commit 46ea0a5

Browse files
authored
Merge pull request #63 from Lctrs/messenger-middleware
Add EventStoreTransactionMiddleware to be used with symfony/messenger
2 parents bee69da + fc4f5fa commit 46ea0a5

File tree

5 files changed

+156
-2
lines changed

5 files changed

+156
-2
lines changed

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@
6969
"friendsofphp/php-cs-fixer": "^2.8.1",
7070
"prooph/php-cs-fixer-config": "^0.2.1",
7171
"matthiasnoback/symfony-dependency-injection-test": "^2.3",
72-
"phpstan/phpstan": "^0.9.2"
72+
"phpstan/phpstan": "^0.9.2",
73+
"symfony/messenger": "^4.3"
7374
},
7475
"suggest": {
7576
"prooph/event-store-bus-bridge": "To Marry CQRS (ProophSerivceBus) with Event Sourcing"

doc/bookdown.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
{"event_store": "event_store.md"},
77
{"projection_manager": "projection_manager.md"},
88
{"event_store_bus_bridge": "event_store_bus_bridge.md"},
9-
{"configuration_reference": "configuration_reference.md"}
9+
{"configuration_reference": "configuration_reference.md"},
10+
{"messenger": "messenger.md"}
1011
],
1112
"target": "./html",
1213
"tocDepth": 2,

doc/messenger.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Messenger integration
2+
3+
This bundle provides a middleware for the
4+
`symfony/messenger` component (from version `4.3`) which handles
5+
starting/committing/rolling back a transaction when sending a command
6+
to the bus.
7+
8+
Here is an example configuration on how to use it:
9+
```yaml
10+
# app/config/messenger.yaml
11+
12+
framework:
13+
messenger:
14+
buses:
15+
command.bus:
16+
middleware:
17+
- my_eventstore_transaction_middleware
18+
19+
services:
20+
my_eventstore_transaction_middleware:
21+
class: Prooph\Bundle\EventStore\Messenger\EventStoreTransactionMiddleware
22+
arguments:
23+
- '@my_transactional_event_store'
24+
```
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Prooph\Bundle\EventStore\Messenger;
6+
7+
use Prooph\EventStore\TransactionalEventStore;
8+
use Symfony\Component\Messenger\Envelope;
9+
use Symfony\Component\Messenger\Exception\HandlerFailedException;
10+
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
11+
use Symfony\Component\Messenger\Middleware\StackInterface;
12+
use Symfony\Component\Messenger\Stamp\HandledStamp;
13+
use Throwable;
14+
15+
final class EventStoreTransactionMiddleware implements MiddlewareInterface
16+
{
17+
/** @var TransactionalEventStore */
18+
private $eventStore;
19+
20+
public function __construct(TransactionalEventStore $eventStore)
21+
{
22+
$this->eventStore = $eventStore;
23+
}
24+
25+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
26+
{
27+
$this->eventStore->beginTransaction();
28+
29+
try {
30+
$envelope = $stack->next()->handle($envelope, $stack);
31+
32+
$this->eventStore->commit();
33+
} catch (Throwable $e) {
34+
$this->eventStore->rollback();
35+
36+
if ($e instanceof HandlerFailedException) {
37+
// Remove all HandledStamp from the envelope so the retry will execute all handlers again.
38+
// When a handler fails, the queries of allegedly successful previous handlers just got rolled back.
39+
throw new HandlerFailedException(
40+
$e->getEnvelope()->withoutAll(HandledStamp::class),
41+
$e->getNestedExceptions()
42+
);
43+
}
44+
45+
throw $e;
46+
}
47+
48+
return $envelope;
49+
}
50+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace ProophTest\Bundle\EventStore\Messenger;
6+
7+
use LogicException;
8+
use Prooph\Bundle\EventStore\Messenger\EventStoreTransactionMiddleware;
9+
use Prooph\EventStore\TransactionalEventStore;
10+
use stdClass;
11+
use Symfony\Component\Messenger\Envelope;
12+
use Symfony\Component\Messenger\Exception\HandlerFailedException;
13+
use Symfony\Component\Messenger\Stamp\HandledStamp;
14+
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
15+
use Throwable;
16+
17+
class EventStoreTransactionMiddlewareTest extends MiddlewareTestCase
18+
{
19+
/** @var TransactionalEventStore */
20+
private $eventStore;
21+
/** @var EventStoreTransactionMiddleware */
22+
private $middleware;
23+
24+
public function setUp(): void
25+
{
26+
$this->eventStore = $this->createMock(TransactionalEventStore::class);
27+
$this->middleware = new EventStoreTransactionMiddleware($this->eventStore);
28+
}
29+
30+
public function testMiddlewareWrapsInTransactionAndFlushes(): void
31+
{
32+
$this->eventStore->expects($this->once())
33+
->method('beginTransaction');
34+
$this->eventStore->expects($this->once())
35+
->method('commit');
36+
37+
$this->middleware->handle(new Envelope(new stdClass()), $this->getStackMock());
38+
}
39+
40+
/**
41+
* @expectedException \RuntimeException
42+
* @expectedExceptionMessage Thrown from next middleware.
43+
*/
44+
public function testTransactionIsRolledBackOnException(): void
45+
{
46+
$this->eventStore->expects($this->once())
47+
->method('beginTransaction');
48+
$this->eventStore->expects($this->once())
49+
->method('rollback');
50+
51+
$this->middleware->handle(new Envelope(new stdClass()), $this->getThrowingStackMock());
52+
}
53+
54+
public function testItResetsHandledStampsOnHandlerFailedException(): void
55+
{
56+
$this->eventStore->expects($this->once())
57+
->method('beginTransaction');
58+
$this->eventStore->expects($this->once())
59+
->method('rollback');
60+
61+
$envelop = new Envelope(new stdClass(), [
62+
new HandledStamp('dummy', 'dummy'),
63+
]);
64+
65+
$exception = null;
66+
try {
67+
$this->middleware->handle($envelop, $this->getThrowingStackMock(new HandlerFailedException($envelop, [
68+
new LogicException('dummy exception'),
69+
])));
70+
} catch (Throwable $e) {
71+
$exception = $e;
72+
}
73+
74+
$this->assertInstanceOf(HandlerFailedException::class, $exception);
75+
/** @var HandlerFailedException $exception */
76+
$this->assertSame([], $exception->getEnvelope()->all(HandledStamp::class));
77+
}
78+
}

0 commit comments

Comments
 (0)