Skip to content

Commit ad3fc8e

Browse files
(#384) added pull progress report via event OfflineDbContext.SynchronizationProgress (#387)
* (#377) Fix for possible concurrent query during pull * (#384) Added SynchronizationProgress event * (#384) Added test scenario for SynchronizationProgress event (OfflineDbContext_Tests.SynchronizationProgress_Event_Works) * (#384) Added SynchronizationEventType PullStarted and PullEnded. Added Exception and ServiceResponse to SynchronizationEventArgs. Fixed bug in SynchronizationProgress_Event_Works test. Initialization of eventFired was wrong. SynchronizationProgress_Event_Works added tests for start and end. * (#384) Added test PullAsync_List_FailedRequest_SynchronizationEventWorks Added Synchronization events for push operations and test for it. * (#384) Changed DateTimeConverter on client and server side to handle 'default' DateTime correctly. * (#384) Renamed TotalNrItems on SynchronizationEventArgs to ItemsTotal. Made DateTimeConverter on server Except again if input is empty. Optimised DataTimeConverter to avoid double parsing. * Update src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Added DateTimeConverte tests for verifying default time, local time and utc time conversion. * Changed QueueHandler_WithThreads_Enqueue test to sleep a thread for 2 seconds instead of 1. This reduces the risk of thread reusage which could cause the distinct threads count to fail. --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent ec58dcb commit ad3fc8e

File tree

11 files changed

+555
-47
lines changed

11 files changed

+555
-47
lines changed

src/CommunityToolkit.Datasync.Client/Offline/OfflineDbContext.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ public abstract partial class OfflineDbContext : DbContext
9999
/// </summary>
100100
internal OperationsQueueManager QueueManager { get; }
101101

102+
/// <summary>
103+
/// An event delegate that allows the app to monitor synchronization events.
104+
/// </summary>
105+
/// <remarks>This event can be called from background threads.</remarks>
106+
public event EventHandler<SynchronizationEventArgs>? SynchronizationProgress;
107+
102108
/// <summary>
103109
/// Initializes a new instance of the <see cref="OfflineDbContext" /> class. The
104110
/// <see cref="OnConfiguring(DbContextOptionsBuilder)" /> method will be called to
@@ -561,6 +567,15 @@ public async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, bool add
561567
return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken).ConfigureAwait(false);
562568
}
563569

570+
/// <summary>
571+
/// Sends a synchronization event to the consumers.
572+
/// </summary>
573+
/// <param name="eventArgs">The event arguments.</param>
574+
internal void SendSynchronizationEvent(SynchronizationEventArgs eventArgs)
575+
{
576+
SynchronizationProgress?.Invoke(this, eventArgs);
577+
}
578+
564579
#region IDisposable
565580
/// <summary>
566581
/// Ensure that the context has not been disposed.

src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs

Lines changed: 100 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using System.Reflection;
1515
using System.Text.Json;
1616
using System.Text.Json.Serialization;
17+
using static CommunityToolkit.Datasync.Client.Offline.Operations.PullOperationManager;
1718

1819
namespace CommunityToolkit.Datasync.Client.Offline.Operations;
1920

@@ -53,61 +54,87 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
5354

