5
5
using ksqlDB . Api . Client . IntegrationTests . Models ;
6
6
using ksqlDB . Api . Client . IntegrationTests . Models . Movies ;
7
7
using ksqlDB . RestApi . Client . KSql . Linq ;
8
+ using ksqlDB . RestApi . Client . KSql . Query . Context ;
8
9
using ksqlDB . RestApi . Client . KSql . Query . Functions ;
10
+ using ksqlDB . RestApi . Client . KSql . RestApi . Statements ;
11
+ using ksqlDB . RestApi . Client . KSql . RestApi . Statements . Annotations ;
9
12
using Microsoft . VisualStudio . TestTools . UnitTesting ;
10
13
11
14
namespace ksqlDB . Api . Client . IntegrationTests . KSql . Linq
@@ -162,5 +165,89 @@ public async Task FullOuterJoinTest(IQbservable<Movie2> querySource)
162
165
163
166
actualValues [ 2 ] . Title . Should ( ) . BeOneOf ( MoviesProvider . Movie1 . Title , MoviesProvider . Movie2 . Title , null ) ;
164
167
}
168
+
169
+ class Order
170
+ {
171
+ public int OrderId { get ; set ; }
172
+ public int PaymentId { get ; set ; }
173
+ public int ShipmentId { get ; set ; }
174
+ }
175
+
176
+ #region MultipleJoins
177
+
178
+ class Payment
179
+ {
180
+ [ Key ]
181
+ public int Id { get ; set ; }
182
+ }
183
+
184
+ record Shipment
185
+ {
186
+ [ Key ]
187
+ public int ? Id { get ; set ; }
188
+ }
189
+
190
+ struct Foo
191
+ {
192
+ public int Prop { get ; set ; }
193
+ }
194
+
195
+ [ TestMethod ]
196
+ public async Task MultipleJoins_QuerySyntax ( )
197
+ {
198
+ //Arrange
199
+ int expectedItemsCount = 1 ;
200
+
201
+ var entityCreationMetadata = new EntityCreationMetadata
202
+ {
203
+ KafkaTopic = nameof ( Order ) + "-TestJoin" ,
204
+ Partitions = 1
205
+ } ;
206
+
207
+ var response = await RestApiProvider . CreateStreamAsync < Order > ( entityCreationMetadata , ifNotExists : true ) ;
208
+ response = await RestApiProvider . CreateTableAsync < Payment > ( entityCreationMetadata with { KafkaTopic = nameof ( Payment ) + "-TestJoin" } , ifNotExists : true ) ;
209
+ response = await RestApiProvider . CreateTableAsync < Shipment > ( entityCreationMetadata with { KafkaTopic = nameof ( Shipment ) + "-TestJoin" } , ifNotExists : true ) ;
210
+
211
+ var ksqlDbUrl = @"http:\\localhost:8088" ;
212
+
213
+ var context = new KSqlDBContext ( ksqlDbUrl ) ;
214
+
215
+ var value = new Foo { Prop = 42 } ;
216
+
217
+ var query = ( from o in context . CreateQueryStream < Order > ( )
218
+ join p1 in Source . Of < Payment > ( ) on o . PaymentId equals p1 . Id
219
+ join s1 in Source . Of < Shipment > ( ) on o . ShipmentId equals s1 . Id into gj
220
+ from sa in gj . DefaultIfEmpty ( )
221
+ select new
222
+ {
223
+ value ,
224
+ orderId = o . OrderId ,
225
+ shipmentId = sa . Id ,
226
+ paymentId = p1 . Id ,
227
+ } )
228
+ . Take ( 1 ) ;
229
+
230
+ var order = new Order { OrderId = 1 , PaymentId = 1 , ShipmentId = 1 } ;
231
+ var payment = new Payment { Id = 1 } ;
232
+ var shipment = new Shipment { Id = 1 } ;
233
+
234
+ response = await RestApiProvider . InsertIntoAsync ( order ) ;
235
+ response = await RestApiProvider . InsertIntoAsync ( payment ) ;
236
+ response = await RestApiProvider . InsertIntoAsync ( shipment ) ;
237
+
238
+ //Act
239
+ var actualValues = await CollectActualValues ( query . ToAsyncEnumerable ( ) , expectedItemsCount ) ;
240
+
241
+ //Assert
242
+ Assert . AreEqual ( expectedItemsCount , actualValues . Count ) ;
243
+
244
+ actualValues [ 0 ] . orderId . Should ( ) . Be ( 1 ) ;
245
+ actualValues [ 0 ] . paymentId . Should ( ) . Be ( 1 ) ;
246
+ var shipmentId = actualValues [ 0 ] . shipmentId ;
247
+ if ( shipmentId . HasValue )
248
+ shipmentId . Should ( ) . Be ( 1 ) ;
249
+ }
250
+
251
+ #endregion
165
252
}
166
253
}
0 commit comments