
BE: SR: Add compatibility w/ GCP SR #1153

Status: Open. Wants to merge 6 commits into base: main. Showing changes from 4 commits.
ClustersProperties.java (full path not shown in this view)

@@ -70,6 +70,7 @@ public static class Cluster {
    List<@Valid Masking> masking;

    AuditProperties audit;

  }

  @Data
@@ -113,6 +114,7 @@ public static class ConnectCluster {
  public static class SchemaRegistryAuth {
    String username;
    String password;
+    String bearerAuthCustomProviderClass;
  }

  @Data
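Note: the new bearerAuthCustomProviderClass property names a Confluent BearerAuthCredentialProvider implementation on the classpath. For GCP, this PR targets Google's com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider (see the constant added in SchemaRegistryService further down). For orientation only, here is a rough sketch of what such a provider could look like; the exact SPI method set is an assumption based on recent schema-registry-client releases, and the class and scope names are illustrative.

// Illustrative sketch only; not part of this PR. GCP users would point
// bearerAuthCustomProviderClass at Google's published provider rather than write one.
// The BearerAuthCredentialProvider method set below is assumed, not verified.
import com.google.auth.oauth2.GoogleCredentials;
import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;
import java.io.IOException;
import java.net.URL;
import java.util.Map;

public class MyGcpTokenProvider implements BearerAuthCredentialProvider {

  private GoogleCredentials credentials;

  @Override
  public void configure(Map<String, ?> configs) {
    try {
      // Application Default Credentials, e.g. the workload's GKE service account
      credentials = GoogleCredentials.getApplicationDefault()
          .createScoped("https://www.googleapis.com/auth/cloud-platform");
    } catch (IOException e) {
      throw new IllegalStateException("Could not load application default credentials", e);
    }
  }

  @Override
  public String getBearerToken(URL url) {
    try {
      credentials.refreshIfExpired(); // refreshes only when the cached token is stale
      return credentials.getAccessToken().getTokenValue();
    } catch (IOException e) {
      throw new IllegalStateException("Failed to obtain a GCP access token", e);
    }
  }

  @Override
  public String alias() {
    return "CUSTOM";
  }
}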
MessageFormatter.java (full path not shown in this view)

@@ -11,16 +11,18 @@
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.kafbat.ui.util.jsonschema.JsonAvroConversion;
+import java.util.HashMap;
import java.util.Map;
import lombok.SneakyThrows;

interface MessageFormatter {

  String format(String topic, byte[] value);

-  static Map<SchemaType, MessageFormatter> createMap(SchemaRegistryClient schemaRegistryClient) {
+  static Map<SchemaType, MessageFormatter> createMap(SchemaRegistryClient schemaRegistryClient,
+                                                     String bearerAuthCustomProviderClass) {
    return Map.of(
-        SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient),
+        SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, bearerAuthCustomProviderClass),
        SchemaType.JSON, new JsonSchemaMessageFormatter(schemaRegistryClient),
        SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
    );
@@ -29,17 +31,23 @@
class AvroMessageFormatter implements MessageFormatter {
  private final KafkaAvroDeserializer avroDeserializer;

-  AvroMessageFormatter(SchemaRegistryClient client) {
+  AvroMessageFormatter(SchemaRegistryClient client, String bearerAuthCustomProviderClass) {
    this.avroDeserializer = new KafkaAvroDeserializer(client);
-    this.avroDeserializer.configure(
-        Map.of(
-            AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused",
-            KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false,
-            KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false,
-            KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true
-        ),
-        false
-    );
+
+    final Map<String, Object> avroProps = new HashMap<>();
+    avroProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused");
+    avroProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
+    avroProps.put(KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false);
+    avroProps.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);
+
+    if (bearerAuthCustomProviderClass != null && !bearerAuthCustomProviderClass.isBlank()) {
+      avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM");
+      avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS,
+          String.format("class %s", bearerAuthCustomProviderClass));
+    }
+
+    this.avroDeserializer.configure(avroProps, false);
  }

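Note: a quick consumption sketch for the updated factory; the serde builds this map once and dispatches on the schema type it reads from the registry. Identifiers other than createMap and format are hypothetical.

// Sketch: consuming the updated createMap (variable names hypothetical)
Map<SchemaType, MessageFormatter> formatters =
    MessageFormatter.createMap(schemaRegistryClient, bearerAuthCustomProviderClass);

// later, once the writer schema type of a record is known:
MessageFormatter formatter = formatters.get(SchemaType.AVRO);
String json = formatter.format("orders-topic", recordValueBytes);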
SchemaRegistrySerde.java (full path not shown in this view)

@@ -42,6 +42,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {

  private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
  private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;
+  private static final String CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE = "CUSTOM";

  public static String name() {
    return "SchemaRegistry";
@@ -77,11 +78,15 @@ public void autoConfigure(PropertyResolver kafkaClusterProperties,
            urls,
            kafkaClusterProperties.getProperty("schemaRegistryAuth.username", String.class).orElse(null),
            kafkaClusterProperties.getProperty("schemaRegistryAuth.password", String.class).orElse(null),
+            kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
+                .orElse(null),
            kafkaClusterProperties.getProperty("schemaRegistrySsl.keystoreLocation", String.class).orElse(null),
            kafkaClusterProperties.getProperty("schemaRegistrySsl.keystorePassword", String.class).orElse(null),
            kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
            kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
        ),
+        kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
+            .orElse(null),
        kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"),
        kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value"),
        kafkaClusterProperties.getProperty("schemaRegistryCheckSchemaExistenceForDeserialize", Boolean.class)
@@ -103,11 +108,15 @@ public void configure(PropertyResolver serdeProperties,
            urls,
            serdeProperties.getProperty("username", String.class).orElse(null),
            serdeProperties.getProperty("password", String.class).orElse(null),
+            kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
+                .orElse(null),
            serdeProperties.getProperty("keystoreLocation", String.class).orElse(null),
            serdeProperties.getProperty("keystorePassword", String.class).orElse(null),
            kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
            kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
        ),
+        kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
+            .orElse(null),
        serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"),
        serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value"),
        serdeProperties.getProperty("checkSchemaExistenceForDeserialize", Boolean.class)
@@ -119,24 +128,27 @@
  void configure(
      List<String> schemaRegistryUrls,
      SchemaRegistryClient schemaRegistryClient,
+      String bearerAuthCustomProviderClass,
      String keySchemaNameTemplate,
      String valueSchemaNameTemplate,
      boolean checkTopicSchemaExistenceForDeserialize) {
    this.schemaRegistryUrls = schemaRegistryUrls;
    this.schemaRegistryClient = schemaRegistryClient;
    this.keySchemaNameTemplate = keySchemaNameTemplate;
    this.valueSchemaNameTemplate = valueSchemaNameTemplate;
-    this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient);
+    this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, bearerAuthCustomProviderClass);
    this.checkSchemaExistenceForDeserialize = checkTopicSchemaExistenceForDeserialize;
  }

  private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls,
                                                                 @Nullable String username,
                                                                 @Nullable String password,
+                                                                 @Nullable String bearerAuthCustomProviderClass,
                                                                 @Nullable String keyStoreLocation,
                                                                 @Nullable String keyStorePassword,
                                                                 @Nullable String trustStoreLocation,
-                                                                 @Nullable String trustStorePassword) {
+                                                                 @Nullable String trustStorePassword
+  ) {
    Map<String, String> configs = new HashMap<>();
    if (username != null && password != null) {
      configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
@@ -166,6 +178,11 @@ private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls,
          keyStorePassword);
    }

+    if (bearerAuthCustomProviderClass != null) {
+      configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CREDENTIALS_SOURCE, CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE);
+      configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, bearerAuthCustomProviderClass);
+    }

Review comment (Member), on the block above:
> Why is this called a class? The implementation rather retrieves a token via a custom implementation from GCP, which is later used as the bearer token value; there are no "classes" in use per se.

Reply (Author):
> I use this to reference this class, and I changed the bearer token implementation for the WebClient to use this class too. Sorry 🙏.

    return new CachedSchemaRegistryClient(
        urls,
        1_000,
(file name not shown in this view)

@@ -167,11 +167,13 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterProperties) {
  }

  private ReactiveFailover<KafkaSrClientApi> schemaRegistryClient(ClustersProperties.Cluster clusterProperties) {
+
    var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth())
        .orElse(new ClustersProperties.SchemaRegistryAuth());
    WebClient webClient = new WebClientConfigurator()
        .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
        .configureBasicAuth(auth.getUsername(), auth.getPassword())
+        .configureBearerTokenAuth(auth.getBearerAuthCustomProviderClass())
        .configureBufferSize(webClientMaxBuffSize)
        .build();
    return ReactiveFailover.create(
SchemaRegistryService.java (full path not shown in this view)

@@ -15,6 +15,7 @@
import io.kafbat.ui.sr.model.SchemaSubject;
import io.kafbat.ui.util.ReactiveFailover;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -34,6 +35,9 @@ public class SchemaRegistryService {

  private static final String LATEST = "latest";

+  private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
+      "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider";

  @AllArgsConstructor
  public static class SubjectWithCompatibilityLevel {
    @Delegate
@@ -148,14 +152,14 @@ public Mono<Compatibility> getSchemaCompatibilityLevel(KafkaCluster cluster,
                                                     String schemaName) {
    return api(cluster)
        .mono(c -> c.getSubjectCompatibilityLevel(schemaName, true))
-        .map(CompatibilityConfig::getCompatibilityLevel)
+        .map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig))
        .onErrorResume(error -> Mono.empty());
  }

  public Mono<Compatibility> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
    return api(cluster)
        .mono(KafkaSrClientApi::getGlobalCompatibilityLevel)
-        .map(CompatibilityConfig::getCompatibilityLevel);
+        .map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig));
  }

  private Mono<Compatibility> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
@@ -169,4 +173,17 @@
                                                   NewSubject newSchemaSubject) {
    return api(cluster).mono(c -> c.checkSchemaCompatibility(schemaName, LATEST, true, newSchemaSubject));
  }

+  private Compatibility selectCompatibilityFormat(KafkaCluster cluster, CompatibilityConfig compatibilityConfig) {
+    if (cluster.getOriginalProperties().getSchemaRegistryAuth() != null
+        && Objects.equals(cluster.getOriginalProperties().getSchemaRegistryAuth().getBearerAuthCustomProviderClass(),
+        GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) {
+      return compatibilityConfig.getCompatibility();
+    } else {
+      return compatibilityConfig.getCompatibilityLevel();
+    }
+  }
}
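Note: the reason for this switch, per the swagger discussion further down, is that Confluent-style registries return the level in a compatibilityLevel field while GCP Managed Kafka's registry returns it in a compatibility field. A hedged sketch of the intended behavior, assuming the generated CompatibilityConfig model exposes fluent setters for both fields:

// Assumes fluent setters on the generated CompatibilityConfig model.
CompatibilityConfig config = new CompatibilityConfig()
    .compatibilityLevel(Compatibility.BACKWARD)  // Confluent-style field
    .compatibility(Compatibility.FULL);          // GCP-style field added in this PR

// For a cluster configured with the GCP provider class, the GCP field wins:
//   selectCompatibilityFormat(gcpCluster, config)    -> Compatibility.FULL
// For any other cluster, the Confluent field is used:
//   selectCompatibilityFormat(otherCluster, config)  -> Compatibility.BACKWARD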



api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java (42 additions, 0 deletions)

@@ -3,14 +3,18 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.google.auth.oauth2.GoogleCredentials;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.exception.ValidationException;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.FileInputStream;
+import java.io.IOException;
import java.security.KeyStore;
import java.time.Duration;
+import java.util.Collections;
+import java.util.Objects;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.net.ssl.KeyManagerFactory;
@@ -24,11 +28,18 @@
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.util.ResourceUtils;
import org.springframework.util.unit.DataSize;
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;


public class WebClientConfigurator {

+  private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
+      "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider";

  private final WebClient.Builder builder = WebClient.builder();
  private HttpClient httpClient = HttpClient
      .create()
@@ -45,6 +56,37 @@ private static ObjectMapper defaultOM() {
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  }

+  public WebClientConfigurator configureBearerTokenAuth(@Nullable String bearerAuthCustomProviderClass) {
+    if (Objects.equals(bearerAuthCustomProviderClass, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) {
+      builder.filter(createGcpBearerAuthFilter());
+    }
+    return this;
+  }
+
+  private ExchangeFilterFunction createGcpBearerAuthFilter() {
+    return (request, next) -> {
+      return Mono.fromCallable(() -> {
+            try {
+              // Get credentials using Application Default Credentials (from the GKE service account)
+              GoogleCredentials credentials = GoogleCredentials.getApplicationDefault()
+                  .createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));
+
+              credentials.refreshIfExpired();
+              return credentials.getAccessToken().getTokenValue();
+            } catch (IOException e) {
+              throw new RuntimeException("Failed to get GCP access token", e);
+            }
+          })
+          .flatMap(token -> {
+            ClientRequest newRequest = ClientRequest.from(request)
+                // Add the Authorization header
+                .headers(headers -> headers.setBearerAuth(token))
+                .build();
+            return next.exchange(newRequest);
+          });
+    };
+  }
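Note: getApplicationDefault() and refreshIfExpired() perform blocking I/O (disk reads and, on refresh, a call to the metadata server), and Mono.fromCallable alone runs them on the subscribing thread, which in a WebClient filter is typically a Netty event-loop thread. A variant sketch, not part of this PR, that shifts the blocking work onto Reactor's boundedElastic scheduler (requires import reactor.core.scheduler.Schedulers):

// Variant sketch (not in this PR): same filter, with the blocking credential work
// moved off the event loop.
private ExchangeFilterFunction createGcpBearerAuthFilter() {
  return (request, next) -> Mono
      .fromCallable(() -> {
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault()
            .createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));
        credentials.refreshIfExpired(); // network call only when the cached token is stale
        return credentials.getAccessToken().getTokenValue();
      })
      // run the blocking credential lookup on a scheduler meant for blocking work
      .subscribeOn(Schedulers.boundedElastic())
      .flatMap(token -> next.exchange(
          ClientRequest.from(request)
              .headers(headers -> headers.setBearerAuth(token))
              .build()));
}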

  public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
                                            @Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
    if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) {
SchemaRegistrySerdeTest.java (full path not shown in this view)

@@ -35,7 +35,7 @@ class SchemaRegistrySerdeTest {
  @BeforeEach
  void init() {
    serde = new SchemaRegistrySerde();
-    serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true);
+    serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true);
  }

  @ParameterizedTest
@@ -135,7 +135,7 @@ class SerdeWithDisabledSubjectExistenceCheck {

  @BeforeEach
  void init() {
-    serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", false);
+    serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", false);
  }

  @Test
@@ -151,7 +151,7 @@ class SerdeWithEnabledSubjectExistenceCheck {

  @BeforeEach
  void init() {
-    serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true);
+    serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true);
  }

  @Test
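Note: all three updated call sites pass null for the new parameter, so the bearer-auth path itself is not covered by these tests. Exercising it would be a matter of passing a provider class name, hypothetical in this sketch:

// Hypothetical provider class name, for illustration only
serde.configure(List.of("wontbeused"), registryClient,
    "com.example.MyGcpTokenProvider", "%s-key", "%s-value", true);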
contract/src/main/resources/swagger/kafbat-ui-api.yaml (2 additions, 0 deletions)

@@ -4334,6 +4334,8 @@ components:
              type: string
            password:
              type: string
+            bearerAuthCustomProviderClass:
+              type: string
          schemaRegistrySsl:
            type: object
            properties:
contract/src/main/resources/swagger/kafka-sr-api.yaml (7 additions, 2 deletions)

@@ -381,8 +381,13 @@
      properties:
        compatibilityLevel:
          $ref: '#/components/schemas/Compatibility'
-      required:
-        - compatibilityLevel

Review comment (Member), on the removed "required" block:
> Why has compatibility become no longer required?

Reply (Author):
> I'm trying to change it.

Reply (Author):
> I finally added a new compatibilityConfigGcp schema, detached from the existing compatibilityConfig, and I had to modify code in the SchemaRegistryService class.
>
> If this new modification is not OK, please let me know if you have any recommendation about how to address this, as the only alternative I can think of is to add a new kafka-gcp-sr-api and a new GcpSchemaRegistryService class.
+        # GCP Managed Kafka Schema registries specific fields
+        alias:
+          type: string
+        compatibility:
+          $ref: '#/components/schemas/Compatibility'
+        normalize:
+          type: boolean

    CompatibilityLevelChange:
      type: object