Skip to content

Commit f717ff6

Browse files
committed
[ksqlDB.RestApi.Client]: added cancellationToken parameters to example functions
1 parent 79f6577 commit f717ff6

File tree

1 file changed

+14
-14
lines changed

1 file changed

+14
-14
lines changed

Samples/ksqlDB.RestApi.Client.Sample/Program.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public static async Task Main(string[] args)
109109
Console.WriteLine();
110110
}, onError: error => { Console.WriteLine($"Exception: {error.Message}"); }, onCompleted: () => Console.WriteLine("Completed"));
111111

112-
await CreateOrReplaceTableStatement(context);
112+
await CreateOrReplaceTableStatement(context, cancellationTokenSource.Token);
113113

114114
await moviesProvider.InsertMovieAsync(MoviesProvider.Movie1);
115115
await moviesProvider.InsertMovieAsync(MoviesProvider.Movie2);
@@ -182,15 +182,15 @@ protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage reques
182182
}
183183
}
184184

185-
private static async Task ConfigureKSqlDbWithServicesCollection_AndTryAsync(string ksqlDbUrl)
185+
private static async Task ConfigureKSqlDbWithServicesCollection_AndTryAsync(string ksqlDbUrl, CancellationToken cancellationToken = default)
186186
{
187187
var services = new ServiceCollection();
188188

189189
services.AddDbContext<IKSqlDBContext, KSqlDBContext>(c =>
190190
{
191191
c.UseKSqlDb(ksqlDbUrl);
192192

193-
c.ReplaceHttpClient<ksqlDB.RestApi.Client.KSql.RestApi.Http.IHttpClientFactory, ksqlDB.RestApi.Client.KSql.RestApi.Http.HttpClientFactory>(_ => { })
193+
c.ReplaceHttpClient<KSql.RestApi.Http.IHttpClientFactory, KSql.RestApi.Http.HttpClientFactory>(_ => { })
194194
.AddHttpMessageHandler(_ => new DebugHandler());
195195
});
196196

@@ -219,17 +219,17 @@ private static async Task ConfigureKSqlDbWithServicesCollection_AndTryAsync(stri
219219
Console.WriteLine("Completed");
220220
});
221221

222-
await semaphoreSlim.WaitAsync();
222+
await semaphoreSlim.WaitAsync(cancellationToken);
223223

224224
await context.DisposeAsync();
225225
}
226226

227-
private static async Task AddAndSaveChangesAsync(KSqlDBContext context)
227+
private static async Task AddAndSaveChangesAsync(KSqlDBContext context, CancellationToken cancellationToken = default)
228228
{
229229
context.Add(MoviesProvider.Movie1);
230230
context.Add(MoviesProvider.Movie2);
231231

232-
var saveResponse = await context.SaveChangesAsync();
232+
var saveResponse = await context.SaveChangesAsync(cancellationToken);
233233
}
234234

235235
private static async Task SubscribeAsync(IKSqlDBContext context)
@@ -323,10 +323,10 @@ private static IDisposable ToObservableExample(string ksqlDbUrl)
323323
return subscriptions;
324324
}
325325

326-
private static async Task DescribeFunction(IKSqlDbRestApiClient restApiProvider, string functionName)
326+
private static async Task DescribeFunction(IKSqlDbRestApiClient restApiProvider, string functionName, CancellationToken cancellationToken = default)
327327
{
328-
var httpResponseMessage = await restApiProvider.ExecuteStatementAsync(new KSqlDbStatement($"DESCRIBE FUNCTION {functionName};"));
329-
var content = await httpResponseMessage.Content.ReadAsStringAsync();
328+
var httpResponseMessage = await restApiProvider.ExecuteStatementAsync(new KSqlDbStatement($"DESCRIBE FUNCTION {functionName};"), cancellationToken);
329+
var content = await httpResponseMessage.Content.ReadAsStringAsync(cancellationToken);
330330
Console.WriteLine(content);
331331
}
332332

@@ -401,7 +401,7 @@ private static IDisposable Entries(KSqlDBContext context)
401401

402402
Console.WriteLine($"{key} - {value}");
403403
}
404-
}, error => { });
404+
}, _ => { });
405405

406406
return subscription;
407407
}
@@ -439,7 +439,7 @@ private class Book
439439
public int ReleaseYear { get; set; }
440440
}
441441

442-
private static async Task InsertIntoSelectAsync(KSqlDbRestApiProvider restApiProvider, KSqlDBContext context)
442+
private static async Task InsertIntoSelectAsync(KSqlDbRestApiProvider restApiProvider, KSqlDBContext context, CancellationToken cancellationToken = default)
443443
{
444444
string streamName = "book";
445445
EntityCreationMetadata metadata = new(streamName)
@@ -450,18 +450,18 @@ private static async Task InsertIntoSelectAsync(KSqlDbRestApiProvider restApiPro
450450
ValueFormat = SerializationFormats.Json,
451451
ShouldPluralizeEntityName = false
452452
};
453-
await restApiProvider.CreateOrReplaceStreamAsync<Book>(metadata);
453+
await restApiProvider.CreateOrReplaceStreamAsync<Book>(metadata, cancellationToken);
454454

455455
string streamNameFrom = "book_from";
456456
metadata.EntityName = streamNameFrom;
457457
metadata.KafkaTopic = streamNameFrom;
458-
await restApiProvider.CreateOrReplaceStreamAsync<Book>(metadata);
458+
await restApiProvider.CreateOrReplaceStreamAsync<Book>(metadata, cancellationToken);
459459

460460
string queryId = "insert_query_book";
461461

462462
var response = await context.CreatePushQuery<Book>(streamNameFrom)
463463
.Where(c => c.Title != "Apocalypse now")
464-
.InsertIntoAsync(streamName, queryId);
464+
.InsertIntoAsync(streamName, queryId, cancellationToken);
465465

466466
var responses = await response.ToStatementResponsesAsync();
467467
Console.WriteLine($"QueryId: {responses[0].CommandStatus?.QueryId}");

0 commit comments

Comments
 (0)