Skip to content

Commit 821edbe

Browse files
authored
Merge pull request #91 from tomasfabian/89-specify-types-as-struct-using-the-modelbuilder
89 specify types as struct using the modelbuilder
2 parents be0ee7d + e333f8c commit 821edbe

File tree

13 files changed

+206
-20
lines changed

13 files changed

+206
-20
lines changed

Samples/ksqlDB.RestApi.Client.Sample/ModelBuilders/PaymentModelBuilder.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,19 @@ public async Task InitModelAndCreateStreamAsync(CancellationToken cancellationTo
3434
.Property(c => c.Header)
3535
.WithHeader(header);
3636

37+
modelBuilder.Entity<Record>()
38+
.Property(b => b.Headers)
39+
.AsStruct()
40+
.WithHeaders();
41+
42+
modelBuilder.Entity<KeyValuePair>()
43+
.Property(c => c.Key)
44+
.HasColumnName(nameof(KeyValuePair.Key).ToUpper());
45+
46+
modelBuilder.Entity<KeyValuePair>()
47+
.Property(c => c.Value)
48+
.HasColumnName(nameof(KeyValuePair.Value).ToUpper());
49+
3750
var restApiProvider = ConfigureRestApiClientWithServicesCollection(new ServiceCollection(), modelBuilder);
3851

3952
var entityCreationMetadata = new EntityCreationMetadata(kafkaTopic: nameof(Payment), partitions: 1);
@@ -94,3 +107,14 @@ record PocoWithHeader
94107
{
95108
public byte[] Header { get; init; } = null!;
96109
}
110+
111+
record KeyValuePair
112+
{
113+
public string Key { get; set; } = null!;
114+
public byte[] Value { get; set; } = null!;
115+
}
116+
117+
record Record
118+
{
119+
public KeyValuePair[] Headers { get; init; } = null!;
120+
}

Samples/ksqlDB.RestApi.Client.Sample/ksqlDB.RestApi.Client.Samples.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<PackageReference Include="ksqlDb.RestApi.Client" Version="6.2.0" />
12+
<PackageReference Include="ksqlDb.RestApi.Client" Version="6.3.0" />
1313
<!-- <ProjectReference Include="..\..\ksqlDb.RestApi.Client\ksqlDb.RestApi.Client.csproj" /> -->
1414
<!-- <PackageReference Include="ksqlDb.RestApi.Client.ProtoBuf" Version="4.0.0" /> -->
1515
<ProjectReference Include="..\..\ksqlDb.RestApi.Client.ProtoBuf\ksqlDb.RestApi.Client.ProtoBuf.csproj" />

Tests/ksqlDB.RestApi.Client.Tests/FluentAPI/Builders/ModelBuilderTests.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,37 @@ public void Headers()
234234

235235
metadata.HasHeaders.Should().BeTrue();
236236
}
237+
238+
private record KeyValuePair
239+
{
240+
public string Key { get; set; } = null!;
241+
public byte[] Value { get; set; } = null!;
242+
}
243+
244+
private record Record
245+
{
246+
public KeyValuePair[] Headers { get; init; } = null!;
247+
}
248+
249+
[Test]
250+
public void AsStruct()
251+
{
252+
//Arrange
253+
254+
//Act
255+
var fieldTypeBuilder = builder.Entity<Record>()
256+
.Property(b => b.Headers)
257+
.AsStruct();
258+
259+
//Assert
260+
fieldTypeBuilder.Should().NotBeNull();
261+
262+
var entityMetadata = ((IMetadataProvider)builder).GetEntities().FirstOrDefault(c => c.Type == typeof(Record));
263+
entityMetadata.Should().NotBeNull();
264+
265+
var metadata = entityMetadata!.FieldsMetadata.First(c => c.IsStruct && c.Path == "Headers");
266+
metadata.IsStruct.Should().BeTrue();
267+
}
237268
}
238269

