Skip to content

Commit 0a06c23

Browse files
committed
Handle various corruption scenarios
1 parent 892d968 commit 0a06c23

File tree

2 files changed

+24
-24
lines changed

2 files changed

+24
-24
lines changed

quixstreams/state/rocksdb/partition.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import logging
2-
import re
32
import time
43
from typing import (
54
Dict,
@@ -35,12 +34,6 @@
3534

3635
logger = logging.getLogger(__name__)
3736

38-
ROCKSDB_CORRUPTION_REGEX = re.compile(
39-
r"^Corruption: Corruption: IO error: No such file or "
40-
r"directory: While open a file for random read: .*: "
41-
r"No such file or directory The file .* may be corrupted\.$"
42-
)
43-
4437

4538
class RocksDBStorePartition(StorePartition):
4639
"""
@@ -366,7 +359,7 @@ def _open_rocksdict(self) -> Rdict:
366359
try:
367360
rdict = create_rdict()
368361
except Exception as exc:
369-
if ROCKSDB_CORRUPTION_REGEX.fullmatch(str(exc)) is None:
362+
if not str(exc).startswith("Corruption"):
370363
raise
371364
elif not self._changelog_producer:
372365
raise RocksDBCorruptedError(

tests/test_quixstreams/test_state/test_rocksdb/test_rocksdb_partition.py

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,6 @@
1616
from quixstreams.state.rocksdb.windowed.serialization import append_integer
1717

1818

19-
@pytest.fixture
20-
def corrupted_db(tmp_path):
21-
"""
22-
Create a corrupted RocksDB database in a temporary directory.
23-
"""
24-
rdict = Rdict(path=tmp_path.as_posix())
25-
# Write something and flush so that the .sst file is created
26-
rdict[b"key"] = b"value"
27-
rdict.flush()
28-
# Delete the .sst file
29-
next(tmp_path.glob("*.sst")).unlink()
30-
31-
3219
class TestRocksDBStorePartition:
3320
def test_open_db_locked_retries(self, store_partition_factory, executor):
3421
db1 = store_partition_factory("db")
@@ -83,8 +70,12 @@ def test_open_arbitrary_exception_fails(self, store_partition_factory):
8370
assert str(raised.value) == "some exception"
8471

8572
def test_db_corrupted_fails_with_no_changelog(
86-
self, store_partition_factory, corrupted_db
73+
self, store_partition_factory, tmp_path
8774
):
75+
# Initialize and corrupt the database by messing with the MANIFEST
76+
Rdict(path=tmp_path.as_posix())
77+
next(tmp_path.glob("MANIFEST*")).write_bytes(b"")
78+
8879
with pytest.raises(
8980
RocksDBCorruptedError,
9081
match=(
@@ -95,8 +86,12 @@ def test_db_corrupted_fails_with_no_changelog(
9586
store_partition_factory(changelog_producer=None)
9687

9788
def test_db_corrupted_fails_with_on_corrupted_recreate_false(
98-
self, store_partition_factory, corrupted_db
89+
self, store_partition_factory, tmp_path
9990
):
91+
# Initialize and corrupt the database by messing with the MANIFEST
92+
Rdict(path=tmp_path.as_posix())
93+
next(tmp_path.glob("MANIFEST*")).write_bytes(b"")
94+
10095
with pytest.raises(
10196
RocksDBCorruptedError,
10297
match=re.escape(
@@ -107,7 +102,19 @@ def test_db_corrupted_fails_with_on_corrupted_recreate_false(
107102
):
108103
store_partition_factory()
109104

110-
def test_db_corrupted_recreated(self, store_partition_factory, corrupted_db):
105+
def test_db_corrupted_manifest_file(self, store_partition_factory, tmp_path):
106+
Rdict(path=tmp_path.as_posix()) # initialize db
107+
next(tmp_path.glob("MANIFEST*")).write_bytes(b"") # write random bytes
108+
109+
store_partition_factory(options=RocksDBOptions(on_corrupted_recreate=True))
110+
111+
def test_db_corrupted_sst_file(self, store_partition_factory, tmp_path):
112+
rdict = Rdict(path=tmp_path.as_posix()) # initialize db
113+
rdict[b"key"] = b"value" # write something
114+
rdict.flush() # flush creates .sst file
115+
rdict.close() # required to release the lock
116+
next(tmp_path.glob("*.sst")).unlink() # delete the .sst file
117+
111118
store_partition_factory(options=RocksDBOptions(on_corrupted_recreate=True))
112119

113120
def test_create_and_get_column_family(self, store_partition: RocksDBStorePartition):

0 commit comments

Comments
 (0)