Skip to content

BE: SR: Add compatibility w/ GCP SR #1153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ public static class Cluster {

AuditProperties audit;

boolean gcpSchemaRegistry = false;

}

@Data
Expand Down Expand Up @@ -116,6 +114,7 @@ public static class ConnectCluster {
public static class SchemaRegistryAuth {
String username;
String password;
String bearerAuthCustomProviderClass;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ interface MessageFormatter {
String format(String topic, byte[] value);

static Map<SchemaType, MessageFormatter> createMap(SchemaRegistryClient schemaRegistryClient,
boolean gcpSchemaRegistry) {
//boolean gcpSchemaRegistry) {
String bearerAuthCustomProviderClass) {
return Map.of(
SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, gcpSchemaRegistry),
SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, bearerAuthCustomProviderClass),
SchemaType.JSON, new JsonSchemaMessageFormatter(schemaRegistryClient),
SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
);
Expand All @@ -31,7 +32,7 @@ SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
class AvroMessageFormatter implements MessageFormatter {
private final KafkaAvroDeserializer avroDeserializer;

AvroMessageFormatter(SchemaRegistryClient client, boolean gcpSchemaRegistry) {
AvroMessageFormatter(SchemaRegistryClient client, String bearerAuthCustomProviderClass) {
this.avroDeserializer = new KafkaAvroDeserializer(client);

final Map<String, Object> avroProps = new HashMap<>();
Expand All @@ -40,10 +41,10 @@ class AvroMessageFormatter implements MessageFormatter {
avroProps.put(KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false);
avroProps.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);

if (gcpSchemaRegistry) {
if (bearerAuthCustomProviderClass != null && !bearerAuthCustomProviderClass.isBlank()) {
avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM");
avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS,
"class com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider");
String.format("class %s", bearerAuthCustomProviderClass));
}

this.avroDeserializer.configure(avroProps, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public class SchemaRegistrySerde implements BuiltInSerde {
private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;
private static final String CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE = "CUSTOM";
private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
"com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider";

public static String name() {
return "SchemaRegistry";
Expand Down Expand Up @@ -80,13 +78,15 @@ public void autoConfigure(PropertyResolver kafkaClusterProperties,
urls,
kafkaClusterProperties.getProperty("schemaRegistryAuth.username", String.class).orElse(null),
kafkaClusterProperties.getProperty("schemaRegistryAuth.password", String.class).orElse(null),
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
.orElse(null),
kafkaClusterProperties.getProperty("schemaRegistrySsl.keystoreLocation", String.class).orElse(null),
kafkaClusterProperties.getProperty("schemaRegistrySsl.keystorePassword", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null),
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false)
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
),
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false),
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
.orElse(null),
kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"),
kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value"),
kafkaClusterProperties.getProperty("schemaRegistryCheckSchemaExistenceForDeserialize", Boolean.class)
Expand All @@ -108,13 +108,15 @@ public void configure(PropertyResolver serdeProperties,
urls,
serdeProperties.getProperty("username", String.class).orElse(null),
serdeProperties.getProperty("password", String.class).orElse(null),
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
.orElse(null),
serdeProperties.getProperty("keystoreLocation", String.class).orElse(null),
serdeProperties.getProperty("keystorePassword", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null),
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false)
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
),
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false),
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
.orElse(null),
serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"),
serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value"),
serdeProperties.getProperty("checkSchemaExistenceForDeserialize", Boolean.class)
Expand All @@ -126,26 +128,27 @@ public void configure(PropertyResolver serdeProperties,
void configure(
List<String> schemaRegistryUrls,
SchemaRegistryClient schemaRegistryClient,
boolean gcpSchemaRegistry,
String bearerAuthCustomProviderClass,
String keySchemaNameTemplate,
String valueSchemaNameTemplate,
boolean checkTopicSchemaExistenceForDeserialize) {
this.schemaRegistryUrls = schemaRegistryUrls;
this.schemaRegistryClient = schemaRegistryClient;
this.keySchemaNameTemplate = keySchemaNameTemplate;
this.valueSchemaNameTemplate = valueSchemaNameTemplate;
this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, gcpSchemaRegistry);
this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, bearerAuthCustomProviderClass);
this.checkSchemaExistenceForDeserialize = checkTopicSchemaExistenceForDeserialize;
}

private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls,
@Nullable String username,
@Nullable String password,
@Nullable String bearerAuthCustomProviderClass,
@Nullable String keyStoreLocation,
@Nullable String keyStorePassword,
@Nullable String trustStoreLocation,
@Nullable String trustStorePassword,
boolean gcpSchemaRegistry) {
@Nullable String trustStorePassword
) {
Map<String, String> configs = new HashMap<>();
if (username != null && password != null) {
configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
Expand Down Expand Up @@ -175,9 +178,9 @@ private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls
keyStorePassword);
}

if (gcpSchemaRegistry) {
if (bearerAuthCustomProviderClass != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called a class? The implementation rather retrieves a token via custom implementation from GCP which is later used as a bearer token value, there are no "classes" in use per se.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use this to reference this class. And I changed the bearer token implementation for the WebClient using this class too. Sorry 🙏 .

configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CREDENTIALS_SOURCE, CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE);
configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS);
configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, bearerAuthCustomProviderClass);
}

