Skip to content

Commit 60afbe1

Browse files
committed
move retry logic to standalone module
1 parent 690c974 commit 60afbe1

File tree

7 files changed

+155
-242
lines changed

7 files changed

+155
-242
lines changed

tests/query/test_query_transaction.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,6 @@ def test_context_manager_normal_flow(self, tx: BaseTxContext):
6565
assert tx._tx_state._state == QueryTxStateEnum.COMMITTED
6666

6767
def test_context_manager_does_not_hide_exceptions(self, tx: BaseTxContext):
68-
with pytest.raises(RuntimeError):
68+
with pytest.raises(Exception):
6969
with tx:
70-
tx.commit()
70+
raise Exception()

ydb/_retries.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import random
2+
import time
3+
4+
from . import issues
5+
from ._errors import check_retriable_error
6+
7+
8+
class BackoffSettings(object):
    """Parameters of a capped exponential backoff with random jitter.

    :param ceiling: largest exponent used; the number of slots stops growing
        once ``retry_number`` reaches this value.
    :param slot_duration: duration of a single slot (the result of
        ``calc_timeout`` is expressed in the same unit; the internal ``_ms``
        conversion implies seconds).
    :param uncertain_ratio: fraction of the maximum delay that is randomised.
        ``0`` disables jitter, ``1`` makes the whole delay random.
    """

    def __init__(self, ceiling=6, slot_duration=0.001, uncertain_ratio=0.5):
        self.ceiling = ceiling
        self.slot_duration = slot_duration
        self.uncertain_ratio = uncertain_ratio

    def calc_timeout(self, retry_number):
        """Return the delay to wait before retry number ``retry_number``.

        The maximum delay is ``2 ** min(retry_number, ceiling)`` slots.  The
        returned value is drawn uniformly from
        ``[max * (1 - uncertain_ratio), max]``.
        """
        # A negative shift count raises ValueError, so clamp the exponent at 0
        # (covers both a negative retry number and a negative ceiling).
        exponent = max(min(retry_number, self.ceiling), 0)
        slots_count = 1 << exponent
        max_duration_ms = slots_count * self.slot_duration * 1000.0
        # Fixed part (1 - uncertain_ratio) plus a random share of uncertain_ratio.
        jitter_factor = random.random() * self.uncertain_ratio + 1.0 - self.uncertain_ratio
        duration_ms = max_duration_ms * jitter_factor
        return duration_ms / 1000.0
20+
21+
22+
class RetrySettings(object):
    """Configuration consumed by the retry helpers.

    Holds the attempt limit, the error callback, the fast and slow backoff
    policies and the timeout used when acquiring a session.
    """

    def __init__(
        self,
        max_retries=10,
        max_session_acquire_timeout=None,
        on_ydb_error_callback=None,
        backoff_ceiling=6,
        backoff_slot_duration=1,
        get_session_client_timeout=5,
        fast_backoff_settings=None,
        slow_backoff_settings=None,
        idempotent=False,
    ):
        self.max_retries = max_retries
        self.max_session_acquire_timeout = max_session_acquire_timeout

        # A missing callback is replaced by a no-op so callers may invoke it
        # unconditionally.
        if on_ydb_error_callback is None:
            on_ydb_error_callback = lambda e: None  # noqa: E731
        self.on_ydb_error_callback = on_ydb_error_callback

        # Default policies are only constructed when none were supplied.
        if fast_backoff_settings is None:
            fast_backoff_settings = BackoffSettings(10, 0.005)
        self.fast_backoff = fast_backoff_settings

        if slow_backoff_settings is None:
            slow_backoff_settings = BackoffSettings(backoff_ceiling, backoff_slot_duration)
        self.slow_backoff = slow_backoff_settings

        self.retry_not_found = True
        self.idempotent = idempotent
        self.retry_internal_error = True
        self.unknown_error_handler = lambda e: None

        # The per-call session timeout never exceeds the overall acquire timeout.
        self.get_session_client_timeout = get_session_client_timeout
        if max_session_acquire_timeout is not None:
            self.get_session_client_timeout = min(
                self.max_session_acquire_timeout,
                self.get_session_client_timeout,
            )

    def with_fast_backoff(self, backoff_settings):
        """Replace the fast backoff policy and return ``self`` for chaining."""
        self.fast_backoff = backoff_settings
        return self

    def with_slow_backoff(self, backoff_settings):
        """Replace the slow backoff policy and return ``self`` for chaining."""
        self.slow_backoff = backoff_settings
        return self