5455
QueueHandler<PullResponse> databaseUpdateQueue = new(1, async pullResponse =>
5556
{
56-
DateTimeOffset lastSynchronization = await DeltaTokenStore.GetDeltaTokenAsync(pullResponse.QueryId, cancellationToken).ConfigureAwait(false);
57-
foreach (object item in pullResponse.Items)
57+
if (pullResponse.Items.Any())
5858
{
59-
EntityMetadata metadata = EntityResolver.GetEntityMetadata(item, pullResponse.EntityType);
60-
object? originalEntity = await context.FindAsync(pullResponse.EntityType, [metadata.Id], cancellationToken).ConfigureAwait(false);
61-
62-
if (originalEntity is null && !metadata.Deleted)
63-
{
64-
_ = context.Add(item);
65-
result.IncrementAdditions();
66-
}
67-
else if (originalEntity is not null && metadata.Deleted)
59+
DateTimeOffset lastSynchronization = await DeltaTokenStore.GetDeltaTokenAsync(pullResponse.QueryId, cancellationToken).ConfigureAwait(false);
60+
foreach (object item in pullResponse.Items)
6861
{
69-
_ = context.Remove(originalEntity);
70-
result.IncrementDeletions();
71-
}
72-
else if (originalEntity is not null && !metadata.Deleted)
73-
{
74-
// Gather properties marked with [JsonIgnore]
75-
HashSet<string> ignoredProps = pullResponse.EntityType
76-
.GetProperties(BindingFlags.Public | BindingFlags.Instance)
77-
.Where(p => p.IsDefined(typeof(JsonIgnoreAttribute), inherit: true))
78-
.Select(p => p.Name)
79-
.ToHashSet();
80-
81-
EntityEntry originalEntry = context.Entry(originalEntity);
82-
EntityEntry newEntry = context.Entry(item);
83-
84-
// Only copy properties that are not marked with [JsonIgnore]
85-
foreach (IProperty property in originalEntry.Metadata.GetProperties())
62+
EntityMetadata metadata = EntityResolver.GetEntityMetadata(item, pullResponse.EntityType);
63+
object? originalEntity = await context.FindAsync(pullResponse.EntityType, [metadata.Id], cancellationToken).ConfigureAwait(false);
64+
65+
if (originalEntity is null && !metadata.Deleted)
66+
{
67+
_ = context.Add(item);
68+
result.IncrementAdditions();
69+
}
70+
else if (originalEntity is not null && metadata.Deleted)
71+
{
72+
_ = context.Remove(originalEntity);
73+
result.IncrementDeletions();
74+
}
75+
else if (originalEntity is not null && !metadata.Deleted)
8676
{
87-
if (!ignoredProps.Contains(property.Name))
77+
// Gather properties marked with [JsonIgnore]
78+
HashSet<string> ignoredProps = pullResponse.EntityType
79+
.GetProperties(BindingFlags.Public | BindingFlags.Instance)
80+
.Where(p => p.IsDefined(typeof(JsonIgnoreAttribute), inherit: true))
81+
.Select(p => p.Name)
82+
.ToHashSet();
83+
84+
EntityEntry originalEntry = context.Entry(originalEntity);
85+
EntityEntry newEntry = context.Entry(item);
86+
87+
// Only copy properties that are not marked with [JsonIgnore]
88+
foreach (IProperty property in originalEntry.Metadata.GetProperties())
8889
{
89-
originalEntry.Property(property.Name).CurrentValue = newEntry.Property(property.Name).CurrentValue;
90+
if (!ignoredProps.Contains(property.Name))
91+
{
92+
originalEntry.Property(property.Name).CurrentValue = newEntry.Property(property.Name).CurrentValue;
93+
}
9094
}
95+
96+
result.IncrementReplacements();
9197
}
9298

93-
result.IncrementReplacements();
99+
if (metadata.UpdatedAt > lastSynchronization)
100+
{
101+
lastSynchronization = metadata.UpdatedAt.Value;
102+
bool isAdded = await DeltaTokenStore.SetDeltaTokenAsync(pullResponse.QueryId, metadata.UpdatedAt.Value, cancellationToken).ConfigureAwait(false);
103+
if (isAdded)
104+
{
105+
// Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
106+
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
107+
}
108+
}
94109
}
95110

96-
if (metadata.UpdatedAt > lastSynchronization)
111+
if (pullOptions.SaveAfterEveryServiceRequest)
97112
{
98-
lastSynchronization = metadata.UpdatedAt.Value;
99-
bool isAdded = await DeltaTokenStore.SetDeltaTokenAsync(pullResponse.QueryId, metadata.UpdatedAt.Value, cancellationToken).ConfigureAwait(false);
100-
if (isAdded)
101-
{
102-
// Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
103-
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
104-
}
113+
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
105114
}
115+
116+
context.SendSynchronizationEvent(new SynchronizationEventArgs()
117+
{
118+
EventType = SynchronizationEventType.ItemsCommitted,
119+
EntityType = pullResponse.EntityType,
120+
ItemsProcessed = pullResponse.TotalItemsProcessed,
121+
ItemsTotal = pullResponse.TotalRequestItems,
122+
QueryId = pullResponse.QueryId
123+
});
106124
}
107125

108-
if (pullOptions.SaveAfterEveryServiceRequest)
126+
if (pullResponse.Completed)
109127
{
110-
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
128+
context.SendSynchronizationEvent(new SynchronizationEventArgs()
129+
{
130+
EventType = SynchronizationEventType.PullEnded,
131+
EntityType = pullResponse.EntityType,
132+
ItemsProcessed = pullResponse.TotalItemsProcessed,
133+
ItemsTotal = pullResponse.TotalRequestItems,
134+
QueryId = pullResponse.QueryId,
135+
Exception = pullResponse.Exception,
136+
ServiceResponse = pullResponse.Exception is DatasyncPullException ex ? ex.ServiceResponse : null
137+
});
111138
}
112139
});
113140

