Skip to content

Commit 6ee2834

Browse files
committed
[ksqlDB.RestApi.Client] - group join - query syntax sample
1 parent f5be50a commit 6ee2834

File tree

1 file changed

+73
-14
lines changed
  • Samples/ksqlDB.RestApi.Client.Sample/Joins

1 file changed

+73
-14
lines changed
Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,79 @@
1-
using ksqlDB.RestApi.Client.KSql.Linq;
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using ksqlDB.RestApi.Client.KSql.Linq;
25
using ksqlDB.RestApi.Client.KSql.Query.Context;
6+
using ksqlDB.RestApi.Client.KSql.RestApi;
7+
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
8+
using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Annotations;
39

410
namespace ksqlDB.Api.Client.Samples.Joins
511
{
612
public class Joins
713
{
8-
public Joins()
14+
private readonly IKSqlDbRestApiClient restApiClient;
15+
private readonly IKSqlDBContext context;
16+
17+
public Joins(IKSqlDbRestApiClient restApiClient, IKSqlDBContext context)
18+
{
19+
this.restApiClient = restApiClient ?? throw new ArgumentNullException(nameof(restApiClient));
20+
this.context = context ?? throw new ArgumentNullException(nameof(context));
21+
}
22+
23+
public async Task SubscribeAsync()
924
{
10-
var ksqlDbUrl = @"http:\\localhost:8088";
25+
var entityCreationMetadata = new EntityCreationMetadata
26+
{
27+
KafkaTopic = nameof(Order) + "-Join",
28+
Partitions = 1
29+
};
1130

12-
var context = new KSqlDBContext(ksqlDbUrl);
31+
var response = await restApiClient.CreateStreamAsync<Order>(entityCreationMetadata, ifNotExists: true);
32+
var r = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
33+
response = await restApiClient.CreateTableAsync<Payment>(entityCreationMetadata with { KafkaTopic = nameof(Payment) + "-Join" }, ifNotExists: true);
34+
response = await restApiClient.CreateTableAsync<Shipment>(entityCreationMetadata with { KafkaTopic = nameof(Shipment) + "-Join" }, ifNotExists: true);
35+
36+
var value = new Foo { Prop = 42 };
1337

1438
var query = (from o in context.CreateQueryStream<Order>()
15-
join p1 in Source.Of<Payment>() on o.PaymentId equals p1.Id
16-
join s1 in Source.Of<Shipment>() on o.ShipmentId equals s1.Id into gj
17-
from sa in gj.DefaultIfEmpty()
18-
select new
19-
{
20-
orderId = o.OrderId,
21-
shipmentId = sa.Id,
22-
paymentId = p1.Id,
23-
})
39+
join p1 in Source.Of<Payment>() on o.PaymentId equals p1.Id
40+
join s1 in Source.Of<Shipment>() on o.ShipmentId equals s1.Id into gj
41+
from sa in gj.DefaultIfEmpty()
42+
select new
43+
{
44+
value,
45+
orderId = o.OrderId,
46+
shipmentId = sa.Id,
47+
paymentId = p1.Id,
48+
})
2449
.Take(5);
50+
51+
string ksql = query.ToQueryString();
52+
53+
using var subscription = query
54+
.Subscribe(c => {
55+
Console.WriteLine($"{nameof(Order.OrderId)}: {c.orderId}");
56+
57+
Console.WriteLine($"{nameof(Order.PaymentId)}: {c.paymentId}");
58+
59+
if (c.shipmentId.HasValue)
60+
Console.WriteLine($"{nameof(Order.ShipmentId)}: {c.shipmentId}");
61+
62+
}, error => {
63+
Console.WriteLine(error.Message);
64+
});
65+
66+
var order = new Order { OrderId = 1, PaymentId = 1, ShipmentId = 1 };
67+
var payment = new Payment { Id = 1 };
68+
var shipment = new Shipment { Id = 1 };
69+
70+
response = await restApiClient.InsertIntoAsync(order);
71+
r = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
72+
73+
response = await restApiClient.InsertIntoAsync(payment);
74+
75+
await Task.Delay(TimeSpan.FromMilliseconds(250));
76+
response = await restApiClient.InsertIntoAsync(shipment);
2577
}
2678
}
2779

@@ -34,11 +86,18 @@ class Order
3486

3587
class Payment
3688
{
89+
[Key]
3790
public int Id { get; set; }
3891
}
3992

4093
record Shipment
4194
{
42-
public int Id { get; set; }
95+
[Key]
96+
public int? Id { get; set; }
97+
}
98+
99+
struct Foo
100+
{
101+
public int Prop { get; set; }
43102
}
44103
}

0 commit comments

Comments
 (0)