Skip to content

(#384) added pull progress report via event OfflineDbContext.SynchronizationProgress #387

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jul 9, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/CommunityToolkit.Datasync.Client/Offline/OfflineDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ public abstract partial class OfflineDbContext : DbContext
/// </summary>
internal OperationsQueueManager QueueManager { get; }

/// <summary>
/// An event delegate that allows the app to monitor synchronization events.
/// </summary>
/// <remarks>This event can be called from background threads.</remarks>
public event EventHandler<SynchronizationEventArgs>? SynchronizationProgress;

/// <summary>
/// Initializes a new instance of the <see cref="OfflineDbContext" /> class. The
/// <see cref="OnConfiguring(DbContextOptionsBuilder)" /> method will be called to
Expand Down Expand Up @@ -561,6 +567,15 @@ public async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, bool add
return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Sends a synchronization event to the consumers.
/// </summary>
/// <param name="eventArgs">The event arguments.</param>
internal void SendSynchronizationEvent(SynchronizationEventArgs eventArgs)
{
SynchronizationProgress?.Invoke(this, eventArgs);
}

#region IDisposable
/// <summary>
/// Ensure that the context has not been disposed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
}
}

context.SendSynchronizationEvent(new SynchronizationEventArgs()
{
EventType = SynchronizationEventType.ItemsCommitted,
EntityType = pullResponse.EntityType,
ItemsProcessed = pullResponse.TotalItemsProcessed,
TotalNrItems = pullResponse.TotalRequestItems,
QueryId = pullResponse.QueryId
});