59+
60+
61+
class YdbRetryOperationSleepOpt(object):
    """Instruction yielded by the retry generator: wait ``timeout`` and retry.

    :param timeout: delay the consumer should wait before resuming the
        generator.
    """

    def __init__(self, timeout):
        self.timeout = timeout

    def __eq__(self, other):
        # Only instances of exactly the same class are comparable; returning
        # NotImplemented lets Python try the reflected comparison before
        # falling back to "not equal".
        if type(self) is not type(other):
            return NotImplemented
        return self.timeout == other.timeout

    def __repr__(self):
        # Wrap in a 1-tuple so a tuple-valued timeout cannot be mistaken for
        # the %-format argument list.
        return "YdbRetryOperationSleepOpt(%s)" % (self.timeout,)
70+
71+
72+
class YdbRetryOperationFinalResult(object):
    """Value yielded by the retry generator when the callee returned.

    ``result`` holds whatever the callee returned.  ``exc`` starts as ``None``;
    a consumer may attach an exception with ``set_exception`` before resuming
    the generator.
    """

    def __init__(self, result):
        self.result = result
        self.exc = None

    def __eq__(self, other):
        # Only instances of exactly the same class are comparable; returning
        # NotImplemented lets Python try the reflected comparison before
        # falling back to "not equal".
        if type(self) is not type(other):
            return NotImplemented
        return self.result == other.result and self.exc == other.exc

    def __repr__(self):
        return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc)

    def set_exception(self, exc):
        """Attach ``exc`` to this result."""
        self.exc = exc
85+
86+
87+
def retry_operation_impl(callee, retry_settings=None, *args, **kwargs):
    """Generator that drives the retry loop for ``callee``.

    The function performs no waiting itself.  It yields instructions that the
    consumer (sync or async) acts upon:

    * ``YdbRetryOperationFinalResult`` wraps the value returned by
      ``callee(*args, **kwargs)``.  If the consumer calls ``set_exception`` on
      it and resumes the generator, that exception is raised here and goes
      through the same retry handling as an error raised by ``callee``.
    * ``YdbRetryOperationSleepOpt`` asks the consumer to wait before the next
      attempt.

    :param callee: callable invoked on every attempt.
    :param retry_settings: ``RetrySettings`` instance; defaults are used when
        ``None``.
    :raises issues.Error: a non-retriable error immediately, or the last
        retriable error once ``max_retries + 1`` attempts are used up.
    :raises Exception: any other exception, after
        ``retry_settings.unknown_error_handler`` has been called with it.
    """
    retry_settings = RetrySettings() if retry_settings is None else retry_settings
    status = None

    for attempt in range(retry_settings.max_retries + 1):
        try:
            result = YdbRetryOperationFinalResult(callee(*args, **kwargs))
            yield result

            # The consumer may have attached a failure to the yielded result.
            if result.exc is not None:
                raise result.exc

        except issues.Error as e:
            status = e
            retry_settings.on_ydb_error_callback(e)

            retriable_info = check_retriable_error(e, retry_settings, attempt)
            if not retriable_info.is_retriable:
                raise

            # These error types are retried without yielding a sleep instruction.
            skip_yield_error_types = (
                issues.Aborted,
                issues.BadSession,
                issues.NotFound,
                issues.InternalError,
            )

            if not isinstance(e, skip_yield_error_types):
                yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds)

        except Exception as e:
            # Unknown errors are never retried; supply your own handler via
            # retry_settings.unknown_error_handler if you need to react to them.
            retry_settings.unknown_error_handler(e)
            raise

    # status stays None when no attempt failed (for example, the loop never
    # ran because max_retries is negative); "raise None" would be a TypeError.
    if status is not None:
        raise status
128+
129+
130+
def retry_operation_sync(callee, retry_settings=None, *args, **kwargs):
    """Run ``callee`` with retries, blocking the current thread between attempts.

    Consumes the instructions produced by ``retry_operation_impl``: every
    ``YdbRetryOperationSleepOpt`` is honoured with ``time.sleep`` and the first
    other item ends the loop, its ``result`` attribute being returned.
    Exceptions raised by the generator propagate to the caller.
    """
    for step in retry_operation_impl(callee, retry_settings, *args, **kwargs):
        if not isinstance(step, YdbRetryOperationSleepOpt):
            return step.result
        time.sleep(step.timeout)

ydb/_topic_reader/topic_reader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
Callable,
1111
)
1212

13-
from ..table import RetrySettings
13+
from .._retries import RetrySettings
1414
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
1515

