12
12
using ksqlDB . RestApi . Client . KSql . RestApi . Serialization ;
13
13
using ksqlDB . RestApi . Client . KSql . Query . Operators ;
14
14
15
- const string ksqlDbUrl = @ "http://localhost:8088";
15
+ const string ksqlDbUrl = "http://localhost:8088" ;
16
16
17
17
var servicesCollection = new ServiceCollection ( ) ;
18
18
servicesCollection . ConfigureKSqlDb ( ksqlDbUrl ) ;
19
19
20
+ var cancellationTokenSource = new CancellationTokenSource ( ) ;
21
+
20
22
var serviceProvider = servicesCollection . BuildServiceProvider ( ) ;
21
- IKSqlDbRestApiClient ksqlDbRestApiClient = serviceProvider . GetRequiredService < IKSqlDbRestApiClient > ( ) ;
23
+ var ksqlDbRestApiClient = serviceProvider . GetRequiredService < IKSqlDbRestApiClient > ( ) ;
24
+ var context = serviceProvider . GetRequiredService < IKSqlDBContext > ( ) ;
22
25
26
+ await SubscriptionToAComplexTypeAsync ( ksqlDbRestApiClient , context , cancellationTokenSource . Token ) ;
23
27
24
28
Console . WriteLine ( "Press any key to stop the subscription" ) ;
25
29
26
30
Console . ReadKey ( ) ;
27
31
28
32
#pragma warning disable CS8321 // Local function is declared but never used
29
33
30
- static async Task StructType ( KSqlDBContext context )
34
+ static async Task StructType ( IKSqlDBContext context )
31
35
{
32
36
var moviesStream = context . CreatePushQuery < Movie > ( ) ;
33
37
@@ -43,7 +47,7 @@ static async Task StructType(KSqlDBContext context)
43
47
}
44
48
}
45
49
46
- static IDisposable Arrays ( KSqlDBContext context )
50
+ static IDisposable Arrays ( IKSqlDBContext context )
47
51
{
48
52
var subscription =
49
53
context . CreatePushQuery < Movie > ( )
@@ -55,10 +59,12 @@ static IDisposable Arrays(KSqlDBContext context)
55
59
. Select ( _ => new [ ] { 1 , 2 , 3 } . Length )
56
60
. ToQueryString ( ) ;
57
61
62
+ Console . WriteLine ( arrayLengthQuery ) ;
63
+
58
64
return subscription ;
59
65
}
60
66
61
- static IDisposable NestedTypes ( KSqlDBContext context )
67
+ static IDisposable NestedTypes ( IKSqlDBContext context )
62
68
{
63
69
var disposable =
64
70
context . CreatePushQuery < Movie > ( )
@@ -84,7 +90,7 @@ static IDisposable NestedTypes(KSqlDBContext context)
84
90
return disposable ;
85
91
}
86
92
87
- static async Task DeeplyNestedTypes ( KSqlDBContext context )
93
+ static async Task DeeplyNestedTypes ( IKSqlDBContext context )
88
94
{
89
95
var moviesStream = context . CreatePushQuery < Movie > ( ) ;
90
96
@@ -126,7 +132,7 @@ static async Task DeeplyNestedTypes(KSqlDBContext context)
126
132
127
133
#region TimeTypes
128
134
129
- static async Task TimeTypes ( IKSqlDbRestApiClient restApiClient , IKSqlDBContext context )
135
+ static async Task TimeTypes ( IKSqlDbRestApiClient restApiClient , IKSqlDBContext context , CancellationToken cancellationToken = default )
130
136
{
131
137
EntityCreationMetadata metadata = new EntityCreationMetadata ( nameof ( Dates ) )
132
138
{
@@ -135,7 +141,9 @@ static async Task TimeTypes(IKSqlDbRestApiClient restApiClient, IKSqlDBContext c
135
141
ValueFormat = SerializationFormats . Json
136
142
} ;
137
143
138
- var httpResponseMessage = await restApiClient . CreateStreamAsync < Dates > ( metadata ) ;
144
+ var httpResponseMessage = await restApiClient . CreateStreamAsync < Dates > ( metadata , cancellationToken : cancellationToken ) ;
145
+ var content = await httpResponseMessage . Content . ReadAsStringAsync ( cancellationToken ) ;
146
+ Console . WriteLine ( content ) ;
139
147
140
148
var from = new TimeSpan ( 1 , 0 , 0 ) ;
141
149
var to = new TimeSpan ( 22 , 0 , 0 ) ;
@@ -161,7 +169,7 @@ static async Task TimeTypes(IKSqlDbRestApiClient restApiClient, IKSqlDBContext c
161
169
DtOffset = new DateTimeOffset ( 2021 , 7 , 4 , 13 , 29 , 45 , 447 , TimeSpan . FromHours ( 4 ) )
162
170
} ;
163
171
164
- httpResponseMessage = await restApiClient . InsertIntoAsync ( value ) ;
172
+ httpResponseMessage = await restApiClient . InsertIntoAsync ( value , cancellationToken : cancellationToken ) ;
165
173
var statementResponses = await httpResponseMessage . ToStatementResponsesAsync ( ) . ConfigureAwait ( false ) ;
166
174
}
167
175
@@ -174,17 +182,19 @@ static void Bytes(IKSqlDBContext ksqlDbContext)
174
182
. ToQueryString ( ) ;
175
183
}
176
184
177
- static async Task SubscriptionToAComplexTypeAsync ( IKSqlDbRestApiClient restApiClient , IKSqlDBContext ksqlDbContext )
185
+ static async Task SubscriptionToAComplexTypeAsync ( IKSqlDbRestApiClient restApiClient , IKSqlDBContext ksqlDbContext , CancellationToken cancellationToken = default )
178
186
{
179
187
string typeName = nameof ( EventCategory ) ;
180
- var httpResponseMessage = await restApiClient . DropTypeIfExistsAsync ( typeName ) ;
188
+ var httpResponseMessage = await restApiClient . DropTypeIfExistsAsync ( typeName , cancellationToken ) ;
189
+ var content = await httpResponseMessage . Content . ReadAsStringAsync ( cancellationToken ) ;
190
+ Console . WriteLine ( content ) ;
181
191
182
- httpResponseMessage = await restApiClient . ExecuteStatementAsync ( new KSqlDbStatement ( @$ "
192
+ _ = await restApiClient . ExecuteStatementAsync ( new KSqlDbStatement ( @$ "
183
193
Drop table { nameof ( Event ) } ;
184
194
" ) ) ;
185
195
186
- httpResponseMessage = await restApiClient . CreateTypeAsync < EventCategory > ( ) ;
187
- httpResponseMessage = await restApiClient . CreateTableAsync < Event > ( new EntityCreationMetadata ( "Events" ) { Partitions = 1 } ) ;
196
+ await restApiClient . CreateTypeAsync < EventCategory > ( ) ;
197
+ await restApiClient . CreateTableAsync < Event > ( new EntityCreationMetadata ( "Events" ) { Partitions = 1 } ) ;
188
198
189
199
var subscription = ksqlDbContext . CreatePushQuery < Event > ( )
190
200
. Subscribe ( value =>
@@ -202,6 +212,8 @@ static async Task SubscriptionToAComplexTypeAsync(IKSqlDbRestApiClient restApiCl
202
212
203
213
httpResponseMessage = await restApiClient . ExecuteStatementAsync ( new KSqlDbStatement ( @"
204
214
INSERT INTO Events (Id, Places, Categories) VALUES (1, ARRAY['Place1','Place2','Place3'], ARRAY[STRUCT(Name := 'Planet Earth'), STRUCT(Name := 'Discovery')]);" ) ) ;
215
+ content = await httpResponseMessage . Content . ReadAsStringAsync ( cancellationToken ) ;
216
+ Console . WriteLine ( content ) ;
205
217
}
206
218
207
219
#pragma warning restore CS8321 // Local function is declared but never used
0 commit comments