@@ -116,14 +143,34 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
116143
Uri endpoint = ExecutableOperation.MakeAbsoluteUri(pullRequest.HttpClient.BaseAddress, pullRequest.Endpoint);
117144
Uri requestUri = new UriBuilder(endpoint) { Query = pullRequest.QueryDescription.ToODataQueryString() }.Uri;
118145
Type pageType = typeof(Page<>).MakeGenericType(pullRequest.EntityType);
146+
long itemsProcessed = 0;
147+
long totalCount = 0;
119148

120149
try
121150
{
122151
bool completed = false;
152+
// Signal we started the pull operation.
153+
context.SendSynchronizationEvent(new SynchronizationEventArgs()
154+
{
155+
EventType = SynchronizationEventType.PullStarted,
156+
EntityType = pullRequest.EntityType,
157+
QueryId = pullRequest.QueryId
158+
});
123159
do
124160
{
125161
Page<object> page = await GetPageAsync(pullRequest.HttpClient, requestUri, pageType, cancellationToken).ConfigureAwait(false);
126-
databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, page.Items));
162+
itemsProcessed += page.Items.Count();
163+
totalCount = page.Count ?? totalCount;
164+
165+
context.SendSynchronizationEvent(new SynchronizationEventArgs()
166+
{
167+
EventType = SynchronizationEventType.ItemsFetched,
168+
EntityType = pullRequest.EntityType,
169+
ItemsProcessed = itemsProcessed,
170+
ItemsTotal = page.Count ?? 0,
171+
QueryId = pullRequest.QueryId
172+
});
173+
127174
if (!string.IsNullOrEmpty(page.NextLink))
128175
{
129176
requestUri = new UriBuilder(endpoint) { Query = page.NextLink }.Uri;
@@ -132,12 +179,15 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
132179
{
133180
completed = true;
134181
}
182+
183+
databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, page.Items, totalCount, itemsProcessed, completed));
135184
}
136185
while (!completed);
137186
}
138187
catch (DatasyncPullException ex)
139188
{
140189
result.AddFailedRequest(requestUri, ex.ServiceResponse);
190+
databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, [], totalCount, itemsProcessed, true, ex));
141191
}
142192
});
143193

