Skip to content

Commit d12c60f

Browse files
committed
move retry logic to standalone module
1 parent 690c974 commit d12c60f

File tree

6 files changed

+151
-237
lines changed

6 files changed

+151
-237
lines changed

tests/query/test_query_transaction.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,6 @@ def test_context_manager_normal_flow(self, tx: BaseTxContext):
6565
assert tx._tx_state._state == QueryTxStateEnum.COMMITTED
6666

6767
def test_context_manager_does_not_hide_exceptions(self, tx: BaseTxContext):
68-
with pytest.raises(RuntimeError):
68+
with pytest.raises(Exception):
6969
with tx:
70-
tx.commit()
70+
raise Exception()

ydb/_retries.py

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import random
2+
import time
3+
4+
from . import issues
5+
from ._errors import check_retriable_error
6+
7+
class BackoffSettings(object):
    """Parameters of a randomized exponential backoff.

    Attributes:
        ceiling: largest exponent used when growing the delay; retry numbers
            above it no longer increase the delay.
        slot_duration: duration of one backoff slot, in the same time unit
            that ``calc_timeout`` returns (presumably seconds -- confirm
            against the caller that sleeps on the value).
        uncertain_ratio: share of the maximal delay that is randomized; with
            ratio ``r`` the delay falls between ``max * (1 - r)`` and ``max``.
    """

    def __init__(self, ceiling=6, slot_duration=0.001, uncertain_ratio=0.5):
        self.ceiling = ceiling
        self.slot_duration = slot_duration
        self.uncertain_ratio = uncertain_ratio

    def calc_timeout(self, retry_number):
        """Return the jittered delay for the given retry number.

        The number of slots doubles with every retry until ``ceiling`` is
        reached. A negative ``retry_number`` is treated as ``0`` instead of
        failing with "negative shift count".
        """
        exponent = max(0, min(retry_number, self.ceiling))
        slots_count = 1 << exponent
        # The computation goes through milliseconds and back on purpose, so
        # the floating point results stay identical to earlier versions.
        max_duration_ms = slots_count * self.slot_duration * 1000.0
        jitter_factor = random.random() * self.uncertain_ratio + 1.0 - self.uncertain_ratio
        duration_ms = max_duration_ms * jitter_factor
        return duration_ms / 1000.0
19+
20+
21+
class RetrySettings(object):
    """Bundle of knobs that control how an operation is retried.

    Args:
        max_retries: number of retries allowed after the first attempt.
        max_session_acquire_timeout: optional upper bound that is also applied
            to ``get_session_client_timeout``.
        on_ydb_error_callback: callable invoked with every caught YDB error;
            defaults to a no-op.
        backoff_ceiling: ceiling of the default slow backoff; used only when
            ``slow_backoff_settings`` is not given.
        backoff_slot_duration: slot duration of the default slow backoff; used
            only when ``slow_backoff_settings`` is not given.
        get_session_client_timeout: stored as is, or capped by
            ``max_session_acquire_timeout`` when that one is set.
        fast_backoff_settings: backoff object for the fast policy; defaults to
            ``BackoffSettings(10, 0.005)``.
        slow_backoff_settings: backoff object for the slow policy.
        idempotent: stored flag; interpreted by the retriable-error check,
            which lives outside this class.
    """

    def __init__(
        self,
        max_retries=10,
        max_session_acquire_timeout=None,
        on_ydb_error_callback=None,
        backoff_ceiling=6,
        backoff_slot_duration=1,
        get_session_client_timeout=5,
        fast_backoff_settings=None,
        slow_backoff_settings=None,
        idempotent=False,
    ):
        self.max_retries = max_retries
        self.max_session_acquire_timeout = max_session_acquire_timeout
        self.idempotent = idempotent

        # Fixed policy flags; they are not configurable through the constructor.
        self.retry_not_found = True
        self.retry_internal_error = True

        # Callbacks fall back to handlers that silently ignore the error.
        if on_ydb_error_callback is None:
            self.on_ydb_error_callback = lambda e: None
        else:
            self.on_ydb_error_callback = on_ydb_error_callback
        self.unknown_error_handler = lambda e: None

        # Default backoff objects are built only when none was supplied.
        if fast_backoff_settings is None:
            self.fast_backoff = BackoffSettings(10, 0.005)
        else:
            self.fast_backoff = fast_backoff_settings

        if slow_backoff_settings is None:
            self.slow_backoff = BackoffSettings(backoff_ceiling, backoff_slot_duration)
        else:
            self.slow_backoff = slow_backoff_settings

        # The client timeout never exceeds the session acquire timeout.
        if max_session_acquire_timeout is None:
            self.get_session_client_timeout = get_session_client_timeout
        else:
            self.get_session_client_timeout = min(max_session_acquire_timeout, get_session_client_timeout)

    def with_fast_backoff(self, backoff_settings):
        """Replace the fast backoff and return ``self`` for call chaining."""
        self.fast_backoff = backoff_settings
        return self

    def with_slow_backoff(self, backoff_settings):
        """Replace the slow backoff and return ``self`` for call chaining."""
        self.slow_backoff = backoff_settings
        return self
