diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index 23be86369..310c7283d 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -70,6 +70,7 @@ public static class Cluster { List<@Valid Masking> masking; AuditProperties audit; + } @Data @@ -113,6 +114,7 @@ public static class ConnectCluster { public static class SchemaRegistryAuth { String username; String password; + String bearerAuthCustomProviderClass; } @Data diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java index 5ab77ffeb..c6e5a1d72 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java @@ -11,6 +11,7 @@ import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; import io.kafbat.ui.util.jsonschema.JsonAvroConversion; +import java.util.HashMap; import java.util.Map; import lombok.SneakyThrows; @@ -18,9 +19,10 @@ interface MessageFormatter { String format(String topic, byte[] value); - static Map createMap(SchemaRegistryClient schemaRegistryClient) { + static Map createMap(SchemaRegistryClient schemaRegistryClient, + String bearerAuthCustomProviderClass) { return Map.of( - SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient), + SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, bearerAuthCustomProviderClass), SchemaType.JSON, new JsonSchemaMessageFormatter(schemaRegistryClient), SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient) ); @@ -29,17 +31,23 @@ SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient) class AvroMessageFormatter implements MessageFormatter { private final KafkaAvroDeserializer avroDeserializer; - AvroMessageFormatter(SchemaRegistryClient client) { + AvroMessageFormatter(SchemaRegistryClient client, String bearerAuthCustomProviderClass) { this.avroDeserializer = new KafkaAvroDeserializer(client); - this.avroDeserializer.configure( - Map.of( - AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused", - KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false, - KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false, - KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true - ), - false - ); + + final Map avroProps = new HashMap<>(); + avroProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused"); + avroProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false); + avroProps.put(KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false); + avroProps.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true); + + if (bearerAuthCustomProviderClass != null && !bearerAuthCustomProviderClass.isBlank()) { + avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM"); + avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, + String.format("class %s", bearerAuthCustomProviderClass)); + } + + this.avroDeserializer.configure(avroProps, false); + } @Override diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java index d6f7a3699..f4697d33b 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -42,6 +42,7 @@ public class SchemaRegistrySerde implements BuiltInSerde { private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0; private static final int SR_PAYLOAD_PREFIX_LENGTH = 5; + private static final String CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE = "CUSTOM"; public static String name() { return "SchemaRegistry"; @@ -77,11 +78,15 @@ public void autoConfigure(PropertyResolver kafkaClusterProperties, urls, kafkaClusterProperties.getProperty("schemaRegistryAuth.username", String.class).orElse(null), kafkaClusterProperties.getProperty("schemaRegistryAuth.password", String.class).orElse(null), + kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class) + .orElse(null), kafkaClusterProperties.getProperty("schemaRegistrySsl.keystoreLocation", String.class).orElse(null), kafkaClusterProperties.getProperty("schemaRegistrySsl.keystorePassword", String.class).orElse(null), kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null), kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null) ), + kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class) + .orElse(null), kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"), kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value"), kafkaClusterProperties.getProperty("schemaRegistryCheckSchemaExistenceForDeserialize", Boolean.class) @@ -103,11 +108,15 @@ public void configure(PropertyResolver serdeProperties, urls, serdeProperties.getProperty("username", String.class).orElse(null), serdeProperties.getProperty("password", String.class).orElse(null), + kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class) + .orElse(null), serdeProperties.getProperty("keystoreLocation", String.class).orElse(null), serdeProperties.getProperty("keystorePassword", String.class).orElse(null), kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null), kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null) ), + kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class) + .orElse(null), serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"), serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value"), serdeProperties.getProperty("checkSchemaExistenceForDeserialize", Boolean.class) @@ -119,6 +128,7 @@ public void configure(PropertyResolver serdeProperties, void configure( List schemaRegistryUrls, SchemaRegistryClient schemaRegistryClient, + String bearerAuthCustomProviderClass, String keySchemaNameTemplate, String valueSchemaNameTemplate, boolean checkTopicSchemaExistenceForDeserialize) { @@ -126,17 +136,19 @@ void configure( this.schemaRegistryClient = schemaRegistryClient; this.keySchemaNameTemplate = keySchemaNameTemplate; this.valueSchemaNameTemplate = valueSchemaNameTemplate; - this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient); + this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, bearerAuthCustomProviderClass); this.checkSchemaExistenceForDeserialize = checkTopicSchemaExistenceForDeserialize; } private static SchemaRegistryClient createSchemaRegistryClient(List urls, @Nullable String username, @Nullable String password, + @Nullable String bearerAuthCustomProviderClass, @Nullable String keyStoreLocation, @Nullable String keyStorePassword, @Nullable String trustStoreLocation, - @Nullable String trustStorePassword) { + @Nullable String trustStorePassword + ) { Map configs = new HashMap<>(); if (username != null && password != null) { configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); @@ -166,6 +178,11 @@ private static SchemaRegistryClient createSchemaRegistryClient(List urls keyStorePassword); } + if (bearerAuthCustomProviderClass != null) { + configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CREDENTIALS_SOURCE, CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE); + configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, bearerAuthCustomProviderClass); + } + return new CachedSchemaRegistryClient( urls, 1_000, diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java index f8c528f90..12fc6a4ba 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java @@ -16,10 +16,12 @@ import io.kafbat.ui.util.KafkaServicesValidation; import io.kafbat.ui.util.ReactiveFailover; import io.kafbat.ui.util.WebClientConfigurator; +import io.kafbat.ui.util.gcp.GcpBearerAuthFilter; import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.stream.Stream; @@ -167,13 +169,21 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterPrope } private ReactiveFailover schemaRegistryClient(ClustersProperties.Cluster clusterProperties) { + var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth()) .orElse(new ClustersProperties.SchemaRegistryAuth()); - WebClient webClient = new WebClientConfigurator() + WebClientConfigurator webClientConfigurator = new WebClientConfigurator() .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl()) .configureBasicAuth(auth.getUsername(), auth.getPassword()) - .configureBufferSize(webClientMaxBuffSize) - .build(); + .configureBufferSize(webClientMaxBuffSize); + + if (auth.getBearerAuthCustomProviderClass() != null && Objects.equals(auth.getBearerAuthCustomProviderClass(), + GcpBearerAuthFilter.getGcpBearerAuthCustomProviderClass())) { + webClientConfigurator.filter(new GcpBearerAuthFilter()); + } + + WebClient webClient = webClientConfigurator.build(); + return ReactiveFailover.create( parseUrlList(clusterProperties.getSchemaRegistry()), url -> new KafkaSrClientApi(new ApiClient(webClient, null, null).setBasePath(url)), diff --git a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java index c725a787e..01426f07f 100644 --- a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java +++ b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java @@ -9,12 +9,14 @@ import io.kafbat.ui.sr.api.KafkaSrClientApi; import io.kafbat.ui.sr.model.Compatibility; import io.kafbat.ui.sr.model.CompatibilityCheckResponse; -import io.kafbat.ui.sr.model.CompatibilityConfig; import io.kafbat.ui.sr.model.CompatibilityLevelChange; +import io.kafbat.ui.sr.model.GetGlobalCompatibilityLevel200Response; import io.kafbat.ui.sr.model.NewSubject; import io.kafbat.ui.sr.model.SchemaSubject; import io.kafbat.ui.util.ReactiveFailover; +import io.kafbat.ui.util.gcp.GcpBearerAuthFilter; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.Getter; @@ -148,14 +150,16 @@ public Mono getSchemaCompatibilityLevel(KafkaCluster cluster, String schemaName) { return api(cluster) .mono(c -> c.getSubjectCompatibilityLevel(schemaName, true)) - .map(CompatibilityConfig::getCompatibilityLevel) + .map(compatibilityResponse -> + normalizeCompatibilityResponse(cluster, compatibilityResponse)) .onErrorResume(error -> Mono.empty()); } public Mono getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) { return api(cluster) .mono(KafkaSrClientApi::getGlobalCompatibilityLevel) - .map(CompatibilityConfig::getCompatibilityLevel); + .map(compatibilityResponse -> + normalizeCompatibilityResponse(cluster, compatibilityResponse)); } private Mono getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster, @@ -169,4 +173,18 @@ public Mono checksSchemaCompatibility(KafkaCluster c NewSubject newSchemaSubject) { return api(cluster).mono(c -> c.checkSchemaCompatibility(schemaName, LATEST, true, newSchemaSubject)); } + + private Compatibility normalizeCompatibilityResponse(KafkaCluster cluster, + GetGlobalCompatibilityLevel200Response compatibilityResponse) { + if (cluster.getOriginalProperties().getSchemaRegistryAuth() != null + && Objects.equals(cluster.getOriginalProperties().getSchemaRegistryAuth().getBearerAuthCustomProviderClass(), + GcpBearerAuthFilter.getGcpBearerAuthCustomProviderClass())) { + return compatibilityResponse.getCompatibility(); + } else { + return compatibilityResponse.getCompatibilityLevel(); + } + } } + + + diff --git a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java index 170530be1..c4405e89f 100644 --- a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java +++ b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java @@ -11,6 +11,8 @@ import java.io.FileInputStream; import java.security.KeyStore; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.function.Consumer; import javax.annotation.Nullable; import javax.net.ssl.KeyManagerFactory; @@ -24,9 +26,11 @@ import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.util.ResourceUtils; import org.springframework.util.unit.DataSize; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; import reactor.netty.http.client.HttpClient; + public class WebClientConfigurator { private final WebClient.Builder builder = WebClient.builder(); @@ -34,6 +38,8 @@ public class WebClientConfigurator { .create() .proxyWithSystemProperties(); + private final List filters = new ArrayList<>(); + public WebClientConfigurator() { configureObjectMapper(defaultOM()); } @@ -45,6 +51,13 @@ private static ObjectMapper defaultOM() { .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } + public WebClientConfigurator filter(ExchangeFilterFunction filter) { + if (filter != null) { + this.filters.add(filter); + } + return this; + } + public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, @Nullable ClustersProperties.KeystoreConfig keystoreConfig) { if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) { @@ -151,6 +164,7 @@ public WebClientConfigurator configureResponseTimeout(Duration responseTimeout) } public WebClient build() { + builder.filters(filterList -> filterList.addAll(this.filters)); return builder.clientConnector(new ReactorClientHttpConnector(httpClient)).build(); } } diff --git a/api/src/main/java/io/kafbat/ui/util/gcp/GcpBearerAuthFilter.java b/api/src/main/java/io/kafbat/ui/util/gcp/GcpBearerAuthFilter.java new file mode 100644 index 000000000..3faae1d76 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/util/gcp/GcpBearerAuthFilter.java @@ -0,0 +1,40 @@ +package io.kafbat.ui.util.gcp; + +import com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider; +import org.jetbrains.annotations.NotNull; +import org.springframework.web.reactive.function.client.ClientRequest; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.ExchangeFunction; +import reactor.core.publisher.Mono; + +public class GcpBearerAuthFilter implements ExchangeFilterFunction { + + private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS = + GcpBearerAuthCredentialProvider.class.getName(); + + private final GcpBearerAuthCredentialProvider credentialProvider; + + public GcpBearerAuthFilter() { + this.credentialProvider = new GcpBearerAuthCredentialProvider(); + } + + @NotNull + @Override + public Mono filter(ClientRequest request, ExchangeFunction next) { + // This Mono ensures token fetching happens for EACH request + return Mono.fromCallable(() -> this.credentialProvider.getBearerToken(null)) + .flatMap(token -> { + // Create a new request with the Authorization header + ClientRequest newRequest = ClientRequest.from(request) + .headers(headers -> headers.setBearerAuth(token)) + .build(); + // Pass the new request to the next filter in the chain + return next.exchange(newRequest); + }); + } + + public static String getGcpBearerAuthCustomProviderClass() { + return GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS; + } +} diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java index d66a8d004..9a4be577f 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java @@ -35,7 +35,7 @@ class SchemaRegistrySerdeTest { @BeforeEach void init() { serde = new SchemaRegistrySerde(); - serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true); + serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true); } @ParameterizedTest @@ -135,7 +135,7 @@ class SerdeWithDisabledSubjectExistenceCheck { @BeforeEach void init() { - serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", false); + serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", false); } @Test @@ -151,7 +151,7 @@ class SerdeWithEnabledSubjectExistenceCheck { @BeforeEach void init() { - serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true); + serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true); } @Test diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 8769e6aa1..1d92b4eff 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -4334,6 +4334,8 @@ components: type: string password: type: string + bearerAuthCustomProviderClass: + type: string schemaRegistrySsl: type: object properties: diff --git a/contract/src/main/resources/swagger/kafka-sr-api.yaml b/contract/src/main/resources/swagger/kafka-sr-api.yaml index 2b082d689..c2b888e2f 100644 --- a/contract/src/main/resources/swagger/kafka-sr-api.yaml +++ b/contract/src/main/resources/swagger/kafka-sr-api.yaml @@ -176,7 +176,9 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/CompatibilityConfig' + oneOf: + - $ref: '#/components/schemas/CompatibilityConfig' + - $ref: '#/components/schemas/CompatibilityConfigGcp' 404: description: Not found put: @@ -220,7 +222,9 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/CompatibilityConfig' + oneOf: + - $ref: '#/components/schemas/CompatibilityConfig' + - $ref: '#/components/schemas/CompatibilityConfigGcp' 404: description: Not found put: @@ -384,6 +388,18 @@ components: required: - compatibilityLevel + CompatibilityConfigGcp: + type: object + properties: + alias: + type: string + compatibility: + $ref: '#/components/schemas/Compatibility' + normalize: + type: boolean + required: + - compatibility + CompatibilityLevelChange: type: object properties: @@ -392,7 +408,6 @@ components: required: - compatibility - Compatibility: type: string enum: @@ -404,7 +419,6 @@ components: - FULL_TRANSITIVE - NONE - CompatibilityCheckResponse: type: object properties: