Skip to content

Commit 796f0cc

Browse files
committed
Make PartitionTransaction.begin() abstract
1 parent 75dd5c6 commit 796f0cc

File tree

6 files changed

+33
-15
lines changed

6 files changed

+33
-15
lines changed

quixstreams/state/base/partition.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
from abc import ABC, abstractmethod
3-
from typing import TYPE_CHECKING, Literal, Optional, Type, Union
3+
from typing import TYPE_CHECKING, Literal, Optional, Union
44

55
from quixstreams.state.metadata import (
66
Marker,
@@ -25,8 +25,6 @@ class StorePartition(ABC):
2525
the persistent storage).
2626
"""
2727

28-
partition_transaction_class: Type[PartitionTransaction] = PartitionTransaction
29-
3028
def __init__(
3129
self,
3230
dumps: DumpsFunc,
@@ -108,18 +106,14 @@ def recover_from_changelog_message(
108106
:param offset: changelog message offset
109107
"""
110108

109+
@abstractmethod
111110
def begin(self) -> PartitionTransaction:
112111
"""
113112
Start a new `PartitionTransaction`
114113
115114
Using `PartitionTransaction` is a recommended way for accessing the data.
116115
"""
117-
return self.partition_transaction_class(
118-
partition=self,
119-
dumps=self._dumps,
120-
loads=self._loads,
121-
changelog_producer=self._changelog_producer,
122-
)
116+
...
123117

124118
def __enter__(self):
125119
return self

quixstreams/state/base/transaction.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
"PartitionTransactionStatus",
4545
"PartitionTransaction",
4646
"PartitionTransactionCache",
47+
"validate_transaction_status",
4748
)
4849

4950
logger = logging.getLogger(__name__)

quixstreams/state/memory/partition.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
from typing import Any, Dict, Literal, Optional, Union
44

5+
from quixstreams.state import PartitionTransaction
56
from quixstreams.state.base import PartitionTransactionCache, StorePartition
67
from quixstreams.state.exceptions import ColumnFamilyDoesNotExist
78
from quixstreams.state.metadata import METADATA_CF_NAME, Marker
@@ -62,6 +63,14 @@ def closed(self) -> bool:
6263
def close(self) -> None:
6364
self._closed = True
6465

66+
def begin(self) -> PartitionTransaction:
67+
return PartitionTransaction(
68+
partition=self,
69+
dumps=self._dumps,
70+
loads=self._loads,
71+
changelog_producer=self._changelog_producer,
72+
)
73+
6574
@_validate_partition_state()
6675
def write(
6776
self,

quixstreams/state/rocksdb/partition.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from rocksdict import AccessType, ColumnFamily, Rdict, ReadOptions, WriteBatch
1414

1515
from quixstreams.state.base import (
16+
PartitionTransaction,
1617
PartitionTransactionCache,
1718
StorePartition,
1819
)
@@ -199,6 +200,14 @@ def iter_items(
199200
if lower_bound <= key:
200201
yield key, value
201202

203+
def begin(self) -> PartitionTransaction:
204+
return PartitionTransaction(
205+
partition=self,
206+
dumps=self._dumps,
207+
loads=self._loads,
208+
changelog_producer=self._changelog_producer,
209+
)
210+
202211
def exists(self, key: bytes, cf_name: str = "default") -> bool:
203212
"""
204213
Check if a key is present in the DB.

quixstreams/state/rocksdb/timestamped.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,9 +240,17 @@ class TimestampedStorePartition(RocksDBStorePartition):
240240
`TimestampedPartitionTransaction` instances to handle atomic operations for that partition.
241241
"""
242242

243-
partition_transaction_class = TimestampedPartitionTransaction
244243
additional_column_families = (MIN_ELIGIBLE_TIMESTAMPS_CF_NAME,)
245244

245+
def begin(self) -> TimestampedPartitionTransaction:
246+
# Override the method to specify the correct return type
247+
return TimestampedPartitionTransaction(
248+
partition=self,
249+
dumps=self._dumps,
250+
loads=self._loads,
251+
changelog_producer=self._changelog_producer,
252+
)
253+
246254

247255
class TimestampedStore(RocksDBStore):
248256
"""

quixstreams/state/rocksdb/windowed/partition.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import Iterator, Type, cast
2+
from typing import Iterator, cast
33

44
from ..partition import RocksDBStorePartition
55
from .metadata import (
@@ -24,9 +24,6 @@ class WindowedRocksDBStorePartition(RocksDBStorePartition):
2424
stores the expiration index to delete expired windows.
2525
"""
2626

27-
partition_transaction_class: Type[WindowedRocksDBPartitionTransaction] = (
28-
WindowedRocksDBPartitionTransaction
29-
)
3027
additional_column_families = (
3128
LATEST_DELETED_VALUE_CF_NAME,
3229
LATEST_EXPIRED_WINDOW_CF_NAME,
@@ -54,7 +51,7 @@ def begin(self) -> WindowedRocksDBPartitionTransaction:
5451
5552
Using `WindowedRocksDBPartitionTransaction` is a recommended way for accessing the data.
5653
"""
57-
return self.partition_transaction_class(
54+
return WindowedRocksDBPartitionTransaction(
5855
partition=self,
5956
dumps=self._dumps,
6057
loads=self._loads,

0 commit comments

Comments
 (0)