return new CachedSchemaRegistryClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,15 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterPrope

private ReactiveFailover<KafkaSrClientApi> schemaRegistryClient(ClustersProperties.Cluster clusterProperties) {


//System.out.println("Creating Schema Registry Client for cluster: " + clusterProperties.getName());
var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth())
.orElse(new ClustersProperties.SchemaRegistryAuth());
//System.out.println("Auth details: " + auth.toString());
WebClient webClient = new WebClientConfigurator()
.configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
.configureBasicAuth(auth.getUsername(), auth.getPassword())
.configureGcpBearerAuth(clusterProperties.isGcpSchemaRegistry())
.configureBearerTokenAuth(auth.getBearerAuthCustomProviderClass())
.configureBufferSize(webClientMaxBuffSize)
.build();
return ReactiveFailover.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.kafbat.ui.sr.model.SchemaSubject;
import io.kafbat.ui.util.ReactiveFailover;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand All @@ -34,6 +35,9 @@ public class SchemaRegistryService {

private static final String LATEST = "latest";

private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
"com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider";

@AllArgsConstructor
public static class SubjectWithCompatibilityLevel {
@Delegate
Expand Down Expand Up @@ -148,21 +152,14 @@ public Mono<Compatibility> getSchemaCompatibilityLevel(KafkaCluster cluster,
String schemaName) {
return api(cluster)
.mono(c -> c.getSubjectCompatibilityLevel(schemaName, true))
.map(compatibilityConfig ->
cluster.getOriginalProperties().isGcpSchemaRegistry()
? compatibilityConfig.getCompatibility()
: compatibilityConfig.getCompatibilityLevel())
.map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig))
.onErrorResume(error -> Mono.empty());
}

public Mono<Compatibility> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
return api(cluster)
.mono(KafkaSrClientApi::getGlobalCompatibilityLevel)
.map(compatibilityConfig ->
cluster.getOriginalProperties().isGcpSchemaRegistry()
? compatibilityConfig.getCompatibility()
: compatibilityConfig.getCompatibilityLevel()
);
.map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig));
}

private Mono<Compatibility> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
Expand All @@ -176,4 +173,17 @@ public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(KafkaCluster c
NewSubject newSchemaSubject) {
return api(cluster).mono(c -> c.checkSchemaCompatibility(schemaName, LATEST, true, newSchemaSubject));
}

private Compatibility selectCompatibilityFormat(KafkaCluster cluster, CompatibilityConfig compatibilityConfig) {
if (cluster.getOriginalProperties().getSchemaRegistryAuth() != null
&& Objects.equals(cluster.getOriginalProperties().getSchemaRegistryAuth().getBearerAuthCustomProviderClass(),
GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) {
return compatibilityConfig.getCompatibility();
} else {
return compatibilityConfig.getCompatibilityLevel();
}
}
}



12 changes: 9 additions & 3 deletions api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import java.security.KeyStore;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import javax.validation.constraints.Null;
import lombok.SneakyThrows;
import org.openapitools.jackson.nullable.JsonNullableModule;
import org.springframework.http.MediaType;
Expand All @@ -36,6 +38,9 @@

public class WebClientConfigurator {

private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
"com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider";

private final WebClient.Builder builder = WebClient.builder();
private HttpClient httpClient = HttpClient
.create()
Expand All @@ -52,9 +57,10 @@ private static ObjectMapper defaultOM() {
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

public WebClientConfigurator configureGcpBearerAuth(boolean enabled) {
if (enabled) {
System.out.println("Configuring GCP Bearer Auth");
public WebClientConfigurator configureBearerTokenAuth(@Nullable String bearerAuthCustomProviderClass) {
//System.out.println("Configuring GCP Bearer Auth in web client");
//System.out.println("Bearer Auth Custom Provider Class: " + bearerAuthCustomProviderClass);
if (Objects.equals(bearerAuthCustomProviderClass, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) {
builder.filter(createGcpBearerAuthFilter());
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SchemaRegistrySerdeTest {
@BeforeEach
void init() {
serde = new SchemaRegistrySerde();
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true);
serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true);
}

@ParameterizedTest
Expand Down Expand Up @@ -135,7 +135,7 @@ class SerdeWithDisabledSubjectExistenceCheck {

@BeforeEach
void init() {
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", false);
serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", false);
}

@Test
Expand All @@ -151,7 +151,7 @@ class SerdeWithEnabledSubjectExistenceCheck {

@BeforeEach
void init() {
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true);
serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true);
}

@Test
Expand Down
4 changes: 2 additions & 2 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4334,15 +4334,15 @@ components:
type: string
password:
type: string
bearerAuthCustomProviderClass:
type: string
schemaRegistrySsl:
type: object
properties:
keystoreLocation:
type: string
keystorePassword:
type: string
gcpSchemaRegistry:
type: boolean
ksqldbServer:
type: string
ksqldbServerSsl:
Expand Down
Loading