Skip to content

BE: SR: Add compatibility w/ GCP SR #1153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public static class Cluster {
List<@Valid Masking> masking;

AuditProperties audit;

}

@Data
Expand Down Expand Up @@ -113,6 +114,7 @@ public static class ConnectCluster {
public static class SchemaRegistryAuth {
// Basic-auth username for the Schema Registry (used together with password).
String username;
// Basic-auth password for the Schema Registry.
String password;
// Fully-qualified class name of a custom bearer-auth credential provider
// (e.g. the GCP provider); when set, bearer auth is used instead of basic auth.
String bearerAuthCustomProviderClass;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.kafbat.ui.util.jsonschema.JsonAvroConversion;
import java.util.HashMap;
import java.util.Map;
import lombok.SneakyThrows;

interface MessageFormatter {

String format(String topic, byte[] value);

static Map<SchemaType, MessageFormatter> createMap(SchemaRegistryClient schemaRegistryClient) {
static Map<SchemaType, MessageFormatter> createMap(SchemaRegistryClient schemaRegistryClient,
String bearerAuthCustomProviderClass) {
return Map.of(
SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient),
SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, bearerAuthCustomProviderClass),
SchemaType.JSON, new JsonSchemaMessageFormatter(schemaRegistryClient),
SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
);
Expand All @@ -29,17 +31,23 @@ SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
class AvroMessageFormatter implements MessageFormatter {
private final KafkaAvroDeserializer avroDeserializer;

/**
 * Creates an Avro formatter backed by the given Schema Registry client.
 *
 * @param client                        pre-configured registry client; the URL config below is a placeholder
 * @param bearerAuthCustomProviderClass optional FQCN of a custom bearer-auth provider; when
 *                                      non-blank, bearer auth is enabled on the deserializer
 */
AvroMessageFormatter(SchemaRegistryClient client, String bearerAuthCustomProviderClass) {
  this.avroDeserializer = new KafkaAvroDeserializer(client);

  final Map<String, Object> avroProps = new HashMap<>();
  // URL is mandatory config but never used because the client instance is injected directly.
  avroProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused");
  avroProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
  avroProps.put(KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false);
  avroProps.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);

  if (bearerAuthCustomProviderClass != null && !bearerAuthCustomProviderClass.isBlank()) {
    avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM");
    // NOTE(review): the "class %s" prefix looks unusual for BEARER_AUTH_CUSTOM_PROVIDER_CLASS —
    // confirm this matches what the Confluent bearer-auth provider factory expects.
    avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS,
        String.format("class %s", bearerAuthCustomProviderClass));
  }

  // Configure exactly once; the diff dump previously showed a stale pre-change
  // configure(Map.of(...)) call alongside this one, which double-configured the deserializer.
  this.avroDeserializer.configure(avroProps, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {

private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;
private static final String CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE = "CUSTOM";

public static String name() {
return "SchemaRegistry";
Expand Down Expand Up @@ -77,11 +78,15 @@ public void autoConfigure(PropertyResolver kafkaClusterProperties,
urls,
kafkaClusterProperties.getProperty("schemaRegistryAuth.username", String.class).orElse(null),
kafkaClusterProperties.getProperty("schemaRegistryAuth.password", String.class).orElse(null),
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
.orElse(null),
kafkaClusterProperties.getProperty("schemaRegistrySsl.keystoreLocation", String.class).orElse(null),
kafkaClusterProperties.getProperty("schemaRegistrySsl.keystorePassword", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
),
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
.orElse(null),
kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"),
kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value"),
kafkaClusterProperties.getProperty("schemaRegistryCheckSchemaExistenceForDeserialize", Boolean.class)
Expand All @@ -103,11 +108,15 @@ public void configure(PropertyResolver serdeProperties,
urls,
serdeProperties.getProperty("username", String.class).orElse(null),
serdeProperties.getProperty("password", String.class).orElse(null),
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
.orElse(null),
serdeProperties.getProperty("keystoreLocation", String.class).orElse(null),
serdeProperties.getProperty("keystorePassword", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
),
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
.orElse(null),
serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"),
serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value"),
serdeProperties.getProperty("checkSchemaExistenceForDeserialize", Boolean.class)
Expand All @@ -119,24 +128,27 @@ public void configure(PropertyResolver serdeProperties,
/**
 * Package-private configuration entry point shared by both public configure paths.
 *
 * @param schemaRegistryUrls                      registry base URLs used for failover
 * @param schemaRegistryClient                    client used for schema lookups
 * @param bearerAuthCustomProviderClass           optional custom bearer-auth provider FQCN,
 *                                                propagated to the per-type message formatters
 * @param keySchemaNameTemplate                   subject-name template for keys (e.g. "%s-key")
 * @param valueSchemaNameTemplate                 subject-name template for values (e.g. "%s-value")
 * @param checkTopicSchemaExistenceForDeserialize whether to verify subject existence before deserializing
 */
void configure(
    List<String> schemaRegistryUrls,
    SchemaRegistryClient schemaRegistryClient,
    String bearerAuthCustomProviderClass,
    String keySchemaNameTemplate,
    String valueSchemaNameTemplate,
    boolean checkTopicSchemaExistenceForDeserialize) {
  this.schemaRegistryUrls = schemaRegistryUrls;
  this.schemaRegistryClient = schemaRegistryClient;
  this.keySchemaNameTemplate = keySchemaNameTemplate;
  this.valueSchemaNameTemplate = valueSchemaNameTemplate;
  // Single assignment: the dump carried a stale pre-change createMap(client) call as well.
  this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, bearerAuthCustomProviderClass);
  this.checkSchemaExistenceForDeserialize = checkTopicSchemaExistenceForDeserialize;
}

private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls,
@Nullable String username,
@Nullable String password,
@Nullable String bearerAuthCustomProviderClass,
@Nullable String keyStoreLocation,
@Nullable String keyStorePassword,
@Nullable String trustStoreLocation,
@Nullable String trustStorePassword) {
@Nullable String trustStorePassword
) {
Map<String, String> configs = new HashMap<>();
if (username != null && password != null) {
configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
Expand Down Expand Up @@ -166,6 +178,11 @@ private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls
keyStorePassword);
}

if (bearerAuthCustomProviderClass != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called a class? The implementation rather retrieves a token via custom implementation from GCP which is later used as a bearer token value, there are no "classes" in use per se.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use the provider class name to reference this class, and I also changed the bearer-token implementation for the WebClient to use this class. Sorry 🙏 .

configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CREDENTIALS_SOURCE, CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE);
configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, bearerAuthCustomProviderClass);
}

return new CachedSchemaRegistryClient(
urls,
1_000,
Expand Down
16 changes: 13 additions & 3 deletions api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import io.kafbat.ui.util.KafkaServicesValidation;
import io.kafbat.ui.util.ReactiveFailover;
import io.kafbat.ui.util.WebClientConfigurator;
import io.kafbat.ui.util.gcp.GcpBearerAuthFilter;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Stream;
Expand Down Expand Up @@ -167,13 +169,21 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterPrope
}

private ReactiveFailover<KafkaSrClientApi> schemaRegistryClient(ClustersProperties.Cluster clusterProperties) {

var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth())
.orElse(new ClustersProperties.SchemaRegistryAuth());
WebClient webClient = new WebClientConfigurator()
WebClientConfigurator webClientConfigurator = new WebClientConfigurator()
.configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
.configureBasicAuth(auth.getUsername(), auth.getPassword())
.configureBufferSize(webClientMaxBuffSize)
.build();
.configureBufferSize(webClientMaxBuffSize);

if (auth.getBearerAuthCustomProviderClass() != null && Objects.equals(auth.getBearerAuthCustomProviderClass(),
GcpBearerAuthFilter.getGcpBearerAuthCustomProviderClass())) {
webClientConfigurator.filter(new GcpBearerAuthFilter());
}

WebClient webClient = webClientConfigurator.build();

return ReactiveFailover.create(
parseUrlList(clusterProperties.getSchemaRegistry()),
url -> new KafkaSrClientApi(new ApiClient(webClient, null, null).setBasePath(url)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import io.kafbat.ui.sr.api.KafkaSrClientApi;
import io.kafbat.ui.sr.model.Compatibility;
import io.kafbat.ui.sr.model.CompatibilityCheckResponse;
import io.kafbat.ui.sr.model.CompatibilityConfig;
import io.kafbat.ui.sr.model.CompatibilityLevelChange;
import io.kafbat.ui.sr.model.GetGlobalCompatibilityLevel200Response;
import io.kafbat.ui.sr.model.NewSubject;
import io.kafbat.ui.sr.model.SchemaSubject;
import io.kafbat.ui.util.ReactiveFailover;
import io.kafbat.ui.util.gcp.GcpBearerAuthFilter;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand Down Expand Up @@ -148,14 +150,16 @@ public Mono<Compatibility> getSchemaCompatibilityLevel(KafkaCluster cluster,
String schemaName) {
return api(cluster)
.mono(c -> c.getSubjectCompatibilityLevel(schemaName, true))
.map(CompatibilityConfig::getCompatibilityLevel)
.map(compatibilityResponse ->
normalizeCompatibilityResponse(cluster, compatibilityResponse))
.onErrorResume(error -> Mono.empty());
}

public Mono<Compatibility> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
return api(cluster)
.mono(KafkaSrClientApi::getGlobalCompatibilityLevel)
.map(CompatibilityConfig::getCompatibilityLevel);
.map(compatibilityResponse ->
normalizeCompatibilityResponse(cluster, compatibilityResponse));
}

private Mono<Compatibility> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
Expand All @@ -169,4 +173,18 @@ public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(KafkaCluster c
NewSubject newSchemaSubject) {
return api(cluster).mono(c -> c.checkSchemaCompatibility(schemaName, LATEST, true, newSchemaSubject));
}

/**
 * Picks the correct compatibility field from the response: GCP-managed Schema Registry
 * (detected via the configured GCP bearer-auth provider class) reports it as
 * {@code compatibility}, while Confluent SR reports it as {@code compatibilityLevel}.
 */
private Compatibility normalizeCompatibilityResponse(KafkaCluster cluster,
                                                     GetGlobalCompatibilityLevel200Response compatibilityResponse) {
  var registryAuth = cluster.getOriginalProperties().getSchemaRegistryAuth();
  boolean isGcpRegistry = registryAuth != null
      && Objects.equals(registryAuth.getBearerAuthCustomProviderClass(),
          GcpBearerAuthFilter.getGcpBearerAuthCustomProviderClass());
  return isGcpRegistry
      ? compatibilityResponse.getCompatibility()
      : compatibilityResponse.getCompatibilityLevel();
}
}



14 changes: 14 additions & 0 deletions api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.io.FileInputStream;
import java.security.KeyStore;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.net.ssl.KeyManagerFactory;
Expand All @@ -24,16 +26,20 @@
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.util.ResourceUtils;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;


public class WebClientConfigurator {

private final WebClient.Builder builder = WebClient.builder();
private HttpClient httpClient = HttpClient
.create()
.proxyWithSystemProperties();

private final List<ExchangeFilterFunction> filters = new ArrayList<>();

public WebClientConfigurator() {
// Install the default lenient ObjectMapper (unknown properties ignored) up front;
// callers may still override it via configureObjectMapper.
configureObjectMapper(defaultOM());
}
Expand All @@ -45,6 +51,13 @@ private static ObjectMapper defaultOM() {
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

/**
 * Registers an extra exchange filter (e.g. a bearer-auth filter) to be attached to the
 * WebClient at {@link #build()} time. Null filters are ignored.
 *
 * @return this configurator, for call chaining
 */
public WebClientConfigurator filter(ExchangeFilterFunction filter) {
  if (filter == null) {
    return this;
  }
  filters.add(filter);
  return this;
}

public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) {
Expand Down Expand Up @@ -151,6 +164,7 @@ public WebClientConfigurator configureResponseTimeout(Duration responseTimeout)
}

/**
 * Finalizes the WebClient: appends all registered exchange filters in registration
 * order, wires the configured Netty HttpClient, and builds the client.
 */
public WebClient build() {
  // Appending one-by-one preserves the same ordering as a bulk addAll.
  this.filters.forEach(builder::filter);
  return builder
      .clientConnector(new ReactorClientHttpConnector(httpClient))
      .build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.kafbat.ui.util.gcp;

import com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider;
import org.jetbrains.annotations.NotNull;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import reactor.core.publisher.Mono;

public class GcpBearerAuthFilter implements ExchangeFilterFunction {

private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
GcpBearerAuthCredentialProvider.class.getName();

private final GcpBearerAuthCredentialProvider credentialProvider;

public GcpBearerAuthFilter() {
this.credentialProvider = new GcpBearerAuthCredentialProvider();
}

@NotNull
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
// This Mono ensures token fetching happens for EACH request
return Mono.fromCallable(() -> this.credentialProvider.getBearerToken(null))
.flatMap(token -> {
// Create a new request with the Authorization header
ClientRequest newRequest = ClientRequest.from(request)
.headers(headers -> headers.setBearerAuth(token))
.build();
// Pass the new request to the next filter in the chain
return next.exchange(newRequest);
});
}

public static String getGcpBearerAuthCustomProviderClass() {
return GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SchemaRegistrySerdeTest {
@BeforeEach
void init() {
  serde = new SchemaRegistrySerde();
  // New 6-arg configure: null bearerAuthCustomProviderClass disables bearer auth.
  // The stale pre-change 5-arg call (which no longer compiles) has been removed.
  serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true);
}

@ParameterizedTest
Expand Down Expand Up @@ -135,7 +135,7 @@ class SerdeWithDisabledSubjectExistenceCheck {

@BeforeEach
void init() {
  // Subject-existence check disabled; null bearer-auth provider. The stale
  // pre-change 5-arg configure call has been removed.
  serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", false);
}

@Test
Expand All @@ -151,7 +151,7 @@ class SerdeWithEnabledSubjectExistenceCheck {

@BeforeEach
void init() {
  // Subject-existence check enabled; null bearer-auth provider. The stale
  // pre-change 5-arg configure call has been removed.
  serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true);
}

@Test
Expand Down
2 changes: 2 additions & 0 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4334,6 +4334,8 @@ components:
type: string
password:
type: string
bearerAuthCustomProviderClass:
type: string
schemaRegistrySsl:
type: object
properties:
Expand Down
Loading
Loading