Skip to content

Commit f161800

Browse files
authored
typing (#876)
1 parent 3efd812 commit f161800

File tree

4 files changed

+32
-29
lines changed

4 files changed

+32
-29
lines changed

quixstreams/state/base/partition.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
from abc import ABC, abstractmethod
3-
from typing import TYPE_CHECKING, Generic, Literal, Optional, TypeVar, Union, cast
3+
from typing import TYPE_CHECKING, Literal, Optional, Type, Union
44

55
from quixstreams.state.metadata import (
66
Marker,
@@ -17,17 +17,15 @@
1717

1818
logger = logging.getLogger(__name__)
1919

20-
T = TypeVar("T", bound=PartitionTransaction)
2120

22-
23-
class StorePartition(ABC, Generic[T]):
21+
class StorePartition(ABC):
2422
"""
2523
A base class to access state in the underlying storage.
2624
It represents a single instance of some storage (e.g. a single database for
2725
the persistent storage).
2826
"""
2927

30-
partition_transaction_class = PartitionTransaction
28+
partition_transaction_class: Type[PartitionTransaction] = PartitionTransaction
3129

3230
def __init__(
3331
self,
@@ -110,20 +108,17 @@ def recover_from_changelog_message(
110108
:param offset: changelog message offset
111109
"""
112110

113-
def begin(self) -> T:
111+
def begin(self) -> PartitionTransaction:
114112
"""
115113
Start a new `PartitionTransaction`
116114
117115
Using `PartitionTransaction` is a recommended way for accessing the data.
118116
"""
119-
return cast(
120-
T,
121-
self.partition_transaction_class(
122-
partition=self,
123-
dumps=self._dumps,
124-
loads=self._loads,
125-
changelog_producer=self._changelog_producer,
126-
),
117+
return self.partition_transaction_class(
118+
partition=self,
119+
dumps=self._dumps,
120+
loads=self._loads,
121+
changelog_producer=self._changelog_producer,
127122
)
128123

129124
def __enter__(self):

quixstreams/state/rocksdb/partition.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@
66
List,
77
Literal,
88
Optional,
9-
TypeVar,
109
Union,
1110
cast,
1211
)
1312

1413
from rocksdict import AccessType, ColumnFamily, Rdict, ReadOptions, WriteBatch
1514

1615
from quixstreams.state.base import (
17-
PartitionTransaction,
1816
PartitionTransactionCache,
1917
StorePartition,
2018
)
@@ -35,10 +33,8 @@
3533

3634
logger = logging.getLogger(__name__)
3735

38-
T = TypeVar("T", bound=PartitionTransaction)
3936

40-
41-
class RocksDBStorePartition(StorePartition[T]):
37+
class RocksDBStorePartition(StorePartition):
4238
"""
4339
A base class to access state in RocksDB.
4440
It represents a single RocksDB database.

quixstreams/state/rocksdb/windowed/partition.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import Iterator, cast
2+
from typing import Iterator, Type, cast
33

44
from ..partition import RocksDBStorePartition
55
from .metadata import (
@@ -15,9 +15,7 @@
1515
logger = logging.getLogger(__name__)
1616

1717

18-
class WindowedRocksDBStorePartition(
19-
RocksDBStorePartition[WindowedRocksDBPartitionTransaction]
20-
):
18+
class WindowedRocksDBStorePartition(RocksDBStorePartition):
2119
"""
2220
A base class to access windowed state in RocksDB.
2321
It represents a single RocksDB database.
@@ -26,7 +24,9 @@ class WindowedRocksDBStorePartition(
2624
stores the expiration index to delete expired windows.
2725
"""
2826

29-
partition_transaction_class = WindowedRocksDBPartitionTransaction
27+
partition_transaction_class: Type[WindowedRocksDBPartitionTransaction] = (
28+
WindowedRocksDBPartitionTransaction
29+
)
3030
additional_column_families = (
3131
LATEST_DELETED_VALUE_CF_NAME,
3232
LATEST_EXPIRED_WINDOW_CF_NAME,
@@ -47,3 +47,16 @@ def iter_keys(self, cf_name: str = "default") -> Iterator[bytes]:
4747
"""
4848
cf_dict = self.get_column_family(cf_name)
4949
return cast(Iterator[bytes], cf_dict.keys())
50+
51+
def begin(self) -> WindowedRocksDBPartitionTransaction:
52+
"""
53+
Start a new `WindowedRocksDBPartitionTransaction`
54+
55+
Using `WindowedRocksDBPartitionTransaction` is a recommended way for accessing the data.
56+
"""
57+
return self.partition_transaction_class(
58+
partition=self,
59+
dumps=self._dumps,
60+
loads=self._loads,
61+
changelog_producer=self._changelog_producer,
62+
)

quixstreams/state/rocksdb/windowed/transaction.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
from .partition import WindowedRocksDBStorePartition
4242

4343

44-
class WindowedRocksDBPartitionTransaction(PartitionTransaction):
44+
class WindowedRocksDBPartitionTransaction(PartitionTransaction[bytes, dict]):
4545
def __init__(
4646
self,
4747
partition: "WindowedRocksDBStorePartition",
@@ -469,8 +469,7 @@ def get_windows(
469469
):
470470
_, start, end = parse_window_key(key)
471471
if start_from_ms < start <= start_to_ms:
472-
value = self._deserialize_value(value)
473-
result.append(((start, end), value, prefix))
472+
result.append(((start, end), self._deserialize_value(value), prefix))
474473

475474
return result
476475

@@ -532,7 +531,7 @@ def _get_items(
532531

533532
def _get_timestamp(
534533
self, cache: TimestampsCache, prefix: bytes, default: Any = None
535-
) -> Any:
534+
) -> int:
536535
cached_ts = cache.timestamps.get(prefix)
537536
if cached_ts is not None:
538537
return cached_ts
@@ -585,7 +584,7 @@ def _get_next_count(self) -> int:
585584
if cache.counter is None:
586585
cache.counter = self.get(
587586
default=-1, key=cache.key, prefix=b"", cf_name=cache.cf_name
588-
)
587+
) # type:ignore[call-overload]
589588

590589
cache.counter += 1
591590

0 commit comments

Comments
 (0)