239270
internal record Payment

Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Generators/StatementGeneratorTests.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using FluentAssertions;
2+
using ksqlDb.RestApi.Client.FluentAPI.Builders;
23
using ksqlDB.RestApi.Client.KSql.RestApi.Enums;
34
using ksqlDB.RestApi.Client.KSql.RestApi.Generators;
45
using ksqlDB.RestApi.Client.KSql.RestApi.Serialization;
@@ -162,6 +163,36 @@ public void CreateOrReplaceTableWithEnumProperty()
162163
//Assert
163164
statement.Should().Contain($"{nameof(PortType)} VARCHAR");
164165
}
166+
private record KeyValuePair
167+
{
168+
public string Key { get; set; } = null!;
169+
public byte[] Value { get; set; } = null!;
170+
}
171+
172+
private record Record
173+
{
174+
public KeyValuePair[] Headers { get; init; } = null!;
175+
}
176+
177+
[Test]
178+
public void CreateTable_UseModelBuilder_WithFieldAsStruct()
179+
{
180+
//Arrange
181+
var modelBuilder = new ModelBuilder();
182+
modelBuilder.Entity<Record>()
183+
.Property(b => b.Headers)
184+
.AsStruct();
185+
186+
var creationMetadata = new EntityCreationMetadata("my_topic", partitions: 3);
187+
188+
//Act
189+
var statement = new StatementGenerator(modelBuilder).CreateTable<Record>(creationMetadata, ifNotExists: true);
190+
191+
//Assert
192+
statement.Should().Be(@"CREATE TABLE IF NOT EXISTS Records (
193+
Headers ARRAY<STRUCT<Key VARCHAR, Value BYTES>>
194+
) WITH ( KAFKA_TOPIC='my_topic', VALUE_FORMAT='Json', PARTITIONS='3' );".ReplaceLineEndings());
195+
}
165196
}
166197