@@ -173,6 +223,8 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
173223
/// <exception cref="DatasyncPullException">Thrown on error</exception>
174224
internal async Task<Page<object>> GetPageAsync(HttpClient client, Uri requestUri, Type pageType, CancellationToken cancellationToken = default)
175225
{
226+
PropertyInfo countPropInfo = pageType.GetProperty("Count")
227+
?? throw new DatasyncException($"Page type '{pageType.Name}' does not have a 'Count' property");
176228
PropertyInfo itemsPropInfo = pageType.GetProperty("Items")
177229
?? throw new DatasyncException($"Page type '{pageType.Name}' does not have an 'Items' property");
178230
PropertyInfo nextLinkPropInfo = pageType.GetProperty("NextLink")
@@ -193,6 +245,7 @@ internal async Task<Page<object>> GetPageAsync(HttpClient client, Uri requestUri
193245

194246
return new Page<object>()
195247
{
248+
Count = (long?)countPropInfo.GetValue(result),
196249
Items = (IEnumerable<object>)itemsPropInfo.GetValue(result)!,
197250
NextLink = (string?)nextLinkPropInfo.GetValue(result)
198251
};
@@ -237,6 +290,10 @@ internal static QueryDescription PrepareQueryDescription(QueryDescription source
237290
/// <param name="EntityType">The type of entity contained within the items.</param>
238291
/// <param name="QueryId">The query ID for the request.</param>
239292
/// <param name="Items">The list of items to process.</param>
293+
/// <param name="TotalRequestItems">The total number of items in the current pull request.</param>
294+
/// <param name="TotalItemsProcessed">The total number of items processed, <paramref name="Items"/> included.</param>
295+
/// <param name="Completed">If <c>true</c>, indicates that the pull request is completed.</param>
296+
/// <param name="Exception">Indicates an exception occured during fetching of data</param>
240297
[ExcludeFromCodeCoverage]
241-
internal record PullResponse(Type EntityType, string QueryId, IEnumerable<object> Items);
298+
internal record PullResponse(Type EntityType, string QueryId, IEnumerable<object> Items, long TotalRequestItems, long TotalItemsProcessed, bool Completed, Exception? Exception = null);
242299
}

src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,16 +270,42 @@ internal async Task<PushResult> PushAsync(IEnumerable<Type> entityTypes, PushOpt
270270

271271
// Determine the list of queued operations in scope.
272272
List<DatasyncOperation> queuedOperations = await GetQueuedOperationsAsync(entityTypeNames, cancellationToken).ConfigureAwait(false);
273+
274+
// Signal we started the push operation.
275+
this._context.SendSynchronizationEvent(new SynchronizationEventArgs()
276+
{
277+
EventType = SynchronizationEventType.PushStarted,
278+
ItemsTotal = queuedOperations.Count
279+
});
280+
273281
if (queuedOperations.Count == 0)
274282
{
283+
// Signal we ended the push operation.
284+
this._context.SendSynchronizationEvent(new SynchronizationEventArgs()
285+
{
286+
EventType = SynchronizationEventType.PushEnded,
287+
ItemsProcessed = 0,
288+
ItemsTotal = 0
289+
});
275290
return pushResult;
276291
}
277292

293+
int nrItemsProcessed = 0;
294+
278295
// Push things in parallel, according to the PushOptions
279296
QueueHandler<DatasyncOperation> queueHandler = new(pushOptions.ParallelOperations, async operation =>
280297
{
281298
ServiceResponse? response = await PushOperationAsync(operation, cancellationToken).ConfigureAwait(false);
282299
pushResult.AddOperationResult(operation, response);
300+
// We can run on multiple threads, so use Interlocked to update the number of items processed.
301+
int newItemsProcessed = Interlocked.Increment(ref nrItemsProcessed);
302+
this._context.SendSynchronizationEvent(new SynchronizationEventArgs()
303+
{
304+
EventType = SynchronizationEventType.PushItem,
305+
ItemsProcessed = newItemsProcessed,
306+
ItemsTotal = queuedOperations.Count,
307+
PushOperation = operation,
308+
});
283309
});
284310

285311
// Enqueue and process all the queued operations in scope
@@ -288,6 +314,14 @@ internal async Task<PushResult> PushAsync(IEnumerable<Type> entityTypes, PushOpt
288314

289315
// Save the changes, this time we don't update the queue.
290316
_ = await this._context.SaveChangesAsync(acceptAllChangesOnSuccess: true, addToQueue: false, cancellationToken).ConfigureAwait(false);
317+
318+
this._context.SendSynchronizationEvent(new SynchronizationEventArgs()
319+
{
320+
EventType = SynchronizationEventType.PushEnded,
321+
ItemsProcessed = nrItemsProcessed,
322+
ItemsTotal = queuedOperations.Count,
323+
});
324+
291325
return pushResult;
292326
}
293327

0 commit comments

Comments
 (0)