58+
59+
60+
class YdbRetryOperationSleepOpt(object):
    """Step yielded by the retry generator that asks the driver to wait.

    ``timeout`` is the pause length; the synchronous driver passes it
    straight to ``time.sleep``.
    """

    def __init__(self, timeout):
        self.timeout = timeout

    def __eq__(self, other):
        # Only objects of exactly the same class can be equal.
        if type(self) != type(other):
            return False
        return self.timeout == other.timeout

    def __repr__(self):
        return "YdbRetryOperationSleepOpt(%s)" % self.timeout
69+
70+
71+
class YdbRetryOperationFinalResult(object):
    """Step yielded by the retry generator that carries the callee's result.

    A driver that fails while consuming ``result`` can report the failure
    back with ``set_exception``; the generator then raises it on resume.
    """

    def __init__(self, result):
        self.result = result
        self.exc = None

    def __eq__(self, other):
        # Only objects of exactly the same class can be equal.
        if type(self) != type(other):
            return False
        return self.result == other.result and self.exc == other.exc

    def __repr__(self):
        return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc)

    def set_exception(self, exc):
        """Attach the exception raised while the result was being consumed."""
        self.exc = exc
84+
85+
86+
def retry_operation_impl(callee, retry_settings=None, *args, **kwargs):
    """Generator that drives the retry loop without doing any sleeping itself.

    Each attempt calls ``callee(*args, **kwargs)`` and yields the outcome
    wrapped in ``YdbRetryOperationFinalResult``. The driver consuming the
    generator either stops there, or stores an exception on the yielded
    object via ``set_exception`` and resumes the generator, which makes the
    attempt count as failed. When a pause is wanted before the next attempt,
    a ``YdbRetryOperationSleepOpt`` is yielded and the driver is expected to
    wait for ``timeout`` before resuming.

    Args:
        callee: callable executed on every attempt.
        retry_settings: ``RetrySettings`` instance; a default one is created
            when omitted.
        *args, **kwargs: forwarded to ``callee``.

    Yields:
        ``YdbRetryOperationFinalResult`` or ``YdbRetryOperationSleepOpt``.

    Raises:
        issues.Error: when the error is reported as not retriable, or the
            last seen error once all attempts are used up.
        Exception: any other exception, re-raised after being passed to
            ``retry_settings.unknown_error_handler``.
    """
    retry_settings = RetrySettings() if retry_settings is None else retry_settings
    status = None

    for attempt in range(retry_settings.max_retries + 1):
        try:
            result = YdbRetryOperationFinalResult(callee(*args, **kwargs))
            yield result

            # The driver may have reported a failure that happened while it
            # consumed the result (e.g. while awaiting it).
            if result.exc is not None:
                raise result.exc

            # The attempt succeeded. Finish here so that a driver which keeps
            # iterating does not make the callee run a second time.
            return

        except issues.Error as e:
            status = e
            retry_settings.on_ydb_error_callback(e)

            retriable_info = check_retriable_error(e, retry_settings, attempt)
            if not retriable_info.is_retriable:
                raise

            # These error types are retried right away, without asking the
            # driver to sleep first.
            retried_without_sleep = (
                issues.Aborted,
                issues.BadSession,
                issues.NotFound,
                issues.InternalError,
            )
            if not isinstance(e, retried_without_sleep):
                yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds)

        except Exception as e:
            # Non-YDB errors are never retried; set
            # retry_settings.unknown_error_handler to observe them.
            retry_settings.unknown_error_handler(e)
            raise

    # status stays None when no attempt was made at all (negative
    # max_retries); "raise None" would fail with an unrelated TypeError.
    if status is not None:
        raise status
127+
128+
129+
def retry_operation_sync(callee, retry_settings=None, *args, **kwargs):
    """Run ``callee`` under the retry policy, blocking the current thread.

    Consumes the steps produced by ``retry_operation_impl``: sleep steps are
    honoured with ``time.sleep`` and the first non-sleep step ends the loop,
    its ``result`` being returned. Errors raised by the generator propagate
    to the caller.
    """
    for step in retry_operation_impl(callee, retry_settings, *args, **kwargs):
        if not isinstance(step, YdbRetryOperationSleepOpt):
            return step.result
        time.sleep(step.timeout)

ydb/_topic_reader/topic_reader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
Callable,
1111
)
1212

13-
from ..table import RetrySettings
13+
from .._retries import RetrySettings
1414
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
1515

