Skip to content

Commit 19e4229

Browse files
Clear corrupted RocksDB (#888)
Co-authored-by: Daniil Gusev <daniil@quix.io>
1 parent e57494b commit 19e4229

File tree

6 files changed

+90
-6
lines changed

6 files changed

+90
-6
lines changed

quixstreams/state/rocksdb/exceptions.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,6 @@
44

55

66
class ColumnFamilyAlreadyExists(StateError): ...
7+
8+
9+
class RocksDBCorruptedError(StateError): ...

quixstreams/state/rocksdb/options.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ class RocksDBOptions(RocksDBOptionsType):
3232
:param open_max_retries: number of times to retry opening the database
3333
if it's locked by another process. To disable retrying, pass 0
3434
:param open_retry_backoff: number of seconds to wait between each retry.
35+
:param on_corrupted_recreate: when True, the corrupted DB will be destroyed
36+
if the `use_changelog_topics=True` is also set on the Application.
37+
If this option is True, but `use_changelog_topics=False`,
38+
the DB won't be destroyed.
39+
Note that the application doesn't validate the contents of the changelog topics.
40+
Default - `False`.
3541
3642
Please see `rocksdict.Options` for a complete description of other options.
3743
"""
@@ -51,6 +57,7 @@ class RocksDBOptions(RocksDBOptionsType):
5157
open_max_retries: int = 10
5258
open_retry_backoff: float = 3.0
5359
use_fsync: bool = True
60+
on_corrupted_recreate: bool = False
5461

5562
def to_options(self) -> rocksdict.Options:
5663
"""

quixstreams/state/rocksdb/partition.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from quixstreams.state.recovery import ChangelogProducer
2323
from quixstreams.state.serialization import int_from_bytes, int_to_bytes
2424

25-
from .exceptions import ColumnFamilyAlreadyExists
25+
from .exceptions import ColumnFamilyAlreadyExists, RocksDBCorruptedError
2626
from .metadata import (
2727
CHANGELOG_OFFSET_KEY,
2828
)
@@ -350,11 +350,38 @@ def _open_rocksdict(self) -> Rdict:
350350
options = self._rocksdb_options
351351
options.create_if_missing(True)
352352
options.create_missing_column_families(True)
353-
rdict = Rdict(
353+
create_rdict = lambda: Rdict(
354354
path=self._path,
355355
options=options,
356356
access_type=AccessType.read_write(),
357357
)
358+
# TODO: Add docs
359+
360+
try:
361+
rdict = create_rdict()
362+
except Exception as exc:
363+
if not str(exc).startswith("Corruption"):
364+
raise
365+
elif not self._changelog_producer:
366+
raise RocksDBCorruptedError(
367+
f'State store at "{self._path}" is corrupted '
368+
f"and cannot be recovered from the changelog topic: "
369+
"`use_changelog_topics` is set to False."
370+
) from exc
371+
elif not self._options.on_corrupted_recreate:
372+
raise RocksDBCorruptedError(
373+
f'State store at "{self._path}" is corrupted '
374+
f"but may be recovered from the changelog topic. "
375+
"Pass `rocksdb_options=RocksDBOptions(..., on_corrupted_recreate=True)` "
376+
"to the Application to destroy the corrupted state "
377+
"and recover it from the changelog."
378+
) from exc
379+
380+
logger.warning(f"Destroying corrupted RocksDB path={self._path}")
381+
Rdict.destroy(self._path)
382+
logger.warning(f"Recreating corrupted RocksDB path={self._path}")
383+
rdict = create_rdict()
384+
358385
# Ensure metadata column family is created without defining it upfront
359386
try:
360387
rdict.get_column_family(METADATA_CF_NAME)

quixstreams/state/rocksdb/types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ class RocksDBOptionsType(Protocol):
2323
open_max_retries: int
2424
open_retry_backoff: float
2525
use_fsync: bool
26+
on_corrupted_recreate: bool
2627

2728
def to_options(self) -> rocksdict.Options: ...

tests/test_quixstreams/test_state/fixtures.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,15 @@ def factory(
134134

135135
def rocksdb_partition_factory(tmp_path, changelog_producer_mock):
136136
def factory(
137-
name: str = "db",
137+
name: str = "",
138138
options: Optional[RocksDBOptions] = None,
139-
changelog_producer: Optional[ChangelogProducer] = None,
139+
changelog_producer: Optional[ChangelogProducer] = changelog_producer_mock,
140140
) -> RocksDBStorePartition:
141141
path = (tmp_path / name).as_posix()
142142
_options = options or RocksDBOptions(open_max_retries=0, open_retry_backoff=3.0)
143143
return RocksDBStorePartition(
144144
path,
145-
changelog_producer=changelog_producer or changelog_producer_mock,
145+
changelog_producer=changelog_producer,
146146
options=_options,
147147
)
148148

tests/test_quixstreams/test_state/test_rocksdb/test_rocksdb_partition.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
RocksDBOptions,
1212
RocksDBStorePartition,
1313
)
14+
from quixstreams.state.rocksdb.exceptions import RocksDBCorruptedError
1415
from quixstreams.state.rocksdb.windowed.serialization import append_integer
1516

1617

@@ -67,6 +68,51 @@ def test_open_arbitrary_exception_fails(self, store_partition_factory):
6768

6869
assert str(raised.value) == "some exception"
6970

71+
def test_db_corrupted_fails_with_no_changelog(
72+
self, store_partition_factory, tmp_path
73+
):
74+
# Initialize and corrupt the database by messing with the MANIFEST
75+
path = tmp_path.as_posix()
76+
Rdict(path=path)
77+
next(tmp_path.glob("MANIFEST*")).write_bytes(b"")
78+
79+
with pytest.raises(
80+
RocksDBCorruptedError,
81+
match=f'State store at "{path}" is corrupted and cannot be recovered '
82+
f"from the changelog topic",
83+
):
84+
store_partition_factory(changelog_producer=None)
85+
86+
def test_db_corrupted_fails_with_on_corrupted_recreate_false(
87+
self, store_partition_factory, tmp_path
88+
):
89+
# Initialize and corrupt the database by messing with the MANIFEST
90+
path = tmp_path.as_posix()
91+
Rdict(path=path)
92+
next(tmp_path.glob("MANIFEST*")).write_bytes(b"")
93+
94+
with pytest.raises(
95+
RocksDBCorruptedError,
96+
match=f'State store at "{path}" is corrupted but may be recovered '
97+
f"from the changelog topic",
98+
):
99+
store_partition_factory()
100+
101+
def test_db_corrupted_manifest_file(self, store_partition_factory, tmp_path):
102+
Rdict(path=tmp_path.as_posix()) # initialize db
103+
next(tmp_path.glob("MANIFEST*")).write_bytes(b"") # write random bytes
104+
105+
store_partition_factory(options=RocksDBOptions(on_corrupted_recreate=True))
106+
107+
def test_db_corrupted_sst_file(self, store_partition_factory, tmp_path):
108+
rdict = Rdict(path=tmp_path.as_posix()) # initialize db
109+
rdict[b"key"] = b"value" # write something
110+
rdict.flush() # flush creates .sst file
111+
rdict.close() # required to release the lock
112+
next(tmp_path.glob("*.sst")).unlink() # delete the .sst file
113+
114+
store_partition_factory(options=RocksDBOptions(on_corrupted_recreate=True))
115+
70116
def test_create_and_get_column_family(self, store_partition: RocksDBStorePartition):
71117
store_partition.create_column_family("cf")
72118
assert store_partition.get_column_family("cf")
@@ -123,7 +169,7 @@ def test_custom_options(self, store_partition_factory, tmp_path):
123169
Pass custom "logs_dir" to Rdict and ensure it exists and has some files
124170
"""
125171

126-
logs_dir = Path(tmp_path / "db" / "logs")
172+
logs_dir = Path(tmp_path / "logs")
127173
options = RocksDBOptions(db_log_dir=logs_dir.as_posix())
128174
with store_partition_factory(options=options):
129175
assert logs_dir.is_dir()

0 commit comments

Comments
 (0)