Skip to content

Commit 221b640

Browse files
committed
[ksqlDB.RestApi.Client]: added Pause/Resume PersistentQueryAsync examples
1 parent 9f82720 commit 221b640

File tree

1 file changed

+15
-2
lines changed

1 file changed

+15
-2
lines changed

Samples/ksqlDB.RestApi.Client.Sample/Program.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
using ksqlDB.RestApi.Client.KSql.Query.Functions;
2323
using ksqlDB.RestApi.Client.KSql.Query.Operators;
2424
using ksqlDB.RestApi.Client.KSql.Query.Options;
25+
using ksqlDb.RestApi.Client.KSql.Query.PushQueries;
2526
using ksqlDB.RestApi.Client.KSql.Query.Windows;
2627
using ksqlDB.RestApi.Client.KSql.RestApi;
2728
using ksqlDB.RestApi.Client.KSql.RestApi.Extensions;
@@ -209,7 +210,7 @@ private static async Task AddAndSaveChangesAsync(KSqlDBContext context)
209210
var saveResponse = await context.SaveChangesAsync();
210211
}
211212

212-
private static async Task SubscribeAsync(IKSqlDBContext context)
213+
private static async Task SubscribeAsync(IKSqlDBContext context, IKSqlDbRestApiClient restApiProvider)
213214
{
214215
var cts = new CancellationTokenSource();
215216

@@ -379,6 +380,8 @@ private static IDisposable FullOuterJoinTables(KSqlDBContext context)
379380

380381
private static IDisposable Window(KSqlDBContext context)
381382
{
383+
new TimeWindows(Duration.OfSeconds(2), OutputRefinement.Final).WithGracePeriod(Duration.OfSeconds(2));
384+
382385
var subscription1 = context.CreateQueryStream<Tweet>()
383386
.GroupBy(c => c.Id)
384387
.WindowedBy(new TimeWindows(Duration.OfSeconds(5)).WithGracePeriod(Duration.OfHours(2)))
@@ -869,7 +872,17 @@ private static async Task TerminatePersistentQueryAsync(IKSqlDbRestApiClient res
869872

870873
var query = queries.SelectMany(c => c.Queries).FirstOrDefault(c => c.SinkKafkaTopics.Contains(topicName));
871874

872-
var response = await restApiClient.TerminatePersistentQueryAsync(query.Id);
875+
if (query == null)
876+
return;
877+
878+
await TerminatePersistentQueryAsync(restApiClient, query.Id);
879+
}
880+
881+
private static async Task TerminatePersistentQueryAsync(IKSqlDbRestApiClient restApiClient, string queryId)
882+
{
883+
var response = await restApiClient.PausePersistentQueryAsync(queryId);
884+
response = await restApiClient.ResumePersistentQueryAsync(queryId);
885+
response = await restApiClient.TerminatePersistentQueryAsync(queryId);
873886
}
874887

875888
private static async Task TerminatePushQueryAsync(IKSqlDBContext context, IKSqlDbRestApiClient restApiClient)

0 commit comments

Comments
 (0)