Skip to content

Commit 1803f37

Browse files
committed
[ksqlDB.RestApi.Client] - wiki group join - query syntax
1 parent 77afe9a commit 1803f37

File tree

1 file changed

+47
-1
lines changed

1 file changed

+47
-1
lines changed

README.md

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2952,6 +2952,8 @@ SELECT STRUCT(Property := 42) AS Value FROM Locations EMIT CHANGES;
29522952
### multiple joins with query comprehension syntax (GroupJoin, SelectMany, DefaultIfEmpty)
29532953

29542954
```C#
2955+
using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Annotations;
2956+
29552957
class Order
29562958
{
29572959
public int OrderId { get; set; }
@@ -2961,12 +2963,14 @@ class Order
29612963

29622964
class Payment
29632965
{
2966+
[Key]
29642967
public int Id { get; set; }
29652968
}
29662969

29672970
record Shipment
29682971
{
2969-
public int Id { get; set; }
2972+
[Key]
2973+
public int? Id { get; set; }
29702974
}
29712975
```
29722976

@@ -3004,6 +3008,48 @@ ON o.ShipmentId = sa.Id
30043008
EMIT CHANGES LIMIT 5;
30053009
```
30063010

3011+
Creation of entities for the above mentioned query:
3012+
3013+
```C#
3014+
var entityCreationMetadata = new EntityCreationMetadata
3015+
{
3016+
KafkaTopic = nameof(Order) + "-Join",
3017+
Partitions = 1
3018+
};
3019+
3020+
var response = await restApiClient.CreateStreamAsync<Order>(entityCreationMetadata, ifNotExists: true);
3021+
response = await restApiClient.CreateTableAsync<Payment>(entityCreationMetadata with { KafkaTopic = nameof(Payment) }, ifNotExists: true);
3022+
response = await restApiClient.CreateTableAsync<Shipment>(entityCreationMetadata with { KafkaTopic = nameof(Shipment) }, ifNotExists: true);
3023+
```
3024+
3025+
Listen to the incoming record messages:
3026+
3027+
```C#
3028+
using var subscription = query
3029+
.Subscribe(c => {
3030+
Console.WriteLine($"{nameof(Order.OrderId)}: {c.orderId}");
3031+
3032+
Console.WriteLine($"{nameof(Order.PaymentId)}: {c.paymentId}");
3033+
3034+
if (c.shipmentId.HasValue)
3035+
Console.WriteLine($"{nameof(Order.ShipmentId)}: {c.shipmentId}");
3036+
3037+
}, error => {
3038+
Console.WriteLine(error.Message);
3039+
});
3040+
```
3041+
3042+
Inserting of sample data:
3043+
3044+
```C#
3045+
var order = new Order { OrderId = 1, PaymentId = 1, ShipmentId = 1 };
3046+
var payment = new Payment { Id = 1 };
3047+
var shipment = new Shipment { Id = 1 };
3048+
3049+
response = await restApiClient.InsertIntoAsync(order);
3050+
response = await restApiClient.InsertIntoAsync(payment);
3051+
response = await restApiClient.InsertIntoAsync(shipment);
3052+
```
30073053

30083054
# LinqPad samples
30093055
[Push Query](https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet/tree/main/Samples/ksqlDB.RestApi.Client.LinqPad/ksqlDB.RestApi.Client.linq)

0 commit comments

Comments
 (0)