167198
internal class Port

Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Statements/KSqlTypeTranslatorTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace ksqlDb.RestApi.Client.Tests.KSql.RestApi.Statements
1212
public class KSqlTypeTranslatorTests
1313
{
1414
private ModelBuilder modelBuilder = null!;
15-
private KSqlTypeTranslator kSqlTypeTranslator = null!;
15+
private KSqlTypeTranslator<Poco> kSqlTypeTranslator = null!;
1616

1717
[SetUp]
1818
public void Init()
@@ -396,7 +396,7 @@ public void Translate_UseModelBuilderConfiguration()
396396
.Decimal(10, 2);
397397

398398
//Act
399-
string ksqlType = kSqlTypeTranslator.Translate(type);
399+
string ksqlType = kSqlTypeTranslator.Translate(type, type.GetMember(nameof(Poco.Amount))[0]);
400400

401401
//Assert
402402
ksqlType.Should().Be($"{KSqlTypes.Struct}<{nameof(Poco.Amount)} {KSqlTypes.Decimal}(10,2)>");

docs/modelbuilder.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,34 @@ CREATE TABLE IF NOT EXISTS Accounts (
9696
) WITH ( KAFKA_TOPIC='Account', VALUE_FORMAT='Json', PARTITIONS='1', REPLICAS='3' );
9797
```
9898

99+
## Dependency injection
100+
This setup ensures that the `ksqlDB` client, context, and model configuration are properly injected throughout your application.
101+
102+
```C#
103+
using ksqlDb.RestApi.Client.DependencyInjection;
104+
using ksqlDb.RestApi.Client.FluentAPI.Builders;
105+
using ksqlDB.RestApi.Client.KSql.Query.Options;
106+
using ksqlDB.RestApi.Client.KSql.RestApi.Enums;
107+
108+
var builder = WebApplication.CreateBuilder(args);
109+
110+
ModelBuilder modelBuilder = new();
111+
112+
builder.Services.ConfigureKSqlDb(
113+
builder.Configuration.GetConnectionString("KafkaConnection")!,
114+
parameters =>
115+
{
116+
parameters
117+
.SetAutoOffsetReset(AutoOffsetReset.Latest)
118+
.SetIdentifierEscaping(IdentifierEscaping.Always)
119+
.SetJsonSerializerOptions(options =>
120+
{
121+
options.PropertyNameCaseInsensitive = true;
122+
});
123+
}
124+
).AddSingleton(modelBuilder);
125+
```
126+
99127
## IFromItemTypeConfiguration
100128
**v5.0.0**
101129

@@ -248,3 +276,44 @@ The KSQL snippet illustrates an example INSERT statement with the overridden col
248276
INSERT INTO Payments (Id, Amount, Desc)
249277
VALUES ('1', 33, 'Purchase');
250278
```
279+
280+
### AsStruct
281+
**v6.3.0**
282+
283+
The `AsStruct` function designates fields in entity types as ksqlDB struct types.
284+
285+
The following code showcases how to use the `AsStruct` method in the fluent API to infer the underlying `ksqlDB` type as a struct during code generation:
286+
287+
```C#
288+
private record KeyValuePair
289+
{
290+
public string Key { get; set; } = null!;
291+
public byte[] Value { get; set; } = null!;
292+
}
293+
294+
private record Record
295+
{
296+
public KeyValuePair[] Headers { get; init; } = null!;
297+
}
298+
```
299+
300+
```C#
301+
ModelBuilder builder = new();
302+
303+
builder.Entity<Record>()
304+
.Property(b => b.Headers)
305+
.AsStruct();
306+
307+
var creationMetadata = new EntityCreationMetadata("my_topic", partitions: 3);
308+
309+
var ksql = new StatementGenerator(builder).CreateTable<Record>(creationMetadata, ifNotExists: true);
310+
```
311+
312+
The KSQL snippet illustrates an example CREATE TABLE statement with the injected `STRUCT` type,
313+
showing how it corresponds to the fluent API configuration:
314+
315+
```SQL
316+
CREATE TABLE IF NOT EXISTS Records (
317+
Headers ARRAY<STRUCT<Key VARCHAR, Value BYTES>>
318+
) WITH ( KAFKA_TOPIC='my_topic', VALUE_FORMAT='Json', PARTITIONS='3' );
319+
```

ksqlDb.RestApi.Client/ChangeLog.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
# ksqlDB.RestApi.Client
22

3+
# 6.3.0
4+
- added `AsStruct` function to the Fluent API for marking fields as ksqldb `STRUCT` types #89 (proposed by @mrt181)
5+
36
# 6.2.1
47

58
## BugFix
6-
- Source table can't be queried #87 - fixed `KSqlDbRestApiClient.CreateSourceTableAsync` and `KSqlDbRestApiClient.CreateSourceStreamAsync`
9+
- source table can't be queried #87 - fixed `KSqlDbRestApiClient.CreateSourceTableAsync` and `KSqlDbRestApiClient.CreateSourceStreamAsync`
710

811
# 6.2.0
912

ksqlDb.RestApi.Client/FluentAPI/Builders/FieldTypeBuilder.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ public interface IFieldTypeBuilder<TProperty>
2626
/// <param name="columnName">The name of the column in the record schema.</param>
2727
/// <returns>The same <see cref="IFieldTypeBuilder{TProperty}"/> instance so that multiple calls can be chained.</returns>
2828
IFieldTypeBuilder<TProperty> HasColumnName(string columnName);
29+
30+
/// <summary>
31+
/// Marks the field as a ksqldb STRUCT type.
32+
/// </summary>
33+
/// <returns>The field type builder for chaining additional configuration.</returns>
34+
IFieldTypeBuilder<TProperty> AsStruct();
2935
}
3036

3137
internal class FieldTypeBuilder<TProperty>(FieldMetadata fieldMetadata)
@@ -37,6 +43,12 @@ public IFieldTypeBuilder<TProperty> HasColumnName(string columnName)
3743
return this;
3844
}
3945

