Skip to content

Commit 9ffe2df

Browse files
committed
Add more info to the logs and error messages
1 parent 0a06c23 commit 9ffe2df

File tree

2 files changed

+21
-20
lines changed

2 files changed

+21
-20
lines changed

quixstreams/state/rocksdb/partition.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ def _open_rocksdict(self) -> Rdict:
355355
options=options,
356356
access_type=AccessType.read_write(),
357357
)
358+
# TODO: Add docs
358359

359360
try:
360361
rdict = create_rdict()
@@ -363,18 +364,22 @@ def _open_rocksdict(self) -> Rdict:
363364
raise
364365
elif not self._changelog_producer:
365366
raise RocksDBCorruptedError(
366-
"State is corrupted and cannot be recovered from changelog: "
367+
f'State store at "{self._path}" is corrupted '
368+
f"and cannot be recovered from the changelog topic: "
367369
"`use_changelog_topics` is set to False."
368-
)
370+
) from exc
369371
elif not self._options.on_corrupted_recreate:
370372
raise RocksDBCorruptedError(
371-
"State is corrupted but may be recovered from changelog. "
372-
"Set `RocksDBOptions(..., on_corrupted_recreate=True)` "
373-
"to destroy the state and recreate it from changelog."
374-
)
375-
376-
logger.info("Recreating corrupted RocksDB...")
373+
f'State store at "{self._path}" is corrupted '
374+
f"but may be recovered from the changelog topic. "
375+
"Pass `rocksdb_options=RocksDBOptions(..., on_corrupted_recreate=True)` "
376+
"to the Application to destroy the corrupted state "
377+
"and recover it from the changelog."
378+
) from exc
379+
380+
logger.warning(f"Destroying corrupted RocksDB path={self._path}")
377381
Rdict.destroy(self._path)
382+
logger.warning(f"Recreating corrupted RocksDB path={self._path}")
378383
rdict = create_rdict()
379384

380385
# Ensure metadata column family is created without defining it upfront

tests/test_quixstreams/test_state/test_rocksdb/test_rocksdb_partition.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import re
21
import time
32
from pathlib import Path
43
from unittest.mock import patch
@@ -73,32 +72,29 @@ def test_db_corrupted_fails_with_no_changelog(
7372
self, store_partition_factory, tmp_path
7473
):
7574
# Initialize and corrupt the database by messing with the MANIFEST
76-
Rdict(path=tmp_path.as_posix())
75+
path = tmp_path.as_posix()
76+
Rdict(path=path)
7777
next(tmp_path.glob("MANIFEST*")).write_bytes(b"")
7878

7979
with pytest.raises(
8080
RocksDBCorruptedError,
81-
match=(
82-
"State is corrupted and cannot be recovered from changelog: "
83-
"`use_changelog_topics` is set to False."
84-
),
81+
match=f'State store at "{path}" is corrupted and cannot be recovered '
82+
f"from the changelog topic",
8583
):
8684
store_partition_factory(changelog_producer=None)
8785

8886
def test_db_corrupted_fails_with_on_corrupted_recreate_false(
8987
self, store_partition_factory, tmp_path
9088
):
9189
# Initialize and corrupt the database by messing with the MANIFEST
92-
Rdict(path=tmp_path.as_posix())
90+
path = tmp_path.as_posix()
91+
Rdict(path=path)
9392
next(tmp_path.glob("MANIFEST*")).write_bytes(b"")
9493

9594
with pytest.raises(
9695
RocksDBCorruptedError,
97-
match=re.escape(
98-
"State is corrupted but may be recovered from changelog. "
99-
"Set `RocksDBOptions(..., on_corrupted_recreate=True)` "
100-
"to destroy the state and recreate it from changelog."
101-
),
96+
match=f'State store at "{path}" is corrupted but may be recovered '
97+
f"from the changelog topic",
10298
):
10399
store_partition_factory()
104100

0 commit comments

Comments
 (0)