Skip to content

Commit cba9f24

Browse files
Projection snapshot - initial framework
1 parent 8319040 commit cba9f24

File tree

9 files changed

+352
-12
lines changed

9 files changed

+352
-12
lines changed

src/EventSourcingOnAzureFunctions.Common/EventSourcing/Implementation/AzureStorage/AppendBlob/BlobEventStreamBase.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,17 @@ public abstract class BlobEventStreamBase
2121
public const string METADATA_DATE_CREATED = "DATECREATED";
2222
public const string METADATA_CORRELATION_ID = "CORRELATIONIDENTIFIER";
2323

24-
// Named specific sub-folders
24+
/// <summary>
25+
/// The default folder where uncateggorised entities are stored
26+
/// </summary>
2527
public const string ORPHANS_FOLDER = "uncategorised";
28+
/// <summary>
29+
/// The default subfolder where the event streams are stored
30+
/// </summary>
2631
public const string EVENTSTREAM_FOLDER = "eventstreams";
32+
/// <summary>
33+
/// The default subfolder where projection snapshots are stored
34+
/// </summary>
2735
public const string SNAPSHOTS_FOLDER = "snapshots";
2836

2937
private readonly CloudBlobContainer _blobBasePath;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
using EventSourcingOnAzureFunctions.Common.EventSourcing.Implementation.AzureStorage.AppendBlob;
2+
using EventSourcingOnAzureFunctions.Common.EventSourcing.Interfaces;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Text;
6+
7+
namespace EventSourcingOnAzureFunctions.Common.EventSourcing.Implementation.AzureStorage.Blob
8+
{
9+
public class BlobSnapshotBase
10+
{
11+
12+
/// <summary>
13+
/// Turn the given snapshot definition into an unique filename
14+
/// </summary>
15+
/// <param name="snapshot">
16+
/// The snapshot definition to be saved or read
17+
/// </param>
18+
/// <remarks>
19+
/// The filename is the sequence number with the rest of the snapshot definition
20+
/// going into the folder path
21+
/// </remarks>
22+
public static string MakeSnapshotFilename(ISnapshot snapshot)
23+
{
24+
if (snapshot != null)
25+
{
26+
return BlobEventStreamBase.MakeValidStorageFolderName($"{snapshot.CurrentSequenceNumber}");
27+
}
28+
return string.Empty;
29+
}
30+
31+
/// <summary>
32+
/// Make a folder path in which a snapshot for a projection can be saved
33+
/// </summary>
34+
/// <param name="snapshot"></param>
35+
/// <returns></returns>
36+
public static string MakeSnapshotFolder(ISnapshot snapshot)
37+
{
38+
if (snapshot != null)
39+
{
40+
return BlobEventStreamBase.MakeValidStorageFolderName($"{snapshot.DomainName}/{snapshot.EntityTypeName}/{snapshot.InstanceKey}");
41+
}
42+
return string.Empty;
43+
}
44+
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using EventSourcingOnAzureFunctions.Common.EventSourcing.Interfaces;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
7+
namespace EventSourcingOnAzureFunctions.Common.EventSourcing.Implementation.AzureStorage.Blob
8+
{
9+
/// <summary>
10+
/// Class to read a projection snapshot from an Azure storage blob
11+
/// </summary>
12+
public sealed class BlobProjectionSnapshotReader
13+
: BlobSnapshotBase,
14+
IProjectionSnapshotReader
15+
{
16+
17+
/// <summary>
18+
/// Loads the snapshot record closes to the requested as-of sequence number
19+
/// from which to start running a projection
20+
/// </summary>
21+
/// <typeparam name="TProjection">
22+
/// The projection into which to load the initial snapshot
23+
/// </typeparam>
24+
/// <param name="snapshot">
25+
/// The snapshot definition from which to load
26+
/// </param>
27+
public async Task<TProjection> LoadProjection<TProjection>(ISnapshot snapshot) where TProjection : IProjectionWithSnapshots, new()
28+
{
29+
if (snapshot != null)
30+
{
31+
if (snapshot.CurrentSequenceNumber <= 0)
32+
{
33+
// We are just asking for the latest available snapshot
34+
35+
}
36+
// Create a filename from the snapshot
37+
string filename = MakeSnapshotFilename(snapshot);
38+
string filepath = MakeSnapshotFolder(snapshot);
39+
40+
TProjection projectionToLoad = new TProjection();
41+
42+
return projectionToLoad;
43+
}
44+
throw new NotImplementedException();
45+
}
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
using EventSourcingOnAzureFunctions.Common.EventSourcing.Interfaces;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
7+
namespace EventSourcingOnAzureFunctions.Common.EventSourcing.Implementation.AzureStorage.Blob
8+
{
9+
/// <summary>
10+
/// Writer to save projection snapshots in an Azure storage blob
11+
/// </summary>
12+
public sealed class BlobProjectionSnapshotWriter
13+
: BlobSnapshotBase,
14+
IProjectionSnapshotWriter
15+
{
16+
17+
18+
public async Task WriteSnapshot<TProjection>(ISnapshot snapshot,
19+
TProjection state) where TProjection : IProjectionWithSnapshots
20+
{
21+
if (snapshot != null)
22+
{
23+
if (state != null)
24+
{
25+
// Create a filename from the snapshot
26+
string filename = MakeSnapshotFilename(snapshot);
27+
string filepath = MakeSnapshotFolder(snapshot);
28+
29+
30+
}
31+
}
32+
throw new NotImplementedException();
33+
}
34+
35+
36+
}
37+
}

src/EventSourcingOnAzureFunctions.Common/EventSourcing/Interfaces/IProjectionSnapshotReader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public interface IProjectionSnapshotReader
2222
/// The entity instance and sequence number to get the snapshot from
2323
/// (If no sequence number is specified, get the latest snapshot)
2424
/// </param>
25-
Task<TProjection> LoadProjection<TProjection>(ISnapshot snapshot) where TProjection : IProjection, new();
25+
Task<TProjection> LoadProjection<TProjection>(ISnapshot snapshot) where TProjection : IProjectionWithSnapshots, new();
2626

2727
}
2828
}

src/EventSourcingOnAzureFunctions.Common/EventSourcing/Interfaces/IProjectionSnapshotWriter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public interface IProjectionSnapshotWriter
2323
/// <param name="state">
2424
/// The projection state as at that point
2525
/// </param>
26-
Task WriteSnapshot<TProjection>(ISnapshot snapshot, TProjection state) where TProjection : IProjection;
26+
Task WriteSnapshot<TProjection>(ISnapshot snapshot, TProjection state) where TProjection : IProjectionWithSnapshots;
2727

2828
}
2929
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace EventSourcingOnAzureFunctions.Common.EventSourcing.Interfaces
6+
{
7+
/// <summary>
8+
/// A projection which supports saving a state to, and reading a state from, a snapshot
9+
/// </summary>
10+
public interface IProjectionWithSnapshots
11+
: IProjection
12+
{
13+
14+
/// <summary>
15+
/// Read the current state of a projection from a snapshot
16+
/// </summary>
17+
/// <param name="sequenceNumber">
18+
/// The sequence number at which the snapshot was taken
19+
/// </param>
20+
/// <param name="asOfDate">
21+
/// The as-of date at which the snapshot was taken
22+
/// </param>
23+
/// <param name="snapshot">
24+
/// The raw data object of the state as at when the snapshot was taken
25+
/// </param>
26+
void FromSnapshot(int sequenceNumber,
27+
Nullable<DateTime> asOfDate,
28+
object snapshot);
29+
30+
/// <summary>
31+
/// Turn the current state of this projection into a snapshot object to be saved
32+
/// </summary>
33+
object ToSnapshot();
34+
35+
}
36+
}

src/EventSourcingOnAzureFunctions.Common/EventSourcing/Projection.cs

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@
88

99
namespace EventSourcingOnAzureFunctions.Common.EventSourcing
1010
{
11+
/// <summary>
12+
/// A projection is a piece of code that runs over the event stream for an entity in order to derive some state information about that entity.
13+
/// For each event in the stream it needs to decide (a) am I interested in this kind of event and if so (b) what do I do with it.
14+
/// </summary>
1115
public class Projection
1216
: IEventStreamIdentity
1317
{
1418

1519
private readonly IEventStreamSettings _settings = null;
1620
private readonly IProjectionProcessor _projectionProcessor = null;
1721
private readonly INotificationDispatcher _notificationDispatcher = null;
18-
private readonly IProjectionSnapshotWriter _snapshortWriter = null;
22+
private readonly IProjectionSnapshotWriter _snapshotWriter = null;
1923
private readonly IProjectionSnapshotReader _snapshotReader = null;
2024

2125
private readonly string _domainName;
@@ -56,10 +60,11 @@ public string InstanceKey
5660
}
5761
}
5862

63+
64+
private readonly string _projectionTypeName;
5965
/// <summary>
6066
/// The type of the projection we are going to run
6167
/// </summary>
62-
private readonly string _projectionTypeName;
6368
public string ProjectionTypeName
6469
{
6570
get
@@ -69,13 +74,42 @@ public string ProjectionTypeName
6974
}
7075

7176

77+
/// <summary>
78+
/// Process the projection and return the results
79+
/// </summary>
80+
/// <typeparam name="TProjection">
81+
/// The type of the projection to be executed
82+
/// </typeparam>
83+
/// <param name="asOfDate">
84+
/// The date up to which to run the projection.
85+
/// If this is not specified then run to the end of the event stream.
86+
/// </param>
87+
/// <returns>
88+
/// A task which is used to execute the projection
89+
/// </returns>
7290
public async Task<TProjection> Process<TProjection>(Nullable<DateTime> asOfDate = null) where TProjection : IProjection, new()
7391
{
7492
TProjection projectionToRun = new TProjection();
7593
return await Process(projectionToRun, asOfDate);
7694
}
7795

78-
96+
/// <summary>
97+
/// Process the projection and return the results
98+
/// </summary>
99+
/// <typeparam name="TProjection">
100+
/// The type of the projection to be executed
101+
/// </typeparam>
102+
/// <param name="projectionToRun">
103+
/// The instance of the projection to start processing from
104+
/// (This allows a projection to start from a given initial state)
105+
/// </param>
106+
/// <param name="asOfDate">
107+
/// The date up to which to run the projection.
108+
/// If this is not specified then run to the end of the event stream.
109+
/// </param>
110+
/// <returns>
111+
/// A task which is used to execute the projection
112+
/// </returns>
79113
public async Task<TProjection> Process<TProjection>(TProjection projectionToRun, Nullable<DateTime> asOfDate = null) where TProjection : IProjection
80114
{
81115
if (null != _projectionProcessor)
@@ -111,16 +145,23 @@ await _notificationDispatcher.ProjectionCompleted(this,
111145
/// <param name="state">
112146
/// The state of the projection when snapshotted
113147
/// </param>
114-
public async Task WriteSnapshot<TProjection>(ISnapshot snapshot, TProjection state) where TProjection : IProjection
148+
public async Task WriteSnapshot<TProjection>(ISnapshot snapshot, TProjection state) where TProjection : IProjectionWithSnapshots
115149
{
116-
// If there is a snapshot writer, use it...
117-
if (null != _snapshortWriter)
150+
if (state.SupportsSnapshots)
118151
{
119-
await _snapshortWriter.WriteSnapshot(snapshot, state);
152+
// If there is a snapshot writer, use it...
153+
if (null != _snapshotWriter)
154+
{
155+
await _snapshotWriter.WriteSnapshot(snapshot, state);
156+
}
120157
}
121158
}
122159

123160
private readonly string _connectionStringName;
161+
162+
/// <summary>
163+
/// The name of the connection string used to run this projection
164+
/// </summary>
124165
public string ConnectionStringName
125166
{
126167
get
@@ -193,7 +234,7 @@ public Projection(ProjectionAttribute attribute,
193234

194235
if (null != snapshotWriter )
195236
{
196-
_snapshortWriter = snapshotWriter;
237+
_snapshotWriter = snapshotWriter;
197238
}
198239

199240
}

0 commit comments

Comments
 (0)