From 9fc01c1930564ce6e507464e2faacd4f44c05757 Mon Sep 17 00:00:00 2001 From: Fred Vollmer Date: Wed, 30 Oct 2024 14:33:13 -0400 Subject: [PATCH 1/4] Allow for strongly-typed query properties --- .../KSql/Linq/ComplexTypesTests.cs | 9 ++-- .../KSql/Linq/QbservableExtensionsTests.cs | 18 +++---- .../KSqlDbServiceCollectionExtensionsTests.cs | 20 ++++---- ...elCaseStringEnumConverterAttributeTests.cs | 29 +++++++++++ .../Context/KSqlDBContextOptionsTests.cs | 24 ++++------ .../KSql/Query/Context/KSqlDBContextTests.cs | 38 ++++++--------- .../KSqlDbContextOptionsBuilderTests.cs | 23 ++++----- .../KSql/Query/KStreamSetDependenciesTests.cs | 12 ++--- ...s => AutoOffsetResetSerializationTests.cs} | 23 ++++----- ... ProcessingGuaranteeSerializationTests.cs} | 32 ++++++------- .../AutoOffsetResetExtensionsTests.cs | 32 ------------- .../QueryParametersExtensionsTests.cs | 24 +++++----- .../QueryStreamEndpointParametersTests.cs | 8 ++-- .../SnakeCaseJsonStringEnumConverter.cs | 14 ++++++ .../Query/Context/KSqlDBContextOptions.cs | 14 ++---- .../KSql/Query/Options/AutoOffsetReset.cs | 5 ++ .../Options/AutoOffsetResetExtensions.cs | 34 ------------- .../KSql/Query/Options/ProcessingGuarantee.cs | 7 ++- .../Options/ProcessingGuaranteeExtensions.cs | 48 ------------------- .../KSql/RestApi/Parameters/IQueryOptions.cs | 8 ++-- .../RestApi/Parameters/IQueryParameters.cs | 9 ++-- .../Parameters/QueryEndpointParameters.cs | 18 +++---- .../RestApi/Parameters/QueryParameters.cs | 11 ++--- .../QueryStreamEndpointParameters.cs | 18 +++---- .../Parameters/QueryStreamParameters.cs | 10 ++-- .../ksqlDb.RestApi.Client.csproj | 4 +- 26 files changed, 198 insertions(+), 294 deletions(-) create mode 100644 Tests/ksqlDB.RestApi.Client.Tests/Infrastructure/Attributes/JsonCamelCaseStringEnumConverterAttributeTests.cs rename Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/{AutoOffsetResetExtensionsTests.cs => AutoOffsetResetSerializationTests.cs} (63%) rename Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/{ProcessingGuaranteeExtensionsTests.cs => ProcessingGuaranteeSerializationTests.cs} (57%) delete mode 100644 Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/AutoOffsetResetExtensionsTests.cs create mode 100644 ksqlDb.RestApi.Client/Infrastructure/Attributes/SnakeCaseJsonStringEnumConverter.cs delete mode 100644 ksqlDb.RestApi.Client/KSql/Query/Options/AutoOffsetResetExtensions.cs delete mode 100644 ksqlDb.RestApi.Client/KSql/Query/Options/ProcessingGuaranteeExtensions.cs diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/ComplexTypesTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/ComplexTypesTests.cs index bfd4cd0e..616c7318 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/ComplexTypesTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/ComplexTypesTests.cs @@ -5,6 +5,7 @@ using ksqlDB.RestApi.Client.KSql.Linq; using ksqlDB.RestApi.Client.KSql.Query.Context; using ksqlDB.RestApi.Client.KSql.Query.Functions; +using ksqlDB.RestApi.Client.KSql.Query.Options; using ksqlDB.RestApi.Client.KSql.RestApi; using ksqlDB.RestApi.Client.KSql.RestApi.Parameters; using ksqlDB.RestApi.Client.KSql.RestApi.Statements; @@ -110,9 +111,9 @@ public void TransformMap_WithNestedMap() QueryStreamParameters queryStreamParameters = new() { - Sql = ksql, - [QueryStreamParameters.AutoOffsetResetPropertyName] = "earliest", + Sql = ksql }; + queryStreamParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Earliest); //Act var source = Context.CreatePushQuery(queryStreamParameters) @@ -147,9 +148,9 @@ public void TransformMap_WithNestedStruct() QueryStreamParameters queryStreamParameters = new() { - Sql = ksql, - [QueryStreamParameters.AutoOffsetResetPropertyName] = "earliest", + Sql = ksql }; + queryStreamParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Earliest); //Act var source = Context.CreatePushQuery(queryStreamParameters) diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/QbservableExtensionsTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/QbservableExtensionsTests.cs index 697977fd..8fdaa58a 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/QbservableExtensionsTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Linq/QbservableExtensionsTests.cs @@ -445,8 +445,8 @@ public async Task QueryRawKSql() QueryStreamParameters queryParameters = new() { Sql = ksql, - [QueryStreamParameters.AutoOffsetResetPropertyName] = "earliest", }; + queryParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Earliest); var source = Context.CreatePushQuery(queryParameters); @@ -467,9 +467,9 @@ public async Task QueryStreamRawKSql() QueryStreamParameters queryStreamParameters = new() { - Sql = ksql, - [QueryStreamParameters.AutoOffsetResetPropertyName] = "earliest", + Sql = ksql }; + queryStreamParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Earliest); var source = Context.CreatePushQuery(queryStreamParameters); @@ -656,9 +656,9 @@ public async Task SelectAsInt() QueryStreamParameters queryStreamParameters = new() { - Sql = ksql, - [QueryParameters.AutoOffsetResetPropertyName] = "earliest", + Sql = ksql }; + queryStreamParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Earliest); var source = Context.CreatePushQuery(queryStreamParameters); @@ -680,9 +680,9 @@ public async Task SelectAsArray() QueryStreamParameters queryStreamParameters = new() { - Sql = ksql, - [QueryParameters.AutoOffsetResetPropertyName] = "earliest", + Sql = ksql }; + queryStreamParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Earliest); var source = Context.CreatePushQuery(queryStreamParameters); @@ -709,9 +709,9 @@ public async Task SelectAsStruct() QueryStreamParameters queryStreamParameters = new() { - Sql = ksql, - [QueryParameters.AutoOffsetResetPropertyName] = "earliest", + Sql = ksql }; + queryStreamParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Earliest); var source = Context.CreatePushQuery(queryStreamParameters); diff --git a/Tests/ksqlDB.RestApi.Client.Tests/DependencyInjection/KSqlDbServiceCollectionExtensionsTests.cs b/Tests/ksqlDB.RestApi.Client.Tests/DependencyInjection/KSqlDbServiceCollectionExtensionsTests.cs index 2a32dc27..fd07dba5 100644 --- a/Tests/ksqlDB.RestApi.Client.Tests/DependencyInjection/KSqlDbServiceCollectionExtensionsTests.cs +++ b/Tests/ksqlDB.RestApi.Client.Tests/DependencyInjection/KSqlDbServiceCollectionExtensionsTests.cs @@ -35,7 +35,7 @@ public void ConfigureKSqlDb_IKSqlDBContext() //Assert var descriptor = ClassUnderTest.TryGetRegistration(); - + descriptor.Should().NotBeNull(); descriptor!.Lifetime.Should().Be(ServiceLifetime.Scoped); } @@ -53,7 +53,7 @@ public void ConfigureKSqlDb_SetupParametersAction() //Assert var descriptor = ClassUnderTest.TryGetRegistration(); - + descriptor.Should().NotBeNull(); descriptor!.Lifetime.Should().Be(ServiceLifetime.Scoped); } @@ -72,7 +72,7 @@ public void ConfigureKSqlDb_BuildServiceProviderAndResolve() //Assert context.Should().NotBeNull(); - context?.ContextOptions.QueryStreamParameters[KSqlDbConfigs.ProcessingGuarantee].ToProcessingGuarantee().Should().Be(ProcessingGuarantee.AtLeastOnce); + context?.ContextOptions.QueryStreamParameters.Get(KSqlDbConfigs.ProcessingGuarantee).Should().Be(ProcessingGuarantee.AtLeastOnce); } [Test] @@ -85,7 +85,7 @@ public void ConfigureKSqlDb_IKSqlDbRestApiClient() //Assert var descriptor = ClassUnderTest.TryGetRegistration(); - + descriptor.Should().NotBeNull(); descriptor!.Lifetime.Should().Be(ServiceLifetime.Scoped); } @@ -113,7 +113,7 @@ public void ConfigureKSqlDb_IHttpClientFactory() //Assert var descriptor = ClassUnderTest.TryGetRegistration(); - + descriptor.Should().NotBeNull(); descriptor!.Lifetime.Should().Be(ServiceLifetime.Transient); } @@ -128,7 +128,7 @@ public void ConfigureKSqlDb_KSqlDBContextOptions() //Assert var descriptor = ClassUnderTest.TryGetRegistration(); - + descriptor.Should().NotBeNull(); descriptor!.Lifetime.Should().Be(ServiceLifetime.Singleton); } @@ -174,7 +174,7 @@ public void AddDbContext_RegisterAsInterface() //Assert context.Should().NotBeNull(); } - + [Test] public void AddDbContext_KSqlDBContext_DefaultLifetimeIsScoped() { @@ -188,7 +188,7 @@ public void AddDbContext_KSqlDBContext_DefaultLifetimeIsScoped() descriptor.Should().NotBeNull(); descriptor!.Lifetime.Should().Be(ServiceLifetime.Scoped); } - + [Test] public void AddDbContext_KSqlDBContext_ContextLifetimeChangedToTransientScope() { @@ -239,9 +239,9 @@ public void AddDbContext_RestApiLifetimeChangedToTransientScope() descriptor.Should().NotBeNull(); descriptor!.Lifetime.Should().Be(ServiceLifetime.Transient); } - + #endregion - + #region ContextFactory [Test] diff --git a/Tests/ksqlDB.RestApi.Client.Tests/Infrastructure/Attributes/JsonCamelCaseStringEnumConverterAttributeTests.cs b/Tests/ksqlDB.RestApi.Client.Tests/Infrastructure/Attributes/JsonCamelCaseStringEnumConverterAttributeTests.cs new file mode 100644 index 00000000..a129d317 --- /dev/null +++ b/Tests/ksqlDB.RestApi.Client.Tests/Infrastructure/Attributes/JsonCamelCaseStringEnumConverterAttributeTests.cs @@ -0,0 +1,29 @@ +using System.Text.Json.Serialization; +using FluentAssertions; +using ksqlDb.RestApi.Client.Infrastructure.Attributes; +using NUnit.Framework; + +namespace ksqlDb.RestApi.Client.Tests.Infrastructure.Attributes +{ + enum TestEnum + { + A, + B + } + + public class JsonSnakeCaseStringEnumConverterAttributeTests + { + [Test] + public void CreatesConverter() + { + // Arrange + var attr = new JsonSnakeCaseStringEnumConverterAttribute(); + + // Act + var converter = attr.CreateConverter(typeof(TestEnum)); + + // Assert + converter.Should().BeOfType>(); + } + } +} diff --git a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/KSqlDBContextOptionsTests.cs b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/KSqlDBContextOptionsTests.cs index a518d218..8bbf9981 100644 --- a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/KSqlDBContextOptionsTests.cs +++ b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/KSqlDBContextOptionsTests.cs @@ -32,7 +32,7 @@ public void Url_ShouldNotBeEmpty() //Assert url.Should().Be(TestParameters.KsqlDbUrl); } - + [Test] public void NotSetBasicAuthCredentials() { @@ -45,7 +45,7 @@ public void NotSetBasicAuthCredentials() ClassUnderTest.BasicAuthUserName.Should().BeEmpty(); ClassUnderTest.BasicAuthPassword.Should().BeEmpty(); } - + [Test] public void SetBasicAuthCredentials() { @@ -73,7 +73,7 @@ public void SetProcessingGuarantee_WasNotSet() //Assert Assert.ThrowsException(() => - ClassUnderTest.QueryStreamParameters[parameterName].Should().BeEmpty()); + ClassUnderTest.QueryStreamParameters.Get(parameterName)); } [Test] @@ -87,9 +87,7 @@ public void SetProcessingGuarantee_SetToAtLeastOnce() ClassUnderTest.SetProcessingGuarantee(processingGuarantee); //Assert - string expectedValue = "at_least_once"; - - ClassUnderTest.QueryStreamParameters[parameterName].Should().Be(expectedValue); + ClassUnderTest.QueryStreamParameters.Get(parameterName).Should().Be(processingGuarantee); } [Test] @@ -103,9 +101,7 @@ public void SetProcessingGuarantee_SetToExactlyOnce() ClassUnderTest.SetProcessingGuarantee(processingGuarantee); //Assert - string expectedValue = "exactly_once"; - - ClassUnderTest.QueryStreamParameters[parameterName].Should().Be(expectedValue); + ClassUnderTest.QueryStreamParameters.Get(parameterName).Should().Be(processingGuarantee); } [Test] @@ -119,9 +115,7 @@ public void SetProcessingGuarantee_SetToExactlyOnceV2() ClassUnderTest.SetProcessingGuarantee(processingGuarantee); //Assert - string expectedValue = "exactly_once_v2"; - - ClassUnderTest.QueryStreamParameters[parameterName].Should().Be(expectedValue); + ClassUnderTest.QueryStreamParameters.Get(parameterName).Should().Be(processingGuarantee); } [Test] @@ -134,12 +128,10 @@ public void SetAutoOffsetReset() ClassUnderTest.SetAutoOffsetReset(autoOffsetReset); //Assert - string expectedValue = autoOffsetReset.ToString().ToLower(); - - ClassUnderTest.QueryStreamParameters[QueryStreamParameters.AutoOffsetResetPropertyName].Should().Be(expectedValue); + ClassUnderTest.QueryStreamParameters.Get(QueryStreamParameters.AutoOffsetResetPropertyName).Should().Be(autoOffsetReset); } - + [Test] public void JsonSerializerOptions() { diff --git a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/KSqlDBContextTests.cs b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/KSqlDBContextTests.cs index e495277c..2fec9618 100644 --- a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/KSqlDBContextTests.cs +++ b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/KSqlDBContextTests.cs @@ -41,13 +41,8 @@ public void CreatePushQuery_Subscribe_KSqlDbProvidersRunWasCalled() public void CreateStreamSet_Subscribe_QueryOptionsWereTakenFromContext() { //Arrange - var contextOptions = new KSqlDBContextOptions(TestParameters.KsqlDbUrl) - { - QueryStreamParameters = - { - [QueryParameters.AutoOffsetResetPropertyName] = AutoOffsetReset.Latest.ToString() - } - }; + var contextOptions = new KSqlDBContextOptions(TestParameters.KsqlDbUrl); + contextOptions.QueryStreamParameters.Set(QueryParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Latest); var context = new TestableDbProvider(contextOptions); @@ -56,7 +51,7 @@ public void CreateStreamSet_Subscribe_QueryOptionsWereTakenFromContext() //Assert context.KSqlDbProviderMock.Verify( - c => c.Run(It.Is(c => c[QueryParameters.AutoOffsetResetPropertyName] == "Latest"), + c => c.Run(It.Is(c => c.Get(QueryParameters.AutoOffsetResetPropertyName) == AutoOffsetReset.Latest), It.IsAny()), Times.Once); } @@ -97,7 +92,7 @@ public void SetAutoOffsetReset_Subscribe_ProcessingGuarantee() //Assert context.KSqlDbProviderMock.Verify( - c => c.Run(It.Is(c => c["auto.offset.reset"] == "latest"), + c => c.Run(It.Is(c => c.Get("auto.offset.reset") == AutoOffsetReset.Latest), It.IsAny()), Times.Once); } @@ -115,7 +110,7 @@ public void CreateStreamSet_Subscribe_ProcessingGuarantee() //Assert context.KSqlDbProviderMock.Verify( - c => c.Run(It.Is(c => c[KSqlDbConfigs.ProcessingGuarantee] == "exactly_once"), + c => c.Run(It.Is(c => c.Get(KSqlDbConfigs.ProcessingGuarantee) == ProcessingGuarantee.ExactlyOnce), It.IsAny()), Times.Once); } @@ -136,13 +131,8 @@ public void CreateStreamSet_CalledMultipleTimes_KSqlQueryGeneratorBuildKSqlWasNo public void CreateStreamSet_Subscribe_KSqlQueryGenerator() { //Arrange - var contextOptions = new KSqlDBContextOptions(TestParameters.KsqlDbUrl) - { - QueryStreamParameters = - { - ["auto.offset.reset"] = "latest" - } - }; + var contextOptions = new KSqlDBContextOptions(TestParameters.KsqlDbUrl); + contextOptions.QueryStreamParameters.Set("auto.offset.reset", AutoOffsetReset.Latest); var context = new TestableDbProvider(contextOptions); @@ -151,7 +141,7 @@ public void CreateStreamSet_Subscribe_KSqlQueryGenerator() //Assert context.KSqlDbProviderMock.Verify( - c => c.Run(It.Is(parameters => parameters["auto.offset.reset"] == "latest"), + c => c.Run(It.Is(parameters => parameters.Get("auto.offset.reset") == AutoOffsetReset.Latest), It.IsAny()), Times.Once); } @@ -190,9 +180,9 @@ public void CreatePushQuery_RawKSQL_ReturnAsyncEnumerable() QueryStreamParameters queryStreamParameters = new QueryStreamParameters { - Sql = ksql, - [QueryStreamParameters.AutoOffsetResetPropertyName] = "earliest", + Sql = ksql }; + queryStreamParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Earliest); var context = new TestableDbProvider(TestParameters.KsqlDbUrl); @@ -212,9 +202,9 @@ public void CreatePushQuery_InvalidQueryParameterTypeThrows() var queryParameters = new QueryParameters { - Sql = ksql, - [QueryParameters.AutoOffsetResetPropertyName] = "earliest", + Sql = ksql }; + queryParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Earliest); var context = new TestableDbProvider(TestParameters.KsqlDbUrl); @@ -234,9 +224,9 @@ public void CreateQuery_RawKSQL_ReturnAsyncEnumerable() QueryStreamParameters queryParameters = new QueryStreamParameters { - Sql = ksql, - [QueryStreamParameters.AutoOffsetResetPropertyName] = "earliest", + Sql = ksql }; + queryParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Earliest); var context = new TestableDbProvider(TestParameters.KsqlDbUrl); diff --git a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/Options/KSqlDbContextOptionsBuilderTests.cs b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/Options/KSqlDbContextOptionsBuilderTests.cs index c81eeccd..94d85a3c 100644 --- a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/Options/KSqlDbContextOptionsBuilderTests.cs +++ b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Context/Options/KSqlDbContextOptionsBuilderTests.cs @@ -89,7 +89,7 @@ public void SetProcessingGuarantee() var options = setupParameters.SetProcessingGuarantee(ProcessingGuarantee.AtLeastOnce).Options; //Assert - options.QueryStreamParameters[KSqlDbConfigs.ProcessingGuarantee].Should().Be("at_least_once"); + options.QueryStreamParameters.Get(KSqlDbConfigs.ProcessingGuarantee).Should().Be(ProcessingGuarantee.AtLeastOnce); } [Test] @@ -106,7 +106,7 @@ public void SetProcessingGuarantee_ThenSetupPushQuery() }).Options; //Assert - options.QueryStreamParameters[KSqlDbConfigs.ProcessingGuarantee].Should().Be("at_least_once"); + options.QueryStreamParameters.Get(KSqlDbConfigs.ProcessingGuarantee).Should().Be(ProcessingGuarantee.AtLeastOnce); } [Test] @@ -122,8 +122,7 @@ public void SetAutoOffsetReset() .SetAutoOffsetReset(autoOffsetReset).Options; //Assert - string expectedValue = autoOffsetReset.ToString().ToLower(); - options.QueryStreamParameters[QueryStreamParameters.AutoOffsetResetPropertyName].Should().Be(expectedValue); + options.QueryStreamParameters.Get(QueryStreamParameters.AutoOffsetResetPropertyName).Should().Be(autoOffsetReset); } [Test] @@ -182,7 +181,7 @@ public void SetupPushQuery_OptionsQueryStreamParameters_AutoOffsetResetIsSetToDe }).Options; //Assert - options.QueryStreamParameters.Properties[QueryStreamParameters.AutoOffsetResetPropertyName].Should().BeEquivalentTo("earliest"); + options.QueryStreamParameters.Get(QueryStreamParameters.AutoOffsetResetPropertyName).Should().Be(AutoOffsetReset.Earliest); } [Test] @@ -190,13 +189,12 @@ public void SetupPushQueryNotCalled_OptionsQueryStreamParameters_AutoOffsetReset { //Arrange var setupParameters = ClassUnderTest.UseKSqlDb(TestParameters.KsqlDbUrl); - string earliestAtoOffsetReset = AutoOffsetReset.Earliest.ToString().ToLower(); //Act var options = setupParameters.Options; //Assert - options.QueryStreamParameters.Properties[QueryStreamParameters.AutoOffsetResetPropertyName].Should().BeEquivalentTo(earliestAtoOffsetReset); + options.QueryStreamParameters.Get(QueryStreamParameters.AutoOffsetResetPropertyName).Should().Be(AutoOffsetReset.Earliest); } [Test] @@ -209,11 +207,14 @@ public void SetupPushQuery_AmendOptionsQueryStreamParametersProperty_AutoOffsetR //Act var options = setupParameters.SetupPushQuery(c => { - c.Properties[QueryStreamParameters.AutoOffsetResetPropertyName] = latestAtoOffsetReset; + c.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Latest); }).Options; //Assert - options.QueryStreamParameters.Properties[QueryStreamParameters.AutoOffsetResetPropertyName].Should().BeEquivalentTo(latestAtoOffsetReset); + options.QueryStreamParameters.Get(QueryStreamParameters.AutoOffsetResetPropertyName) + .ToString() + .ToLower() + .Should().Be(latestAtoOffsetReset); } [Test] @@ -258,11 +259,11 @@ public void SetupPullQuery_PropertyWasSet() //Act var options = setupParameters.SetupPullQuery(opt => { - opt[KSqlDbConfigs.KsqlQueryPullTableScanEnabled] = "true"; + opt.Set(KSqlDbConfigs.KsqlQueryPullTableScanEnabled, true); }).Options; //Assert - options.PullQueryParameters.Properties[KSqlDbConfigs.KsqlQueryPullTableScanEnabled].Should().BeEquivalentTo("true"); + options.PullQueryParameters.Get(KSqlDbConfigs.KsqlQueryPullTableScanEnabled).Should().Be(true); } #endregion } diff --git a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/KStreamSetDependenciesTests.cs b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/KStreamSetDependenciesTests.cs index 077e085a..c7981f5c 100644 --- a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/KStreamSetDependenciesTests.cs +++ b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/KStreamSetDependenciesTests.cs @@ -12,10 +12,8 @@ public class KStreamSetDependenciesTests public void QueryStreamParameters_CloneIsReturned() { //Arrange - var queryStreamParameters = new QueryStreamParameters - { - [QueryStreamParameters.AutoOffsetResetPropertyName] = AutoOffsetReset.Latest.ToKSqlValue() - }; + var queryStreamParameters = new QueryStreamParameters(); + queryStreamParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Latest); var kStreamSetDependencies = new KStreamSetDependencies(null!, null!, null!, queryStreamParameters); //Act @@ -30,10 +28,8 @@ public void QueryStreamParameters_CloneIsReturned() public void QueryStreamParameters_CloneIsReturned_SqlIsChanged() { //Arrange - var queryStreamParameters = new QueryStreamParameters - { - [QueryStreamParameters.AutoOffsetResetPropertyName] = AutoOffsetReset.Latest.ToKSqlValue() - }; + var queryStreamParameters = new QueryStreamParameters(); + queryStreamParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Latest); var kStreamSetDependencies = new KStreamSetDependencies(null!, null!, null!, queryStreamParameters); //Act diff --git a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/AutoOffsetResetExtensionsTests.cs b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/AutoOffsetResetSerializationTests.cs similarity index 63% rename from Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/AutoOffsetResetExtensionsTests.cs rename to Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/AutoOffsetResetSerializationTests.cs index 7bb2d045..559e0e89 100644 --- a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/AutoOffsetResetExtensionsTests.cs +++ b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/AutoOffsetResetSerializationTests.cs @@ -1,3 +1,4 @@ +using System.Text.Json; using FluentAssertions; using ksqlDB.RestApi.Client.KSql.Query.Options; using NUnit.Framework; @@ -13,10 +14,10 @@ public void ToAutoOffsetReset_UnknownValue() //Arrange //Assert - Assert.Throws(() => + Assert.Throws(() => { //Act - "xyz".ToAutoOffsetReset(); + JsonSerializer.Deserialize("\"xyz\""); }); } @@ -26,45 +27,45 @@ public void ToAutoOffsetReset_Earliest() //Arrange //Act - var value = "earliest".ToAutoOffsetReset(); + var value = JsonSerializer.Deserialize("\"earliest\""); //Assert value.Should().Be(AutoOffsetReset.Earliest); } - + [Test] public void ToAutoOffsetReset_Latest() { //Arrange //Act - var value = "latest".ToAutoOffsetReset(); + var value = JsonSerializer.Deserialize("\"latest\""); //Assert value.Should().Be(AutoOffsetReset.Latest); } - + [Test] public void ToKSqlValue_Earliest() { //Arrange //Act - var value = AutoOffsetReset.Earliest.ToKSqlValue(); + var value = JsonSerializer.Serialize(AutoOffsetReset.Earliest); //Assert - value.Should().BeEquivalentTo("earliest"); + value.Should().Be("\"earliest\""); } - + [Test] public void ToKSqlValue_Latest() { //Arrange //Act - var value = AutoOffsetReset.Latest.ToKSqlValue(); + var value = JsonSerializer.Serialize(AutoOffsetReset.Latest); //Assert - value.Should().BeEquivalentTo("latest"); + value.Should().Be("\"latest\""); } } diff --git a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/ProcessingGuaranteeExtensionsTests.cs b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/ProcessingGuaranteeSerializationTests.cs similarity index 57% rename from Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/ProcessingGuaranteeExtensionsTests.cs rename to Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/ProcessingGuaranteeSerializationTests.cs index de778ce5..da6c6c0b 100644 --- a/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/ProcessingGuaranteeExtensionsTests.cs +++ b/Tests/ksqlDB.RestApi.Client.Tests/KSql/Query/Options/ProcessingGuaranteeSerializationTests.cs @@ -1,3 +1,4 @@ +using System.Text.Json; using FluentAssertions; using ksqlDB.RestApi.Client.KSql.Query.Options; using NUnit.Framework; @@ -5,19 +6,18 @@ namespace ksqlDb.RestApi.Client.Tests.KSql.Query.Options; -public class ProcessingGuaranteeExtensionsTests +public class ProcessingGuaranteeSerializationTests { [Test] public void ToProcessingGuarantee_UnknownValue() { //Arrange - //Assert - Assert.Throws(() => + Assert.Throws(() => { //Act - "xyz".ToProcessingGuarantee(); + JsonSerializer.Deserialize("\"xyz\""); }); } @@ -27,7 +27,7 @@ public void ToProcessingGuarantee_ExactlyOnce() //Arrange //Act - var value = ProcessingGuaranteeExtensions.ExactlyOnce.ToProcessingGuarantee(); + var value = JsonSerializer.Deserialize("\"exactlyOnce\""); //Assert value.Should().Be(ProcessingGuarantee.ExactlyOnce); @@ -39,45 +39,45 @@ public void ToProcessingGuarantee_ExactlyOnceV2() //Arrange //Act - var value = ProcessingGuaranteeExtensions.ExactlyOnceV2.ToProcessingGuarantee(); + var value = JsonSerializer.Deserialize("\"exactlyOnceV2\""); //Assert value.Should().Be(ProcessingGuarantee.ExactlyOnceV2); } - + [Test] public void ToProcessingGuarantee_AtLeastOnce() { //Arrange //Act - var value = ProcessingGuaranteeExtensions.AtLeastOnce.ToProcessingGuarantee(); + var value = JsonSerializer.Deserialize("\"atLeastOnce\""); //Assert value.Should().Be(ProcessingGuarantee.AtLeastOnce); } - + [Test] - public void ToKSqlValue_AtLeastOnce() + public void ToKSqlValue_ExactlyOnce() { //Arrange //Act - var value = ProcessingGuarantee.AtLeastOnce.ToKSqlValue(); + var value = JsonSerializer.Serialize(ProcessingGuarantee.ExactlyOnce); //Assert - value.Should().BeEquivalentTo("at_least_once"); + value.Should().Be("\"exactly_once\""); } - + [Test] - public void ToKSqlValue_ExactlyOnce() + public void ToKSqlValue_ExactlyOnceV2() { //Arrange //Act - var value = ProcessingGuarantee.ExactlyOnce.ToKSqlValue(); + var value = JsonSerializer.Serialize(ProcessingGuarantee.ExactlyOnceV2); //Assert - value.Should().BeEquivalentTo("exactly_once"); + value.Should().Be("\"exactly_once_v2\""); } } diff --git a/Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/AutoOffsetResetExtensionsTests.cs b/Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/AutoOffsetResetExtensionsTests.cs deleted file mode 100644 index 3468d710..00000000 --- a/Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/AutoOffsetResetExtensionsTests.cs +++ /dev/null @@ -1,32 +0,0 @@ -using FluentAssertions; -using ksqlDB.RestApi.Client.KSql.Query.Options; -using NUnit.Framework; - -namespace ksqlDb.RestApi.Client.Tests.KSql.RestApi.Parameters; - -public class AutoOffsetResetExtensionsTests -{ - [Test] - public void ToKSqlValue() - { - //Arrange - - //Act - var result = AutoOffsetReset.Latest.ToKSqlValue(); - - //Assert - result.Should().Be("latest"); - } - - [Test] - public void ToAutoOffsetReset() - { - //Arrange - - //Act - var result = "latest".ToAutoOffsetReset(); - - //Assert - result.Should().Be(AutoOffsetReset.Latest); - } -} diff --git a/Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/QueryParametersExtensionsTests.cs b/Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/QueryParametersExtensionsTests.cs index 30825696..79d98af8 100644 --- a/Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/QueryParametersExtensionsTests.cs +++ b/Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/QueryParametersExtensionsTests.cs @@ -14,9 +14,9 @@ public void FillFrom_DestinationParametersAreSetFromSource() //Arrange var source = new QueryStreamParameters { - Sql = "Select", - ["key"] = "value" + Sql = "Select" }; + source.Set("key", "value"); var destination = new QueryStreamParameters(); //Act @@ -54,7 +54,7 @@ public void ToLogInfo_QueryStreamParameters_NoParameters() var result = queryParameters.ToLogInfo(); //Assert - result.Should().Be(@"Sql: + result.Should().Be(@"Sql: Parameters: ".ReplaceLineEndings()); } @@ -65,18 +65,18 @@ public void ToLogInfo_QueryStreamParameters() //Arrange var queryParameters = new QueryStreamParameters { - AutoOffsetReset = AutoOffsetReset.Earliest, - [KSqlDbConfigs.ProcessingGuarantee] = ProcessingGuarantee.AtLeastOnce.ToKSqlValue() + AutoOffsetReset = AutoOffsetReset.Earliest }; + queryParameters.Set(KSqlDbConfigs.ProcessingGuarantee, ProcessingGuarantee.AtLeastOnce); //Act var result = queryParameters.ToLogInfo(); //Assert - result.Should().Be(@"Sql: + result.Should().Be(@"Sql: Parameters: -auto.offset.reset = earliest -processing.guarantee = at_least_once +auto.offset.reset = ""earliest"" +processing.guarantee = ""at_least_once"" ".ReplaceLineEndings()); } @@ -107,17 +107,17 @@ public void ToLogInfo_QueryParameters() var queryParameters = new QueryParameters { AutoOffsetReset = AutoOffsetReset.Earliest, - [KSqlDbConfigs.ProcessingGuarantee] = ProcessingGuarantee.AtLeastOnce.ToKSqlValue() }; + queryParameters.Set(KSqlDbConfigs.ProcessingGuarantee, ProcessingGuarantee.AtLeastOnce); //Act var result = queryParameters.ToLogInfo(); //Assert - result.Should().Be(@"Sql: + result.Should().Be(@"Sql: Parameters: -ksql.streams.auto.offset.reset = earliest -processing.guarantee = at_least_once +ksql.streams.auto.offset.reset = ""earliest"" +processing.guarantee = ""at_least_once"" ".ReplaceLineEndings()); } } diff --git a/Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/QueryStreamEndpointParametersTests.cs b/Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/QueryStreamEndpointParametersTests.cs index fa757b89..2ef093dc 100644 --- a/Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/QueryStreamEndpointParametersTests.cs +++ b/Tests/ksqlDB.RestApi.Client.Tests/KSql/RestApi/Parameters/QueryStreamEndpointParametersTests.cs @@ -13,9 +13,9 @@ public void Clone() //Arrange var source = new QueryStreamParameters { - Sql = "Select", - ["key"] = "value" + Sql = "Select" }; + source.Set("key", "value"); //Act var clone = source.Clone(); @@ -38,7 +38,7 @@ public void QueryStreamParameters_AutoOffsetReset_CorrectKeyWasUsed() var clone = source.Clone(); //Assert - clone.Properties[QueryStreamParameters.AutoOffsetResetPropertyName].Should().Be(nameof(AutoOffsetReset.Earliest).ToLower()); + clone.Get(QueryStreamParameters.AutoOffsetResetPropertyName).Should().Be(AutoOffsetReset.Earliest); } [Test] @@ -54,7 +54,7 @@ public void QueryParameters_AutoOffsetReset_CorrectKeyWasUsed() var clone = source.Clone(); //Assert - clone.Properties[QueryParameters.AutoOffsetResetPropertyName].Should().Be(nameof(AutoOffsetReset.Latest).ToLower()); + clone.Get(QueryParameters.AutoOffsetResetPropertyName).Should().Be(AutoOffsetReset.Latest); } } } diff --git a/ksqlDb.RestApi.Client/Infrastructure/Attributes/SnakeCaseJsonStringEnumConverter.cs b/ksqlDb.RestApi.Client/Infrastructure/Attributes/SnakeCaseJsonStringEnumConverter.cs new file mode 100644 index 00000000..0dc73e95 --- /dev/null +++ b/ksqlDb.RestApi.Client/Infrastructure/Attributes/SnakeCaseJsonStringEnumConverter.cs @@ -0,0 +1,14 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace ksqlDb.RestApi.Client.Infrastructure.Attributes +{ + [AttributeUsage(AttributeTargets.Enum)] + public class JsonSnakeCaseStringEnumConverterAttribute : JsonConverterAttribute where TEnum : struct, Enum + { + public override JsonConverter CreateConverter(Type typeToConvert) + { + return new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower); + } + } +} diff --git a/ksqlDb.RestApi.Client/KSql/Query/Context/KSqlDBContextOptions.cs b/ksqlDb.RestApi.Client/KSql/Query/Context/KSqlDBContextOptions.cs index b93712b4..7aeab301 100644 --- a/ksqlDb.RestApi.Client/KSql/Query/Context/KSqlDBContextOptions.cs +++ b/ksqlDb.RestApi.Client/KSql/Query/Context/KSqlDBContextOptions.cs @@ -1,5 +1,6 @@ using System.Globalization; using System.Text.Json; +using System.Text.Json.Nodes; using ksqlDB.RestApi.Client.KSql.Config; using ksqlDb.RestApi.Client.KSql.Query.Context.Options; using ksqlDB.RestApi.Client.KSql.Query.Options; @@ -25,10 +26,8 @@ public KSqlDBContextOptions(string url) Url = url; - QueryStreamParameters = new QueryStreamParameters - { - [QueryStreamParameters.AutoOffsetResetPropertyName] = AutoOffsetReset.Earliest.ToString().ToLower(), - }; + QueryStreamParameters = new QueryStreamParameters(); + QueryStreamParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, AutoOffsetReset.Earliest); PullQueryParameters = new PullQueryParameters(); @@ -67,9 +66,7 @@ public KSqlDBContextOptions(string url) /// Type of processing guarantee. public void SetProcessingGuarantee(ProcessingGuarantee processingGuarantee) { - string guarantee = processingGuarantee.ToKSqlValue(); - - QueryStreamParameters[KSqlDbConfigs.ProcessingGuarantee] = guarantee; + QueryStreamParameters.Set(KSqlDbConfigs.ProcessingGuarantee, processingGuarantee); } /// @@ -78,8 +75,7 @@ public void SetProcessingGuarantee(ProcessingGuarantee processingGuarantee) /// The type of auto offset reset. public void SetAutoOffsetReset(AutoOffsetReset autoOffsetReset) { - QueryStreamParameters[QueryStreamParameters.AutoOffsetResetPropertyName] = - autoOffsetReset.ToKSqlValue(); + QueryStreamParameters.Set(QueryStreamParameters.AutoOffsetResetPropertyName, autoOffsetReset); } /// diff --git a/ksqlDb.RestApi.Client/KSql/Query/Options/AutoOffsetReset.cs b/ksqlDb.RestApi.Client/KSql/Query/Options/AutoOffsetReset.cs index c7bd20af..ea7979c4 100644 --- a/ksqlDb.RestApi.Client/KSql/Query/Options/AutoOffsetReset.cs +++ b/ksqlDb.RestApi.Client/KSql/Query/Options/AutoOffsetReset.cs @@ -1,8 +1,13 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using ksqlDb.RestApi.Client.Infrastructure.Attributes; + namespace ksqlDB.RestApi.Client.KSql.Query.Options; /// /// Kafka offset reset policies define how a consumer should handle situations where it tries to read from a partition in Kafka but there is no valid offset available or the offset is out of range. /// +[JsonSnakeCaseStringEnumConverter] public enum AutoOffsetReset { /// diff --git a/ksqlDb.RestApi.Client/KSql/Query/Options/AutoOffsetResetExtensions.cs b/ksqlDb.RestApi.Client/KSql/Query/Options/AutoOffsetResetExtensions.cs deleted file mode 100644 index 11cca39c..00000000 --- a/ksqlDb.RestApi.Client/KSql/Query/Options/AutoOffsetResetExtensions.cs +++ /dev/null @@ -1,34 +0,0 @@ -namespace ksqlDB.RestApi.Client.KSql.Query.Options; - -/// -/// Provides extension methods for converting string values to and vice versa. -/// -public static class AutoOffsetResetExtensions -{ - /// - /// Converts a string value to . - /// - /// The string value representing the auto offset reset policy. - /// The corresponding value. - /// Thrown when the provided is not a valid option. - public static AutoOffsetReset ToAutoOffsetReset(this string autoOffsetResetValue) - { - if (autoOffsetResetValue == "earliest") - return AutoOffsetReset.Earliest; - - if (autoOffsetResetValue == "latest") - return AutoOffsetReset.Latest; - - throw new ArgumentOutOfRangeException(nameof(autoOffsetResetValue), autoOffsetResetValue, null); - } - - /// - /// Converts a value to its corresponding string representation for KSql. - /// - /// The value. - /// The string representation of the value for KSql. - public static string ToKSqlValue(this AutoOffsetReset value) - { - return value.ToString().ToLower(); - } -} diff --git a/ksqlDb.RestApi.Client/KSql/Query/Options/ProcessingGuarantee.cs b/ksqlDb.RestApi.Client/KSql/Query/Options/ProcessingGuarantee.cs index d288cc9b..4dc456b6 100644 --- a/ksqlDb.RestApi.Client/KSql/Query/Options/ProcessingGuarantee.cs +++ b/ksqlDb.RestApi.Client/KSql/Query/Options/ProcessingGuarantee.cs @@ -1,8 +1,11 @@ +using ksqlDb.RestApi.Client.Infrastructure.Attributes; + namespace ksqlDB.RestApi.Client.KSql.Query.Options; /// /// Specifies the guarantees that Kafka provides regarding message delivery and processing semantics. /// +[JsonSnakeCaseStringEnumConverter] public enum ProcessingGuarantee { /// @@ -15,11 +18,11 @@ public enum ProcessingGuarantee /// Records are processed once. To achieve a true exactly-once system, end consumers and producers must also implement exactly-once semantics. /// processing.guarantee="exactly_once_v2" /// - /// + /// ExactlyOnceV2, /// /// Records are never lost but may be redelivered. - /// processing.guarantee="at_least_once" + /// processing.guarantee="at_least_once" /// AtLeastOnce } diff --git a/ksqlDb.RestApi.Client/KSql/Query/Options/ProcessingGuaranteeExtensions.cs b/ksqlDb.RestApi.Client/KSql/Query/Options/ProcessingGuaranteeExtensions.cs deleted file mode 100644 index 3c807980..00000000 --- a/ksqlDb.RestApi.Client/KSql/Query/Options/ProcessingGuaranteeExtensions.cs +++ /dev/null @@ -1,48 +0,0 @@ -namespace ksqlDB.RestApi.Client.KSql.Query.Options; - -/// -/// Provides extension methods for converting between string values and enum, -/// as well as converting values to their corresponding string representation for KSql. -/// -public static class ProcessingGuaranteeExtensions -{ - internal const string AtLeastOnce = "at_least_once"; - internal const string ExactlyOnce = "exactly_once"; - internal const string ExactlyOnceV2 = "exactly_once_v2"; - - /// - /// Converts a string value to . - /// - /// The string value representing the processing guarantee. - /// The corresponding value. - /// Thrown when the provided is not a valid option. - public static ProcessingGuarantee ToProcessingGuarantee(this string processingGuaranteeValue) - { - return processingGuaranteeValue switch - { - AtLeastOnce => ProcessingGuarantee.AtLeastOnce, - ExactlyOnce => ProcessingGuarantee.ExactlyOnce, - ExactlyOnceV2 => ProcessingGuarantee.ExactlyOnceV2, - _ => throw new ArgumentOutOfRangeException(nameof(processingGuaranteeValue), processingGuaranteeValue, null) - }; - } - - /// - /// Converts a value to its corresponding string representation for KSql. - /// - /// The value. - /// The string representation of the value for KSql. - /// Thrown when the provided is not a valid option. - public static string ToKSqlValue(this ProcessingGuarantee processingGuarantee) - { - string guarantee = processingGuarantee switch - { - ProcessingGuarantee.AtLeastOnce => AtLeastOnce, - ProcessingGuarantee.ExactlyOnce => ExactlyOnce, - ProcessingGuarantee.ExactlyOnceV2 => ExactlyOnceV2, - _ => throw new ArgumentOutOfRangeException(nameof(processingGuarantee), processingGuarantee, null) - }; - - return guarantee; - } -} diff --git a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/IQueryOptions.cs b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/IQueryOptions.cs index 6cbd5259..aaacca54 100644 --- a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/IQueryOptions.cs +++ b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/IQueryOptions.cs @@ -1,6 +1,8 @@ -namespace ksqlDB.RestApi.Client.KSql.RestApi.Parameters; +using System.Text.Json.Nodes; + +namespace ksqlDB.RestApi.Client.KSql.RestApi.Parameters; public interface IQueryOptions { - Dictionary Properties { get; } -} \ No newline at end of file + Dictionary Properties { get; } +} diff --git a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/IQueryParameters.cs b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/IQueryParameters.cs index e958ea6e..725c9347 100644 --- a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/IQueryParameters.cs +++ b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/IQueryParameters.cs @@ -10,10 +10,7 @@ public interface IQueryParameters : IQueryOptions /// string Sql { get; set; } - /// - /// Indexer to access properties by key. - /// - /// The key of the property. - /// The value of the property. - string this[string key] { get; set; } + void Set(string key, T value); + + T Get(string key); } diff --git a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryEndpointParameters.cs b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryEndpointParameters.cs index ed9732ad..06e7c98a 100644 --- a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryEndpointParameters.cs +++ b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryEndpointParameters.cs @@ -1,3 +1,4 @@ +using System.Text.Json.Nodes; using System.Text.Json.Serialization; using ksqlDB.RestApi.Client.KSql.RestApi.Statements; @@ -20,17 +21,16 @@ public class QueryEndpointParameters : IKSqlDbParameters /// Property overrides to run the statements with. /// [JsonPropertyName("streamsProperties")] - public Dictionary Properties { get; } = new(); + public Dictionary Properties { get; } = new(); - /// - /// Indexer to access properties by key. - /// - /// The key of the property. - /// The value of the property. - public string this[string key] + public TValue Get(string key) + { + return Properties[key].GetValue(); + } + + public void Set(string key, TValue value) { - get => Properties[key]; - set => Properties[key] = value; + Properties[key] = JsonValue.Create(value); } internal EndpointType EndpointType { get; set; } = EndpointType.Query; diff --git a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryParameters.cs b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryParameters.cs index 2b2fb9ba..3a97cf3e 100644 --- a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryParameters.cs +++ b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryParameters.cs @@ -1,3 +1,4 @@ +using System.Text.Json.Nodes; using System.Text.Json.Serialization; using ksqlDB.RestApi.Client.KSql.Query.Options; @@ -16,13 +17,7 @@ public class QueryParameters : QueryEndpointParameters, IPushQu [JsonIgnore] public AutoOffsetReset AutoOffsetReset { - get - { - var value = this[AutoOffsetResetPropertyName]; - - return value.ToAutoOffsetReset(); - } - - set => this[AutoOffsetResetPropertyName] = value.ToKSqlValue(); + get => Get(AutoOffsetResetPropertyName); + set => Set(AutoOffsetResetPropertyName, value); } } diff --git a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamEndpointParameters.cs b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamEndpointParameters.cs index 9f8d53ee..075de3a2 100644 --- a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamEndpointParameters.cs +++ b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamEndpointParameters.cs @@ -1,3 +1,4 @@ +using System.Text.Json.Nodes; using System.Text.Json.Serialization; namespace ksqlDB.RestApi.Client.KSql.RestApi.Parameters; @@ -19,17 +20,16 @@ public class QueryStreamEndpointParameters : IKSqlDbParameters /// Property overrides to run the statements with. /// [JsonPropertyName("properties")] - public Dictionary Properties { get; } = new(); + public Dictionary Properties { get; } = new(); - /// - /// Indexer to access properties by key. - /// - /// The key of the property. - /// The value of the property. - public string this[string key] + public TValue Get(string key) + { + return Properties[key].GetValue(); + } + + public void Set(string key, TValue value) { - get => Properties[key]; - set => Properties[key] = value; + Properties[key] = JsonValue.Create(value); } /// diff --git a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamParameters.cs b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamParameters.cs index 3341ddf7..8ac8de22 100644 --- a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamParameters.cs +++ b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamParameters.cs @@ -1,3 +1,4 @@ +using System.Text.Json.Nodes; using System.Text.Json.Serialization; using ksqlDB.RestApi.Client.KSql.Query.Options; @@ -16,13 +17,8 @@ public sealed class QueryStreamParameters : QueryStreamEndpointParameters Get(AutoOffsetResetPropertyName); - return value.ToAutoOffsetReset(); - } - - set => this[AutoOffsetResetPropertyName] = value.ToKSqlValue(); + set => Set(AutoOffsetResetPropertyName, value); } } diff --git a/ksqlDb.RestApi.Client/ksqlDb.RestApi.Client.csproj b/ksqlDb.RestApi.Client/ksqlDb.RestApi.Client.csproj index 9445f28c..f4536f3c 100644 --- a/ksqlDb.RestApi.Client/ksqlDb.RestApi.Client.csproj +++ b/ksqlDb.RestApi.Client/ksqlDb.RestApi.Client.csproj @@ -32,7 +32,7 @@ - + @@ -56,4 +56,4 @@ - \ No newline at end of file + From d1f882f40f5810b0e098f95c98c57109f3f82eb0 Mon Sep 17 00:00:00 2001 From: Fred Vollmer Date: Thu, 31 Oct 2024 10:05:10 -0400 Subject: [PATCH 2/4] fix build --- .../KSql/RestApi/Generators/TypeGenerator.cs | 8 +++++--- .../KSql/RestApi/Parameters/QueryEndpointParameters.cs | 2 +- .../RestApi/Parameters/QueryStreamEndpointParameters.cs | 2 +- .../KSql/RestApi/Statements/CreateEntity.cs | 7 ++++--- .../KSql/RestApi/Statements/KSqlTypeTranslator.cs | 2 +- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/ksqlDb.RestApi.Client/KSql/RestApi/Generators/TypeGenerator.cs b/ksqlDb.RestApi.Client/KSql/RestApi/Generators/TypeGenerator.cs index 6e1182e9..9fadafa9 100644 --- a/ksqlDb.RestApi.Client/KSql/RestApi/Generators/TypeGenerator.cs +++ b/ksqlDb.RestApi.Client/KSql/RestApi/Generators/TypeGenerator.cs @@ -13,6 +13,8 @@ namespace ksqlDB.RestApi.Client.KSql.RestApi.Generators; internal sealed class TypeGenerator(IMetadataProvider metadataProvider) : EntityInfo(metadataProvider) { + private readonly IMetadataProvider _metadataProvider = metadataProvider; + internal string Print(TypeProperties properties) { StringBuilder stringBuilder = new(); @@ -30,7 +32,7 @@ private void PrintProperties(StringBuilder stringBuilder, IdentifierEscaping { var ksqlProperties = new List(); - KSqlTypeTranslator typeTranslator = new(metadataProvider); + KSqlTypeTranslator typeTranslator = new(_metadataProvider); foreach (var memberInfo in Members()) { @@ -38,14 +40,14 @@ private void PrintProperties(StringBuilder stringBuilder, IdentifierEscaping var ksqlType = typeTranslator.Translate(type, memberInfo, escaping); - var memberName = memberInfo.GetMemberName(metadataProvider); + var memberName = memberInfo.GetMemberName(_metadataProvider); var columnDefinition = $"{EscapeName(memberName, escaping)} {ksqlType}{typeTranslator.ExploreAttributes(typeof(T), memberInfo, type)}"; ksqlProperties.Add(columnDefinition); } stringBuilder.Append(string.Join(", ", ksqlProperties)); } - + private static string EscapeName(string name, IdentifierEscaping escaping) => (escaping, IdentifierUtil.IsValid(name)) switch { diff --git a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryEndpointParameters.cs b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryEndpointParameters.cs index 06e7c98a..2c6e8439 100644 --- a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryEndpointParameters.cs +++ b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryEndpointParameters.cs @@ -30,7 +30,7 @@ public TValue Get(string key) public void Set(string key, TValue value) { - Properties[key] = JsonValue.Create(value); + Properties[key] = JsonValue.Create(value) ?? throw new InvalidOperationException("The value could not be converted to JSON"); } internal EndpointType EndpointType { get; set; } = EndpointType.Query; diff --git a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamEndpointParameters.cs b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamEndpointParameters.cs index 075de3a2..cba6d880 100644 --- a/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamEndpointParameters.cs +++ b/ksqlDb.RestApi.Client/KSql/RestApi/Parameters/QueryStreamEndpointParameters.cs @@ -29,7 +29,7 @@ public TValue Get(string key) public void Set(string key, TValue value) { - Properties[key] = JsonValue.Create(value); + Properties[key] = JsonValue.Create(value) ?? throw new InvalidOperationException("The value could not be converted to JSON"); } /// diff --git a/ksqlDb.RestApi.Client/KSql/RestApi/Statements/CreateEntity.cs b/ksqlDb.RestApi.Client/KSql/RestApi/Statements/CreateEntity.cs index 584272ce..3c42bc65 100644 --- a/ksqlDb.RestApi.Client/KSql/RestApi/Statements/CreateEntity.cs +++ b/ksqlDb.RestApi.Client/KSql/RestApi/Statements/CreateEntity.cs @@ -13,6 +13,7 @@ namespace ksqlDB.RestApi.Client.KSql.RestApi.Statements; internal sealed class CreateEntity(IMetadataProvider metadataProvider) : EntityInfo(metadataProvider) { private readonly StringBuilder stringBuilder = new(); + private readonly IMetadataProvider _metadataProvider = metadataProvider; internal string Print(StatementContext statementContext, EntityCreationMetadata metadata, bool? ifNotExists) { @@ -41,7 +42,7 @@ internal string Print(StatementContext statementContext, EntityCreationMetada private void PrintProperties(StatementContext statementContext, EntityCreationMetadata metadata) { var ksqlProperties = new List(); - KSqlTypeTranslator typeTranslator = new(metadataProvider); + KSqlTypeTranslator typeTranslator = new(_metadataProvider); foreach (var memberInfo in Members(metadata.IncludeReadOnlyProperties)) { @@ -49,7 +50,7 @@ private void PrintProperties(StatementContext statementContext, EntityCreatio var ksqlType = typeTranslator.Translate(type, memberInfo, metadata.IdentifierEscaping); - var columnName = IdentifierUtil.Format(memberInfo, metadata.IdentifierEscaping, metadataProvider); + var columnName = IdentifierUtil.Format(memberInfo, metadata.IdentifierEscaping, _metadataProvider); string columnDefinition = $"\t{columnName} {ksqlType}{typeTranslator.ExploreAttributes(typeof(T), memberInfo, type)}"; columnDefinition += TryAttachKey(statementContext.KSqlEntityType, memberInfo); @@ -85,7 +86,7 @@ private void PrintCreateOrReplace(StatementContext statementContext, EntityCr private string TryAttachKey(KSqlEntityType entityType, MemberInfo memberInfo) { - var entityMetadata = metadataProvider.GetEntities().FirstOrDefault(c => c.Type == typeof(T)); + var entityMetadata = _metadataProvider.GetEntities().FirstOrDefault(c => c.Type == typeof(T)); var primaryKey = entityMetadata?.PrimaryKeyMemberInfo; diff --git a/ksqlDb.RestApi.Client/KSql/RestApi/Statements/KSqlTypeTranslator.cs b/ksqlDb.RestApi.Client/KSql/RestApi/Statements/KSqlTypeTranslator.cs index 3f7991f7..f671796a 100644 --- a/ksqlDb.RestApi.Client/KSql/RestApi/Statements/KSqlTypeTranslator.cs +++ b/ksqlDb.RestApi.Client/KSql/RestApi/Statements/KSqlTypeTranslator.cs @@ -123,7 +123,7 @@ internal IEnumerable GetProperties(Type type, IdentifierEscaping escapin var ksqlType = Translate(memberType, memberInfo, escaping); - string columnDefinition = $"{memberInfo.Format(escaping, metadataProvider as ModelBuilder)} {ksqlType}{ExploreAttributes(type, memberInfo, memberType)}"; + string columnDefinition = $"{memberInfo.Format(escaping, metadataProvider)} {ksqlType}{ExploreAttributes(type, memberInfo, memberType)}"; ksqlProperties.Add(columnDefinition); } From 3343af32204cc83abc64fecf8aa56a503354eb1e Mon Sep 17 00:00:00 2001 From: Fred Vollmer Date: Tue, 5 Nov 2024 14:55:06 -0500 Subject: [PATCH 3/4] Add integration test --- .../Infrastructure/IntegrationTests.cs | 5 +++-- .../KSql/Query/Context/KSqlDbContextTests.cs | 21 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/Infrastructure/IntegrationTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/Infrastructure/IntegrationTests.cs index cab51245..b88a722a 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/Infrastructure/IntegrationTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/Infrastructure/IntegrationTests.cs @@ -33,13 +33,14 @@ public override void TestInitialize() Context = CreateKSqlDbContext(EndpointType.QueryStream); } - protected KSqlDBContext CreateKSqlDbContext(EndpointType endpointType, ModelBuilder? modelBuilder = null) + protected KSqlDBContext CreateKSqlDbContext(EndpointType endpointType, ModelBuilder? modelBuilder = null, Action? configureOptions = null) { ContextOptions = new KSqlDBContextOptions(KSqlDbRestApiProvider.KsqlDbUrl) { ShouldPluralizeFromItemName = false, EndpointType = endpointType }; + configureOptions?.Invoke(ContextOptions); modelBuilder ??= new ModelBuilder(); return new KSqlDBContext(ContextOptions, modelBuilder); @@ -62,7 +63,7 @@ protected static async Task> CollectActualValues(IAsyncEnumerable if (expectedItemsCount.HasValue) source = source.Take(expectedItemsCount.Value); - + await foreach (var item in source.WithCancellation(cts.Token)) { actualValues.Add(item); diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs index cf12c9af..e0943ede 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs @@ -15,6 +15,7 @@ using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Properties; using Microsoft.Extensions.DependencyInjection; using NUnit.Framework; +using EndpointType = ksqlDB.RestApi.Client.KSql.Query.Options.EndpointType; namespace ksqlDb.RestApi.Client.IntegrationTests.KSql.Query.Context; @@ -178,5 +179,25 @@ public async Task TimeTypes_InsertValues_ValuesReceived() // "; } + public async Task ContextOptions_EndpointParameters_NonStringParameter() + { + // Arrange + var context = CreateKSqlDbContext(EndpointType.Query, configureOptions: options => + { + options.PullQueryParameters.Set("ksql.query.pull.table.scan.enabled", true); + }); + + var config = new InsertProperties { EntityName = EntityName, ShouldPluralizeEntityName = false }; + var entity1 = new Movie { Id = 1, Title = "T1" }; + + //Act + context.Add(entity1, config); + var response = await context.SaveChangesAsync(); + + // Assert + response.Should().NotBeNull(); + response!.StatusCode.Should().Be(HttpStatusCode.OK); + } + #endregion } From 816f6d9168894ba97ae381353cd900b61173a34c Mon Sep 17 00:00:00 2001 From: Fred Vollmer Date: Wed, 6 Nov 2024 12:39:20 -0500 Subject: [PATCH 4/4] add test attr --- .../KSql/Query/Context/KSqlDbContextTests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs index e0943ede..b7a4cb00 100644 --- a/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs +++ b/Tests/ksqlDB.RestApi.Client.IntegrationTests/KSql/Query/Context/KSqlDbContextTests.cs @@ -179,6 +179,7 @@ public async Task TimeTypes_InsertValues_ValuesReceived() // "; } + [Test] public async Task ContextOptions_EndpointParameters_NonStringParameter() { // Arrange