1616

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
from .. import (
2727
_apis,
2828
issues,
29-
check_retriable_error,
30-
RetrySettings,
3129
)
30+
from .._errors import check_retriable_error
31+
from .._retries import RetrySettings
3232
from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec
3333
from .._grpc.grpcwrapper.ydb_topic import (
3434
UpdateTokenRequest,

ydb/query/pool.py

Lines changed: 6 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,98 +1,13 @@
1-
import time
21
from typing import Callable
32

43
from . import base
54
from .session import (
65
QuerySessionSync,
76
)
8-
from .. import issues
9-
from .._errors import check_retriable_error
10-
11-
12-
class RetrySettings(object):
13-
def __init__(
14-
self,
15-
max_retries: int = 10,
16-
max_session_acquire_timeout: int = None,
17-
on_ydb_error_callback: Callable = None,
18-
idempotent: bool = False,
19-
):
20-
self.max_retries = max_retries
21-
self.max_session_acquire_timeout = max_session_acquire_timeout
22-
self.on_ydb_error_callback = (lambda e: None) if on_ydb_error_callback is None else on_ydb_error_callback
23-
self.retry_not_found = True
24-
self.idempotent = idempotent
25-
self.retry_internal_error = True
26-
self.unknown_error_handler = lambda e: None
27-
28-
29-
class YdbRetryOperationSleepOpt:
30-
def __init__(self, timeout):
31-
self.timeout = timeout
32-
33-
def __eq__(self, other):
34-
return type(self) == type(other) and self.timeout == other.timeout
35-
36-
def __repr__(self):
37-
return "YdbRetryOperationSleepOpt(%s)" % self.timeout
38-
39-
40-
class YdbRetryOperationFinalResult:
41-
def __init__(self, result):
42-
self.result = result
43-
self.exc = None
44-
45-
def __eq__(self, other):
46-
return type(self) == type(other) and self.result == other.result and self.exc == other.exc
47-
48-
def __repr__(self):
49-
return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc)
50-
51-
def set_exception(self, exc):
52-
self.exc = exc
53-
54-
55-
def retry_operation_impl(callee: Callable, retry_settings: RetrySettings = None, *args, **kwargs):
56-
retry_settings = RetrySettings() if retry_settings is None else retry_settings
57-
status = None
58-
59-
for attempt in range(retry_settings.max_retries + 1):
60-
try:
61-
result = YdbRetryOperationFinalResult(callee(*args, **kwargs))
62-
yield result
63-
64-
if result.exc is not None:
65-
raise result.exc
66-
67-
except issues.Error as e:
68-
status = e
69-
retry_settings.on_ydb_error_callback(e)
70-
71-
retriable_info = check_retriable_error(e, retry_settings, attempt)
72-
if not retriable_info.is_retriable:
73-
raise
74-
75-
skip_yield_error_types = [
76-
issues.Aborted,
77-
issues.BadSession,
78-
issues.NotFound,
79-
issues.InternalError,
80-
]
81-
82-
yield_sleep = True
83-
for t in skip_yield_error_types:
84-
if isinstance(e, t):
85-
yield_sleep = False
86-
87-
if yield_sleep:
88-
yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds)
89-
90-
except Exception as e:
91-
# you should provide your own handler you want
92-
retry_settings.unknown_error_handler(e)
93-
raise
94-
if status:
95-
raise status
7+
from .._retries import (
8+
RetrySettings,
9+
retry_operation_sync,
10+
)
9611

9712

9813
class QuerySessionPool:
@@ -122,12 +37,7 @@ def wrapped_callee():
12237
with self.checkout() as session:
12338
return callee(session, *args, **kwargs)
12439

125-
opt_generator = retry_operation_impl(wrapped_callee, retry_settings, *args, **kwargs)
126-
for next_opt in opt_generator:
127-
if isinstance(next_opt, YdbRetryOperationSleepOpt):
128-
time.sleep(next_opt.timeout)
129-
else:
130-
return next_opt.result
40+
return retry_operation_sync(wrapped_callee, retry_settings)
13141

13242
def execute_with_retries(self, query: str, retry_settings: RetrySettings = None, *args, **kwargs):
13343
"""Special interface to execute a one-shot queries in a safe, retriable way.
@@ -144,12 +54,7 @@ def wrapped_callee():
14454
it = session.execute(query, empty_tx_control=True, *args, **kwargs)
14555
return [result_set for result_set in it]
14656

147-
opt_generator = retry_operation_impl(wrapped_callee, retry_settings)
148-
for next_opt in opt_generator:
149-
if isinstance(next_opt, YdbRetryOperationSleepOpt):
150-
time.sleep(next_opt.timeout)
151-
else:
152-
return next_opt.result
57+
return retry_operation_sync(wrapped_callee, retry_settings)
15358

15459

15560
class SimpleQuerySessionCheckout:

0 commit comments

Comments
 (0)