Skip to content

Commit 61023fb

Browse files
authored
Fix invalid mapping for oauth_cb in BaseSettings (#606)
Also remove `oauthbearer_token_refresh_cb` since it's the same as `oauth_cb`
1 parent fa55253 commit 61023fb

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

quixstreams/kafka/configuration.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import Callable, Literal, Optional, Tuple, Type
22

3+
import pydantic
34
from pydantic import AliasChoices, Field, SecretStr
45
from pydantic.functional_validators import BeforeValidator
56
from pydantic_settings import PydanticBaseSettingsSource
@@ -44,11 +45,17 @@ class ConnectionConfig(BaseSettings):
4445
sasl_kerberos_min_time_before_relogin: Optional[int] = None
4546
sasl_kerberos_service_name: Optional[str] = None
4647
sasl_kerberos_principal: Optional[str] = None
48+
4749
# for oauth_cb, see https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration
48-
oauth_cb: Optional[Callable[[str], Tuple[str, float]]] = None
50+
oauth_cb: Optional[Callable[[str], Tuple[str, float]]] = pydantic.Field(
51+
# Prevent the AliasGenerator from changing the field name to "oauth.cb"
52+
default=None,
53+
alias_priority=2,
54+
serialization_alias="oauth_cb",
55+
)
56+
4957
sasl_oauthbearer_config: Optional[str] = None
5058
enable_sasl_oauthbearer_unsecure_jwt: Optional[bool] = None
51-
oauthbearer_token_refresh_cb: Optional[Callable] = None
5259
sasl_oauthbearer_method: Annotated[
5360
Optional[Literal["default", "oidc"]],
5461
BeforeValidator(lambda v: v.lower() if v is not None else v),

tests/test_quixstreams/test_kafka/test_configuration.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@ def test_from_librdkafka_dict(self, mechanism_casing):
3131
"bootstrap.servers": "url",
3232
"sasl.mechanism": mechanism_casing,
3333
"sasl.username": "my-username",
34+
"oauth_cb": lambda _: _,
3435
}
3536
config = ConnectionConfig.from_librdkafka_dict(librdkafka_dict)
3637

3738
assert config.bootstrap_servers == librdkafka_dict["bootstrap.servers"]
3839
assert config.sasl_mechanism == librdkafka_dict["sasl.mechanism"].upper()
3940
assert config.sasl_username == librdkafka_dict["sasl.username"]
41+
assert config.oauth_cb == librdkafka_dict["oauth_cb"]
4042

4143
def test_from_librdkafka_dict_extras_raise(self):
4244
librdkafka_dict = {
@@ -96,6 +98,11 @@ def test_sasl_mechanism_aliases(self):
9698
assert "sasl.mechanism" in d
9799
assert "sasl.mechanisms" not in d
98100

101+
def test_oauth_cb(self):
102+
config = ConnectionConfig(bootstrap_servers="url", oauth_cb=lambda _: _)
103+
rd_config = config.as_librdkafka_dict()
104+
assert config.oauth_cb == rd_config["oauth_cb"]
105+
99106
def test_secret_field(self):
100107
"""
101108
Confirm a secret field is obscured

0 commit comments

Comments
 (0)