Skip to content

Commit ccef54a

Browse files
authored
CI: Implement mypy pre-commit check (#643)
1 parent 0ac2ff1 commit ccef54a

File tree

27 files changed

+233
-134
lines changed

27 files changed

+233
-134
lines changed

.github/workflows/ci.yml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ jobs:
2424
- name: Setup Python
2525
uses: actions/setup-python@v4
2626
with:
27-
python-version: 3.9
27+
python-version: 3.12
28+
- name: Install requirements
29+
run: |
30+
python -m pip install -U pip
31+
python -m pip install -U -r requirements.txt -r tests/requirements.txt -r requirements-mypy.txt
2832
- uses: pre-commit/action@v3.0.1
2933

3034
test:
@@ -44,13 +48,10 @@ jobs:
4448
uses: actions/setup-python@v4
4549
with:
4650
python-version: ${{ matrix.python }}
47-
- name: Update pip
48-
run: |
49-
python -m pip install -U pip
5051
- name: Install requirements
5152
run: |
52-
python -m pip install -U -r tests/requirements.txt
53-
python -m pip install -U -r requirements.txt
53+
python -m pip install -U pip
54+
python -m pip install -U -r requirements.txt -r tests/requirements.txt
5455
- name: Run tests
5556
run: |
5657
python -m pytest -v --log-cli-level=ERROR

.pre-commit-config.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,10 @@ repos:
1111
entry: python conda/requirements.py
1212
language: python
1313
files: ^(requirements\.txt|pyproject\.toml)$
14+
- repo: https://github.com/pre-commit/mirrors-mypy
15+
rev: v1.13.0
16+
hooks:
17+
- id: mypy
18+
args: []
19+
language: system
20+
files: ^quixstreams/

pyproject.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,26 @@ log_cli_level = "INFO"
9898
log_cli_format = "[%(levelname)s] %(name)s: %(message)s"
9999
# Custom markers
100100
markers = ["timeit"]
101+
102+
[[tool.mypy.overrides]]
103+
module = "confluent_kafka.*"
104+
ignore_missing_imports = true
105+
106+
[[tool.mypy.overrides]]
107+
module = [
108+
"quixstreams.sinks.community.*",
109+
"quixstreams.sources.community.*",
110+
]
111+
ignore_errors = true
112+
113+
[[tool.mypy.overrides]]
114+
module = [
115+
"quixstreams.core.*",
116+
"quixstreams.dataframe.*",
117+
"quixstreams.models.*",
118+
"quixstreams.platforms.*",
119+
"quixstreams.state.*",
120+
"quixstreams.rowproducer.*"
121+
]
122+
ignore_errors = true
123+

quixstreams/app.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
import time
77
import warnings
88
from pathlib import Path
9-
from typing import Callable, List, Literal, Optional, Tuple, Type, Union
9+
from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union
1010

1111
from confluent_kafka import TopicPartition
1212
from pydantic import AliasGenerator, Field
13+
from pydantic_settings import BaseSettings as PydanticBaseSettings
1314
from pydantic_settings import PydanticBaseSettingsSource, SettingsConfigDict
1415
from typing_extensions import Self
1516

@@ -60,6 +61,17 @@
6061
_default_max_poll_interval_ms = 300000
6162

6263

64+
class TopicManagerFactory(Protocol):
65+
def __call__(
66+
self,
67+
topic_admin: TopicAdmin,
68+
consumer_group: str,
69+
timeout: float = 30,
70+
create_timeout: float = 60,
71+
auto_create_topics: bool = True,
72+
) -> TopicManager: ...
73+
74+
6375
class Application:
6476
"""
6577
The main Application class.
@@ -205,19 +217,21 @@ def __init__(
205217
producer_extra_config = producer_extra_config or {}
206218
consumer_extra_config = consumer_extra_config or {}
207219

220+
state_dir = Path(state_dir)
221+
208222
# We can't use os.getenv as defaults (and have testing work nicely)
209223
# since it evaluates getenv when the function is defined.
210224
# In general this is just a most robust approach.
211225
broker_address = broker_address or os.getenv("Quix__Broker__Address")
212226
quix_sdk_token = quix_sdk_token or os.getenv("Quix__Sdk__Token")
213-
consumer_group = consumer_group or os.getenv(
214-
"Quix__Consumer_Group", "quixstreams-default"
215-
)
227+
228+
if not consumer_group:
229+
consumer_group = os.getenv("Quix__Consumer_Group", "quixstreams-default")
216230

217231
if broker_address:
218232
# If broker_address is passed to the app it takes priority over any quix configuration
219233
self._is_quix_app = False
220-
self._topic_manager_factory = TopicManager
234+
self._topic_manager_factory: TopicManagerFactory = TopicManager
221235
if isinstance(broker_address, str):
222236
broker_address = ConnectionConfig(bootstrap_servers=broker_address)
223237
else:
@@ -249,7 +263,6 @@ def __init__(
249263
QuixTopicManager, quix_config_builder=quix_config_builder
250264
)
251265
# Check if the state dir points to the mounted PVC while running on Quix
252-
state_dir = Path(state_dir)
253266
check_state_dir(state_dir=state_dir)
254267
quix_app_config = quix_config_builder.get_application_config(consumer_group)
255268

@@ -487,12 +500,13 @@ def dataframe(
487500
:param source: a `quixstreams.sources` "BaseSource" instance
488501
:return: `StreamingDataFrame` object
489502
"""
490-
if not source and not topic:
491-
raise ValueError("one of `source` or `topic` is required")
492503

493-
if source:
504+
if source is not None:
494505
topic = self.add_source(source, topic)
495506

507+
if topic is None:
508+
raise ValueError("one of `source` or `topic` is required")
509+
496510
sdf = StreamingDataFrame(
497511
topic=topic,
498512
topic_manager=self._topic_manager,
@@ -1018,7 +1032,7 @@ class ApplicationConfig(BaseSettings):
10181032
@classmethod
10191033
def settings_customise_sources(
10201034
cls,
1021-
settings_cls: Type[BaseSettings],
1035+
settings_cls: Type[PydanticBaseSettings],
10221036
init_settings: PydanticBaseSettingsSource,
10231037
env_settings: PydanticBaseSettingsSource,
10241038
dotenv_settings: PydanticBaseSettingsSource,

quixstreams/checkpointing/checkpoint.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from confluent_kafka import KafkaException, TopicPartition
77

8-
from quixstreams.kafka import Consumer
8+
from quixstreams.kafka import BaseConsumer
99
from quixstreams.processing.pausing import PausingManager
1010
from quixstreams.rowproducer import RowProducer
1111
from quixstreams.sinks import SinkManager
@@ -48,7 +48,7 @@ def __init__(
4848
# processed offsets within the checkpoint
4949
self._starting_tp_offsets: Dict[Tuple[str, int], int] = {}
5050
# A mapping of <(topic, partition, store_name): PartitionTransaction>
51-
self._store_transactions: Dict[(str, int, str), PartitionTransaction] = {}
51+
self._store_transactions: Dict[Tuple[str, int, str], PartitionTransaction] = {}
5252
# Passing zero or lower will flush the checkpoint after each processed message
5353
self._commit_interval = max(commit_interval, 0)
5454

@@ -123,7 +123,7 @@ def __init__(
123123
self,
124124
commit_interval: float,
125125
producer: RowProducer,
126-
consumer: Consumer,
126+
consumer: BaseConsumer,
127127
state_manager: StateStoreManager,
128128
sink_manager: SinkManager,
129129
pausing_manager: PausingManager,

quixstreams/context.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
"copy_context",
1212
)
1313

14-
_current_message_context = ContextVar("current_message_context")
14+
_current_message_context: ContextVar[Optional[MessageContext]] = ContextVar(
15+
"current_message_context"
16+
)
1517

1618

1719
class MessageContextNotSetError(QuixException): ...
@@ -48,7 +50,7 @@ def alter_context(value):
4850
_current_message_context.set(context)
4951

5052

51-
def message_context() -> MessageContext:
53+
def message_context() -> Optional[MessageContext]:
5254
"""
5355
Get a MessageContext for the current message, which houses most of the message
5456
metadata, like:
@@ -74,6 +76,5 @@ def message_context() -> MessageContext:
7476
"""
7577
try:
7678
return _current_message_context.get()
77-
7879
except LookupError:
7980
raise MessageContextNotSetError("Message context is not set")

quixstreams/kafka/configuration.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@
33
import pydantic
44
from pydantic import AliasChoices, Field, SecretStr
55
from pydantic.functional_validators import BeforeValidator
6-
from pydantic_settings import PydanticBaseSettingsSource
6+
from pydantic_settings import (
7+
BaseSettings as PydanticBaseSettings,
8+
)
9+
from pydantic_settings import (
10+
PydanticBaseSettingsSource,
11+
)
712
from typing_extensions import Annotated, Self
813

914
from quixstreams.utils.settings import BaseSettings
@@ -93,7 +98,7 @@ class ConnectionConfig(BaseSettings):
9398
@classmethod
9499
def settings_customise_sources(
95100
cls,
96-
settings_cls: Type[BaseSettings],
101+
settings_cls: Type[PydanticBaseSettings],
97102
init_settings: PydanticBaseSettingsSource,
98103
env_settings: PydanticBaseSettingsSource,
99104
dotenv_settings: PydanticBaseSettingsSource,

quixstreams/kafka/consumer.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import functools
22
import logging
33
import typing
4-
from typing import Callable, List, Optional, Tuple, Union
4+
from typing import Any, Callable, List, Optional, Tuple, Union
55

66
from confluent_kafka import (
77
Consumer as ConfluentConsumer,
@@ -18,6 +18,7 @@
1818
from .configuration import ConnectionConfig
1919

2020
__all__ = (
21+
"BaseConsumer",
2122
"Consumer",
2223
"AutoOffsetReset",
2324
"RebalancingCallback",
@@ -64,7 +65,7 @@ def wrapper(*args, **kwargs):
6465
return wrapper
6566

6667

67-
class Consumer:
68+
class BaseConsumer:
6869
def __init__(
6970
self,
7071
broker_address: Union[str, ConnectionConfig],
@@ -147,7 +148,7 @@ def poll(self, timeout: Optional[float] = None) -> Optional[Message]:
147148
"""
148149
return self._consumer.poll(timeout=timeout if timeout is not None else -1)
149150

150-
def subscribe(
151+
def _subscribe(
151152
self,
152153
topics: List[str],
153154
on_assign: Optional[RebalancingCallback] = None,
@@ -302,7 +303,8 @@ def commit(
302303
raise ValueError(
303304
'Parameters "message" and "offsets" are mutually exclusive'
304305
)
305-
kwargs = {
306+
307+
kwargs: dict[str, Any] = {
306308
"asynchronous": asynchronous,
307309
}
308310
if offsets is not None:
@@ -559,3 +561,14 @@ def __enter__(self):
559561

560562
def __exit__(self, exc_type, exc_val, exc_tb):
561563
self.close()
564+
565+
566+
class Consumer(BaseConsumer):
567+
def subscribe(
568+
self,
569+
topics: List[str],
570+
on_assign: Optional[RebalancingCallback] = None,
571+
on_revoke: Optional[RebalancingCallback] = None,
572+
on_lost: Optional[RebalancingCallback] = None,
573+
):
574+
return super()._subscribe(topics, on_assign, on_revoke, on_lost)

quixstreams/kafka/producer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def __init__(
4646
logger: logging.Logger = logger,
4747
error_callback: Callable[[KafkaError], None] = _default_error_cb,
4848
extra_config: Optional[dict] = None,
49-
flush_timeout: Optional[int] = None,
49+
flush_timeout: Optional[float] = None,
5050
):
5151
"""
5252
A wrapper around `confluent_kafka.Producer`.
@@ -190,7 +190,7 @@ def __init__(
190190
logger: logging.Logger = logger,
191191
error_callback: Callable[[KafkaError], None] = _default_error_cb,
192192
extra_config: Optional[dict] = None,
193-
flush_timeout: Optional[int] = None,
193+
flush_timeout: Optional[float] = None,
194194
):
195195
super().__init__(
196196
broker_address=broker_address,

quixstreams/platforms/quix/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class QuixApplicationConfig:
7777

7878
librdkafka_connection_config: ConnectionConfig
7979
librdkafka_extra_config: dict
80-
consumer_group: Optional[str] = None
80+
consumer_group: str
8181

8282

8383
class QuixKafkaConfigsBuilder:

0 commit comments

Comments
 (0)