46+
public IFieldTypeBuilder<TProperty> AsStruct()
47+
{
48+
fieldMetadata.IsStruct = true;
49+
return this;
50+
}
51+
4052
public IFieldTypeBuilder<TProperty> Ignore()
4153
{
4254
fieldMetadata.Ignore = true;

ksqlDb.RestApi.Client/KSql/RestApi/Generators/TypeGenerator.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ namespace ksqlDB.RestApi.Client.KSql.RestApi.Generators;
1111

1212
internal sealed class TypeGenerator(IMetadataProvider metadataProvider) : EntityInfo(metadataProvider)
1313
{
14-
private readonly KSqlTypeTranslator typeTranslator = new(metadataProvider);
15-
1614
internal string Print<T>(TypeProperties properties)
1715
{
1816
StringBuilder stringBuilder = new();
@@ -30,11 +28,13 @@ private void PrintProperties<T>(StringBuilder stringBuilder, IdentifierEscaping
3028
{
3129
var ksqlProperties = new List<string>();
3230

31+
KSqlTypeTranslator<T> typeTranslator = new(metadataProvider);
32+
3333
foreach (var memberInfo in Members<T>())
3434
{
3535
var type = GetMemberType(memberInfo);
3636

37-
var ksqlType = typeTranslator.Translate(type, escaping);
37+
var ksqlType = typeTranslator.Translate(type, memberInfo, escaping);
3838

3939
var memberName = memberInfo.GetMemberName(metadataProvider);
4040
var columnDefinition = $"{EscapeName(memberName, escaping)} {ksqlType}{typeTranslator.ExploreAttributes(typeof(T), memberInfo, type)}";

ksqlDb.RestApi.Client/KSql/RestApi/Statements/CreateEntity.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ internal sealed class CreateEntity(IMetadataProvider metadataProvider) : EntityI
1313
{
1414
private readonly StringBuilder stringBuilder = new();
1515

16-
private readonly KSqlTypeTranslator typeTranslator = new(metadataProvider);
17-
1816
internal string Print<T>(StatementContext statementContext, EntityCreationMetadata metadata, bool? ifNotExists)
1917
{
2018
stringBuilder.Clear();
@@ -42,12 +40,13 @@ internal string Print<T>(StatementContext statementContext, EntityCreationMetada
4240
private void PrintProperties<T>(StatementContext statementContext, EntityCreationMetadata metadata)
4341
{
4442
var ksqlProperties = new List<string>();
43+
KSqlTypeTranslator<T> typeTranslator = new(metadataProvider);
4544

4645
foreach (var memberInfo in Members<T>(metadata.IncludeReadOnlyProperties))
4746
{
4847
var type = GetMemberType(memberInfo);
4948

50-
var ksqlType = typeTranslator.Translate(type, metadata.IdentifierEscaping);
49+
var ksqlType = typeTranslator.Translate(type, memberInfo, metadata.IdentifierEscaping);
5150

5251
var columnName = IdentifierUtil.Format(memberInfo, metadata.IdentifierEscaping, metadataProvider);
5352
string columnDefinition = $"\t{columnName} {ksqlType}{typeTranslator.ExploreAttributes(typeof(T), memberInfo, type)}";

ksqlDb.RestApi.Client/KSql/RestApi/Statements/KSqlTypeTranslator.cs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@
99

1010
namespace ksqlDB.RestApi.Client.KSql.RestApi.Statements
1111
{
12-
internal sealed class KSqlTypeTranslator(IMetadataProvider metadataProvider) : EntityInfo(metadataProvider)
12+
internal sealed class KSqlTypeTranslator<TEntity>(IMetadataProvider metadataProvider) : EntityInfo(metadataProvider)
1313
{
1414
private readonly IMetadataProvider metadataProvider = metadataProvider;
1515
private readonly DecimalTypeTranslator decimalTypeTranslator = new(metadataProvider);
1616

17-
internal string Translate(Type type, IdentifierEscaping escaping = IdentifierEscaping.Never)
17+
internal string Translate(Type type, MemberInfo? memberInfo = null, IdentifierEscaping escaping = IdentifierEscaping.Never)
1818
{
1919
var ksqlType = string.Empty;
2020

@@ -25,16 +25,16 @@ internal string Translate(Type type, IdentifierEscaping escaping = IdentifierEsc
2525
var elementType = type.GetElementType();
2626
if (elementType == null)
2727
throw new InvalidOperationException(nameof(elementType));
28-
var elementTypeName = Translate(elementType, escaping);
28+
var elementTypeName = Translate(elementType, memberInfo, escaping);
2929

3030
ksqlType = $"{KSqlTypes.Array}<{elementTypeName}>";
3131
}
3232
else if (type.IsDictionary())
3333
{
3434
Type[] typeParameters = type.GetGenericArguments();
3535

36-
var keyType = Translate(typeParameters[0], escaping);
37-
var valueType = Translate(typeParameters[1], escaping);
36+
var keyType = Translate(typeParameters[0], memberInfo, escaping);
37+
var valueType = Translate(typeParameters[1], memberInfo, escaping);
3838

3939
ksqlType = $"{KSqlTypes.Map}<{keyType}, {valueType}>";
4040
}
@@ -58,7 +58,7 @@ internal string Translate(Type type, IdentifierEscaping escaping = IdentifierEsc
5858
ksqlType = KSqlTypes.Time;
5959
else if (type == typeof(DateTimeOffset))
6060
ksqlType = KSqlTypes.Timestamp;
61-
else if (!type.IsGenericType && type.TryGetAttribute<StructAttribute>() != null)
61+
else if (!type.IsGenericType && IsStructType(type, memberInfo))
6262
{
6363
var ksqlProperties = GetProperties(type, escaping);
6464

@@ -88,7 +88,7 @@ internal string Translate(Type type, IdentifierEscaping escaping = IdentifierEsc
8888

8989
if (elementType != null)
9090
{
91-
string ksqlElementType = Translate(elementType, escaping);
91+
string ksqlElementType = Translate(elementType, memberInfo, escaping);
9292

9393
ksqlType = $"{KSqlTypes.Array}<{ksqlElementType}>";
9494
}
@@ -97,6 +97,22 @@ internal string Translate(Type type, IdentifierEscaping escaping = IdentifierEsc
9797
return ksqlType;
9898
}
9999

100+
private bool IsStructType(Type type, MemberInfo? memberInfo)
101+
{
102+
if (type.TryGetAttribute<StructAttribute>() != null)
103+
return true;
104+
105+
if (memberInfo == null)
106+
return false;
107+
108+
var entityMetadata = metadataProvider.GetEntities().FirstOrDefault(c => c.Type == typeof(TEntity));
109+
var fieldMetadata = entityMetadata?.GetFieldMetadataBy(memberInfo);
110+
return fieldMetadata is
111+
{
112+
IsStruct: true
113+
};
114+
}
115+
100116
internal IEnumerable<string> GetProperties(Type type, IdentifierEscaping escaping)
101117
{
102118
var ksqlProperties = new List<string>();
@@ -105,7 +121,7 @@ internal IEnumerable<string> GetProperties(Type type, IdentifierEscaping escapin
105121
{
106122
var memberType = GetMemberType(memberInfo);
107123

108-
var ksqlType = Translate(memberType, escaping);
124+
var ksqlType = Translate(memberType, memberInfo, escaping);
109125

110126
string columnDefinition = $"{memberInfo.Format(escaping, metadataProvider as ModelBuilder)} {ksqlType}{ExploreAttributes(type, memberInfo, memberType)}";
111127

0 commit comments

Comments
 (0)