1616

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
from .. import (
2727
_apis,
2828
issues,
29-
check_retriable_error,
30-
RetrySettings,
3129
)
30+
from .._errors import check_retriable_error
31+
from .._retries import RetrySettings
3232
from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec
3333
from .._grpc.grpcwrapper.ydb_topic import (
3434
UpdateTokenRequest,

ydb/query/pool.py

Lines changed: 6 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -5,94 +5,10 @@
55
from .session import (
66
QuerySessionSync,
77
)
8-
from .. import issues
9-
from .._errors import check_retriable_error
10-
11-
12-
class RetrySettings(object):
13-
def __init__(
14-
self,
15-
max_retries: int = 10,
16-
max_session_acquire_timeout: int = None,
17-
on_ydb_error_callback: Callable = None,
18-
idempotent: bool = False,
19-
):
20-
self.max_retries = max_retries
21-
self.max_session_acquire_timeout = max_session_acquire_timeout
22-
self.on_ydb_error_callback = (lambda e: None) if on_ydb_error_callback is None else on_ydb_error_callback
23-
self.retry_not_found = True
24-
self.idempotent = idempotent
25-
self.retry_internal_error = True
26-
self.unknown_error_handler = lambda e: None
27-
28-
29-
class YdbRetryOperationSleepOpt:
30-
def __init__(self, timeout):
31-
self.timeout = timeout
32-
33-
def __eq__(self, other):
34-
return type(self) == type(other) and self.timeout == other.timeout
35-
36-
def __repr__(self):
37-
return "YdbRetryOperationSleepOpt(%s)" % self.timeout
38-
39-
40-
class YdbRetryOperationFinalResult:
41-
def __init__(self, result):
42-
self.result = result
43-
self.exc = None
44-
45-
def __eq__(self, other):
46-
return type(self) == type(other) and self.result == other.result and self.exc == other.exc
47-
48-
def __repr__(self):
49-
return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc)
50-
51-
def set_exception(self, exc):
52-
self.exc = exc
53-
54-
55-
def retry_operation_impl(callee: Callable, retry_settings: RetrySettings = None, *args, **kwargs):
56-
retry_settings = RetrySettings() if retry_settings is None else retry_settings
57-
status = None
58-
59-
for attempt in range(retry_settings.max_retries + 1):
60-
try:
61-
result = YdbRetryOperationFinalResult(callee(*args, **kwargs))
62-
yield result
63-
64-
if result.exc is not None:
65-
raise result.exc
66-
67-
except issues.Error as e:
68-
status = e
69-
retry_settings.on_ydb_error_callback(e)
70-
71-
retriable_info = check_retriable_error(e, retry_settings, attempt)
72-
if not retriable_info.is_retriable:
73-
raise
74-
75-
skip_yield_error_types = [
76-
issues.Aborted,
77-
issues.BadSession,
78-
issues.NotFound,
79-
issues.InternalError,
80-
]
81-
82-
yield_sleep = True
83-
for t in skip_yield_error_types:
84-
if isinstance(e, t):
85-
yield_sleep = False
86-
87-
if yield_sleep:
88-
yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds)
89-
90-
except Exception as e:
91-
# you should provide your own handler you want
92-
retry_settings.unknown_error_handler(e)
93-
raise
94-
if status:
95-
raise status
8+
from .._retries import (
9+
RetrySettings,
10+
retry_operation_sync,
11+
)
9612

9713

9814
class QuerySessionPool:
@@ -122,12 +38,7 @@ def wrapped_callee():
12238
with self.checkout() as session:
12339
return callee(session, *args, **kwargs)
12440

125-
opt_generator = retry_operation_impl(wrapped_callee, retry_settings, *args, **kwargs)
126-
for next_opt in opt_generator:
127-
if isinstance(next_opt, YdbRetryOperationSleepOpt):
128-
time.sleep(next_opt.timeout)
129-
else:
130-
return next_opt.result
41+
return retry_operation_sync(wrapped_callee, retry_settings)
13142

13243
def execute_with_retries(self, query: str, retry_settings: RetrySettings = None, *args, **kwargs):
13344
"""Special interface to execute a one-shot queries in a safe, retriable way.
@@ -144,12 +55,7 @@ def wrapped_callee():
14455
it = session.execute(query, empty_tx_control=True, *args, **kwargs)
14556
return [result_set for result_set in it]
14657

147-
opt_generator = retry_operation_impl(wrapped_callee, retry_settings)
148-
for next_opt in opt_generator:
149-
if isinstance(next_opt, YdbRetryOperationSleepOpt):
150-
time.sleep(next_opt.timeout)
151-
else:
152-
return next_opt.result
58+
return retry_operation_sync(wrapped_callee, retry_settings)
15359

15460

15561
class SimpleQuerySessionCheckout:

0 commit comments

Comments
 (0)