-
Notifications
You must be signed in to change notification settings - Fork 33
Snapshot Service
CQRS allows events to be replayed to recreate the current state of an aggregate. However, as the number of events to be replayed increases, this operation can become more and more resource intensive. Snapshots allow the state of an aggregate to be captured at regular intervals, ensuring that only a small number of events would need to be replayed after the latest snapshot is applied.
The Snapshot Service has two primary tasks: reading snapshots and writing snapshots: Reading occurs each time a command comes into the system. Writing does not occur each time, but is written only if the the rules determined by a snapshot strategy require it.
Once a CommandHandler receives a command, an aggregate is rebuilt from it's previously stored events. If the snapshot service is enabled, then the latest snapshot is read and deserialized into and aggregate. Any later events not present in the snapshot are then appended to the stream. It is here that the efficiency gains from using snapshots will be seen.
Once the aggregate has been rebuilt the incoming command is converted to events, and these are then appended to the stream. If the snapshot strategy determines that a new snapshot should be created, then a new snapshot is written.
This is a class which defines the rule for when a new snapshot should be written. By default it requires a new snapshot to be written every fifty events.
- A command comes in to the Command Handler.
- The Aggregate Service is queried for the latest aggregate.
- The Aggregate Service gets the latest snapshot from the Snapshot tables.
- The latest Snapshot is returned as a versioned aggregate.
- The Aggregate is built from the latest returned Snapshot.
- The Event Log tables are read and any new events not present in the snapshot are appended to the aggregate stream.
- The command is converted into events and are applied to the event stream. This is the usual aggregate behaviour.
- The Snapshot Strategy is read to determine if a new Snapshot should be written.
- If a new Snapshot should be written, then a new Snapshot is written to the snapshot database tables.
The entire aggregate is stored in the database as a Serialized Java object. This introduces several problems. Firstly, it means that all Aggregates must be Java Serializable. Secondly, there is the problem of Aggregates changing over time: fields can be added or renamed. This is solved by introducing the concept of Versioning.
The aggregate is stored in one table in the viewstore database snapshot
, which has the following structure:
Column Name | Type | Description |
---|---|---|
stream_id | uuid | the id of the associated stream |
version_id | long | the current version of the associated stream |
type | String | the java classname of the aggregate |
aggregate | byte array | the serialized bytecode of the aggregate |
version_id
is the current version of the event stream. This will be incremented each time an event is appended to the stream and is used by the default snapshot strategy to determine if a new snapshot should be created.
The table is created from liquibase scripts found in aggregate-snapshot-repository-liquibase.
To run liquibase to add the snapshot table to the viewstore database, firstly cd into the root of the microservices project and run the following command:
mvn -f aggregate-snapshot/aggregate-snapshot-repository-liquibase/pom.xml -Dliquibase.url=jdbc:postgresql://localhost:5432/${CONTEXT_NAME}viewstore -Dliquibase.username=${CONTEXT_NAME} -Dliquibase.password=${CONTEXT_NAME} -Dliquibase.logLevel=info resources:resources liquibase:update
where ${CONTEXT_NAME} is the name of the context you wish to update
All snapshots are actually the Java Serialized bytecode of the Aggregate Object. However, Aggregates can change over time: fields can be added/removed or renamed. This means that the Aggregate object would no longer be deserializable from the database bytecode. To solve this, all Aggregates must be versioned. This is done by implementing Java serialVersionUID
in the Aggregate class. If on deserialization, the serialVersionUID
has changed, the AggregateService throws an AggregateChangeDetectedException
. The Snapshot Service then assumes that all previous Aggregates are obsolete and they are then deleted from the database. The aggregate is subsequently rebuilt from the entire event log and the latest version of the Aggregate is stored as a new snapshot.
By default, all contexts use a Snapshot Service. The default version DefaultAggregateService
is a 'do nothing' version of the Snapshot Service. To enable snapshots, then microservices needs to know to load SnapshotAwareAggregateService
in your command handler rather than the default. Both classes implement the AggregateService
interface. The SnapshotAwareAggregateService
is loaded by using some CDI magic. SnapshotAwareAggregateService' is contained in an optional microservices jar:
aggregate-snapshot.jar. If this jar is on the classpath, then as
SnapshotAwareAggregateService` has a higher priority than the default, then it will be injected into your command handler instead.
This class is used to determine the frequency of when new snapshots are created. The default DefaultSnapshotStrategy
uses the version_id from the database and creates a new snapshot after every 25 events are created. To
Snapshot Service is enable simply by adding this dependency in the pom of the module containing the aggregate you intend to snapshot.
<dependency>
<groupId>uk.gov.justice.services</groupId>
<artifactId>aggregate-snapshot-service</artifactId>
<version>${project.version}</version>
</dependency>