Skip to content

Add counter to timestamped store #886

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quixstreams/state/metadata.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import enum

SEPARATOR = b"|"
SEPARATOR_LENGTH = len(SEPARATOR)

CHANGELOG_CF_MESSAGE_HEADER = "__column_family__"
CHANGELOG_PROCESSED_OFFSETS_MESSAGE_HEADER = "__processed_tp_offsets__"
Expand Down
6 changes: 3 additions & 3 deletions quixstreams/state/rocksdb/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from quixstreams.state.exceptions import ColumnFamilyDoesNotExist
from quixstreams.state.metadata import METADATA_CF_NAME, Marker
from quixstreams.state.recovery import ChangelogProducer
from quixstreams.state.serialization import int_from_int64_bytes, int_to_int64_bytes
from quixstreams.state.serialization import int_from_bytes, int_to_bytes

from .exceptions import ColumnFamilyAlreadyExists
from .metadata import (
Expand Down Expand Up @@ -231,7 +231,7 @@ def get_changelog_offset(self) -> Optional[int]:
if offset_bytes is None:
return None

return int_from_int64_bytes(offset_bytes)
return int_from_bytes(offset_bytes)

def write_changelog_offset(self, offset: int):
"""
Expand Down Expand Up @@ -396,7 +396,7 @@ def _init_rocksdb(self) -> Rdict:
def _update_changelog_offset(self, batch: WriteBatch, offset: int):
batch.put(
CHANGELOG_OFFSET_KEY,
int_to_int64_bytes(offset),
int_to_bytes(offset),
self.get_column_family_handle(METADATA_CF_NAME),
)

Expand Down
6 changes: 4 additions & 2 deletions quixstreams/state/rocksdb/timestamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from quixstreams.state.serialization import (
DumpsFunc,
LoadsFunc,
int_to_int64_bytes,
encode_integer_pair,
serialize,
)

Expand Down Expand Up @@ -211,7 +211,9 @@ def _ensure_bytes(self, prefix: Any) -> bytes:

def _serialize_key(self, key: Union[int, bytes], prefix: bytes) -> bytes:
if isinstance(key, int):
return prefix + SEPARATOR + int_to_int64_bytes(key)
# TODO: Currently using constant 0, but will be
# replaced with a global counter in the future
return prefix + SEPARATOR + encode_integer_pair(key, 0)
elif isinstance(key, bytes):
return prefix + SEPARATOR + key
raise TypeError(f"Invalid key type: {type(key)}")
Expand Down
37 changes: 8 additions & 29 deletions quixstreams/state/rocksdb/windowed/serialization.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
import struct

from quixstreams.state.metadata import SEPARATOR
from quixstreams.state.metadata import SEPARATOR, SEPARATOR_LENGTH
from quixstreams.state.serialization import (
int_to_int64_bytes,
decode_integer_pair,
int_to_bytes,
)

__all__ = ("parse_window_key", "encode_integer_pair", "append_integer")

_TIMESTAMP_BYTE_LENGTH = len(int_to_int64_bytes(0))
_SEPARATOR_LENGTH = len(SEPARATOR)
_TIMESTAMPS_SEGMENT_LEN = _TIMESTAMP_BYTE_LENGTH * 2 + _SEPARATOR_LENGTH
__all__ = ("parse_window_key", "append_integer")

_window_pack_format = ">q" + "c" * _SEPARATOR_LENGTH + "q"
_window_packer = struct.Struct(_window_pack_format)
_window_pack = _window_packer.pack
_window_unpack = _window_packer.unpack
_TIMESTAMP_BYTE_LENGTH = len(int_to_bytes(0))
_TIMESTAMPS_SEGMENT_LEN = _TIMESTAMP_BYTE_LENGTH * 2 + SEPARATOR_LENGTH


def parse_window_key(key: bytes) -> tuple[bytes, int, int]:
Expand All @@ -33,24 +26,10 @@ def parse_window_key(key: bytes) -> tuple[bytes, int, int]:
key[-_TIMESTAMPS_SEGMENT_LEN:],
)

start_ms, _, end_ms = _window_unpack(timestamps_bytes)
start_ms, end_ms = decode_integer_pair(timestamps_bytes)
return message_key, start_ms, end_ms


def encode_integer_pair(integer_1: int, integer_2: int) -> bytes:
"""
Encode a pair of integers into bytes of the following format:
```<integer_1>|<integer_2>```

Encoding integers this way make them sortable in RocksDB within the same prefix.

:param integer_1: first integer
:param integer_2: second integer
:return: integers as bytes
"""
return _window_pack(integer_1, SEPARATOR, integer_2)


def append_integer(base_bytes: bytes, integer: int) -> bytes:
"""
Append integer to the base bytes
Expand All @@ -61,4 +40,4 @@ def append_integer(base_bytes: bytes, integer: int) -> bytes:
:param integer: integer to append
:return: bytes
"""
return base_bytes + SEPARATOR + int_to_int64_bytes(integer)
return base_bytes + SEPARATOR + int_to_bytes(integer)
11 changes: 4 additions & 7 deletions quixstreams/state/rocksdb/windowed/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from quixstreams.state.serialization import (
DumpsFunc,
LoadsFunc,
encode_integer_pair,
int_to_bytes,
serialize,
)
from quixstreams.state.types import ExpiredWindowDetail, WindowDetail
Expand All @@ -29,12 +31,7 @@
LATEST_TIMESTAMPS_CF_NAME,
VALUES_CF_NAME,
)
from .serialization import (
append_integer,
encode_integer_pair,
int_to_int64_bytes,
parse_window_key,
)
from .serialization import append_integer, parse_window_key
from .state import WindowedTransactionState

if TYPE_CHECKING:
Expand Down Expand Up @@ -330,7 +327,7 @@ def expire_all_windows(
if not windows:
return
last_expired = windows[-1] # windows are ordered
suffixes: set[bytes] = set(int_to_int64_bytes(window) for window in windows)
suffixes: set[bytes] = set(int_to_bytes(window) for window in windows)
for key in self.keys():
if key[-8:] in suffixes:
prefix, start, end = parse_window_key(key)
Expand Down
44 changes: 39 additions & 5 deletions quixstreams/state/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,28 @@
from typing import Any, Callable

from .exceptions import StateSerializationError
from .metadata import SEPARATOR, SEPARATOR_LENGTH

__all__ = (
"DumpsFunc",
"LoadsFunc",
"serialize",
"deserialize",
"int_to_int64_bytes",
"int_from_int64_bytes",
"int_to_bytes",
"int_from_bytes",
"encode_integer_pair",
"decode_integer_pair",
)

_int_packer = struct.Struct(">q")
_int_packer = struct.Struct(">Q")
_int_pack = _int_packer.pack
_int_unpack = _int_packer.unpack

_int_pair_pack_format = ">Q" + "c" * SEPARATOR_LENGTH + "Q"
_int_pair_packer = struct.Struct(_int_pair_pack_format)
_int_pair_pack = _int_pair_packer.pack
_int_pair_unpack = _int_pair_packer.unpack

DumpsFunc = Callable[[Any], bytes]
LoadsFunc = Callable[[bytes], Any]

Expand All @@ -36,9 +44,35 @@ def deserialize(value: bytes, loads: LoadsFunc) -> Any:
) from exc


def int_to_int64_bytes(value: int) -> bytes:
def int_to_bytes(value: int) -> bytes:
return _int_pack(value)


def int_from_int64_bytes(value: bytes) -> int:
def int_from_bytes(value: bytes) -> int:
return _int_unpack(value)[0]


def encode_integer_pair(integer_1: int, integer_2: int) -> bytes:
"""
Encode a pair of integers into bytes of the following format:
```<integer_1>|<integer_2>```

Encoding integers this way make them sortable in RocksDB within the same prefix.

:param integer_1: first integer
:param integer_2: second integer
:return: integers as bytes
"""
return _int_pair_pack(integer_1, SEPARATOR, integer_2)


def decode_integer_pair(value: bytes) -> tuple[int, int]:
"""
Decode a pair of integers from bytes of the following format:
```<integer_1>|<integer_2>```

:param value: bytes
:return: tuple of integers
"""
integer_1, _, integer_2 = _int_pair_unpack(value)
return integer_1, integer_2
Original file line number Diff line number Diff line change
@@ -1,29 +1,8 @@
import pytest

from quixstreams.state.metadata import SEPARATOR
from quixstreams.state.rocksdb.windowed.serialization import (
append_integer,
encode_integer_pair,
parse_window_key,
)


@pytest.mark.parametrize(
"start, end",
[
(0, 0),
(1, 2),
(-1, 2),
(2, -2),
],
)
def test_encode_integer_pair(start, end):
key = encode_integer_pair(start, end)
assert isinstance(key, bytes)

prefix, decoded_start, decoded_end = parse_window_key(key)
assert decoded_start == start
assert decoded_end == end
from quixstreams.state.rocksdb.windowed.serialization import append_integer
from quixstreams.state.serialization import encode_integer_pair


@pytest.mark.parametrize("base_bytes", [b"", b"base_bytes"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest

from quixstreams.state.rocksdb.windowed.metadata import VALUES_CF_NAME
from quixstreams.state.rocksdb.windowed.serialization import encode_integer_pair
from quixstreams.state.serialization import encode_integer_pair


@pytest.fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
CHANGELOG_CF_MESSAGE_HEADER,
CHANGELOG_PROCESSED_OFFSETS_MESSAGE_HEADER,
)
from quixstreams.state.rocksdb.windowed.serialization import encode_integer_pair
from quixstreams.state.serialization import encode_integer_pair
from quixstreams.utils.json import dumps


Expand Down
23 changes: 23 additions & 0 deletions tests/test_quixstreams/test_state/test_serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pytest

from quixstreams.state.serialization import (
decode_integer_pair,
encode_integer_pair,
)


@pytest.mark.parametrize(
"start, end",
[
(0, 0),
(1, 18446744073709551615),
],
)
def test_encode_integer_pair(start, end):
# This test also covers decode_integer_pair function
key = encode_integer_pair(start, end)
assert isinstance(key, bytes)

decoded_start, decoded_end = decode_integer_pair(key)
assert decoded_start == start
assert decoded_end == end