if (pullOptions.SaveAfterEveryServiceRequest)
{
_ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
Expand All @@ -120,10 +129,22 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
try
{
bool completed = false;
long itemsProcessed = 0;
do
{
Page<object> page = await GetPageAsync(pullRequest.HttpClient, requestUri, pageType, cancellationToken).ConfigureAwait(false);
databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, page.Items));
itemsProcessed += page.Items.Count();

context.SendSynchronizationEvent(new SynchronizationEventArgs()
{
EventType = SynchronizationEventType.ItemsFetched,
EntityType = pullRequest.EntityType,
ItemsProcessed = itemsProcessed,
TotalNrItems = page.Count ?? 0,
QueryId = pullRequest.QueryId
});

databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, page.Items, page.Count ?? 0, itemsProcessed));
if (!string.IsNullOrEmpty(page.NextLink))
{
requestUri = new UriBuilder(endpoint) { Query = page.NextLink }.Uri;
Expand Down Expand Up @@ -173,6 +194,8 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
/// <exception cref="DatasyncPullException">Thrown on error</exception>
internal async Task<Page<object>> GetPageAsync(HttpClient client, Uri requestUri, Type pageType, CancellationToken cancellationToken = default)
{
PropertyInfo countPropInfo = pageType.GetProperty("Count")
?? throw new DatasyncException($"Page type '{pageType.Name}' does not have a 'Count' property");
PropertyInfo itemsPropInfo = pageType.GetProperty("Items")
?? throw new DatasyncException($"Page type '{pageType.Name}' does not have an 'Items' property");
PropertyInfo nextLinkPropInfo = pageType.GetProperty("NextLink")
Expand All @@ -193,6 +216,7 @@ internal async Task<Page<object>> GetPageAsync(HttpClient client, Uri requestUri

return new Page<object>()
{
Count = (long?)countPropInfo.GetValue(result),
Items = (IEnumerable<object>)itemsPropInfo.GetValue(result)!,
NextLink = (string?)nextLinkPropInfo.GetValue(result)
};
Expand Down Expand Up @@ -237,6 +261,8 @@ internal static QueryDescription PrepareQueryDescription(QueryDescription source
/// <param name="EntityType">The type of entity contained within the items.</param>
/// <param name="QueryId">The query ID for the request.</param>
/// <param name="Items">The list of items to process.</param>
/// <param name="TotalRequestItems">The total number of items in the current pull request.</param>
/// <param name="TotalItemsProcessed">The total number of items processed, <paramref name="Items"/> included.</param>
[ExcludeFromCodeCoverage]
internal record PullResponse(Type EntityType, string QueryId, IEnumerable<object> Items);
internal record PullResponse(Type EntityType, string QueryId, IEnumerable<object> Items, long TotalRequestItems, long TotalItemsProcessed);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

namespace CommunityToolkit.Datasync.Client.Offline;

/// <summary>
/// The list of synchronization events that we support.
/// </summary>
public enum SynchronizationEventType
{
/// <summary>
/// Occurs when items have been successfully fetches from the server.
/// </summary>
/// <remarks>This event is raised after a page of entities was succesfully fetched from the server, ready to be commited to the data store.</remarks>
ItemsFetched,

/// <summary>
/// Occurs when items have been successfully committed to the underlying data store.
/// </summary>
/// <remarks>This event is raised after a page of entities was succesfully commited to the database</remarks>
ItemsCommitted,
}

/// <summary>
/// The event arguments sent when a synchronization event occurs.
/// </summary>
public class SynchronizationEventArgs
{
/// <summary>
/// The type of event.
/// </summary>
public required SynchronizationEventType EventType { get; init; }

/// <summary>
/// The EntityType that is being processed.
/// </summary>
public required Type EntityType { get; init; }

/// <summary>
/// When pulling records, the number of items that have been processed in the current pull request.
/// </summary>
public long ItemsProcessed { get; init; } = -1;

/// <summary>
/// The total number of items in the current pull request.
/// </summary>
public long TotalNrItems { get; init; }

/// <summary>
/// The query ID that is being processed
/// </summary>
public required string QueryId { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,59 @@ public async Task DbSet_PushAsync_Throws_OnNonOfflineDbContext()
}
#endregion

#region SynchronizationProgress
[Fact]
public async Task SynchronizationProgress_Event_Works()
{
Page<ClientMovie> page1 = CreatePage(5, 20, "$skip=5");
Page<ClientMovie> page2 = CreatePage(5, 20, "$skip=10");
Page<ClientMovie> page3 = CreatePage(5, 20, "$skip=15");
Page<ClientMovie> page4 = CreatePage(5, 20);

this.context.Handler.AddResponse(HttpStatusCode.OK, page1);
this.context.Handler.AddResponse(HttpStatusCode.OK, page2);
this.context.Handler.AddResponse(HttpStatusCode.OK, page3);
this.context.Handler.AddResponse(HttpStatusCode.OK, page4);

bool eventFiredForFetch = true;
bool eventFiredForCommit = true;
Copy link
Preview

Copilot AI Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialize 'eventFiredForCommit' to false to ensure proper verification that the event is raised during synchronization.

Suggested change
bool eventFiredForFetch = true;
bool eventFiredForCommit = true;
bool eventFiredForFetch = false;
bool eventFiredForCommit = false;

Copilot uses AI. Check for mistakes.

long currentItemsFetched = 0;
long currentItemsCommited = 0;

this.context.SynchronizationProgress += (sender, args) =>
{
sender.Should().Be(this.context);
args.EntityType.Should().Be<ClientMovie>();
args.QueryId.Should().Be("CommunityToolkit.Datasync.TestCommon.Databases.ClientMovie");
args.TotalNrItems.Should().Be(20);
switch(args.EventType)
{
case SynchronizationEventType.ItemsFetched:
currentItemsFetched += 5;
args.ItemsProcessed.Should().Be(currentItemsFetched);
eventFiredForFetch = true;
break;
case SynchronizationEventType.ItemsCommitted:
currentItemsCommited += 5;
args.ItemsProcessed.Should().Be(currentItemsCommited);
eventFiredForCommit = true;
break;
default:
Assert.Fail($"Invalid event type: {args.EventType}");
break;
}
};

await this.context.Movies.PullAsync();

eventFiredForFetch.Should().BeTrue();
eventFiredForCommit.Should().BeTrue();
currentItemsFetched.Should().Be(20);
currentItemsCommited.Should().Be(20);
}

#endregion

public class NotOfflineDbContext : DbContext
{
public NotOfflineDbContext() : base()
Expand Down