Skip to content

Commit 802c841

Browse files
committed
basic pool & retries & example
1 parent 1195c4f commit 802c841

File tree

8 files changed

+307
-27
lines changed

8 files changed

+307
-27
lines changed

examples/query-service/basic_example.py

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
import ydb
22

3-
from ydb.query.session import QuerySessionSync
4-
53

64
def main():
75
driver_config = ydb.DriverConfig(
@@ -16,14 +14,57 @@ def main():
1614
except TimeoutError:
1715
raise RuntimeError("Connect failed to YDB")
1816

19-
session = QuerySessionSync(driver)
20-
session.create()
17+
# client = ydb.QueryClientSync(driver)
18+
# session = client.session().create()
19+
pool = ydb.QuerySessionPool(driver)
20+
# with pool.checkout() as session:
21+
def callee(session):
22+
print("="*50)
23+
print("BEFORE ACTION")
24+
it = session.execute("""SELECT COUNT(*) FROM example;""")
25+
for result_set in it:
26+
print(f"rows: {str(result_set.rows)}")
27+
28+
print("="*50)
29+
print("INSERT WITH COMMIT TX")
30+
tx = session.transaction()
31+
32+
tx.begin()
33+
34+
tx.execute("""INSERT INTO example (key, value) VALUES (0033, "onepieceisreal");""")
35+
36+
for result_set in tx.execute("""SELECT COUNT(*) FROM example;"""):
37+
print(f"rows: {str(result_set.rows)}")
38+
39+
tx.commit()
40+
41+
print("="*50)
42+
print("AFTER COMMIT TX")
43+
44+
for result_set in session.execute("""SELECT COUNT(*) FROM example;"""):
45+
print(f"rows: {str(result_set.rows)}")
46+
47+
print("="*50)
48+
print("INSERT WITH ROLLBACK TX")
49+
50+
tx = session.transaction()
51+
52+
tx.begin()
53+
54+
tx.execute("""INSERT INTO example (key, value) VALUES (0044, "onepieceisreal");""")
55+
56+
for result_set in tx.execute("""SELECT COUNT(*) FROM example;"""):
57+
print(f"rows: {str(result_set.rows)}")
58+
59+
tx.rollback()
60+
61+
print("="*50)
62+
print("AFTER ROLLBACK TX")
63+
64+
for result_set in session.execute("""SELECT COUNT(*) FROM example;"""):
65+
print(f"rows: {str(result_set.rows)}")
2166

22-
it = session.execute("select 1; select 2;", commit_tx=False)
23-
for result_set in it:
24-
# pass
25-
print(f"columns: {str(result_set.columns)}")
26-
print(f"rows: {str(result_set.rows)}")
67+
pool.retry_operation_sync(callee)
2768

2869

2970
if __name__ == "__main__":

ydb/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from .tracing import * # noqa
2020
from .topic import * # noqa
2121
from .draft import * # noqa
22+
from .query import * # noqa
2223

2324
try:
2425
import ydb.aio as aio # noqa

ydb/_grpc/grpcwrapper/ydb_query.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,11 @@ def from_proto(msg: ydb_query_pb2.RollbackTransactionResponse) -> "RollbackTrans
129129
@dataclass
130130
class QueryContent(IFromPublic, IToProto):
131131
text: str
132-
syntax: Optional[str] = None
132+
syntax: Optional[int] = None
133133

134134
@staticmethod
135-
def from_public(query: str) -> "QueryContent":
136-
return QueryContent(text=query)
135+
def from_public(query: str, syntax: int = None) -> "QueryContent":
136+
return QueryContent(text=query, syntax=syntax)
137137

138138
def to_proto(self) -> ydb_query_pb2.QueryContent:
139139
return ydb_query_pb2.QueryContent(text=self.text, syntax=self.syntax)
@@ -163,16 +163,16 @@ class ExecuteQueryRequest(IToProto):
163163
query_content: QueryContent
164164
tx_control: TransactionControl
165165
concurrent_result_sets: Optional[bool] = False
166-
exec_mode: Optional[str] = None
166+
exec_mode: Optional[int] = None
167167
parameters: Optional[dict] = None
168-
stats_mode: Optional[str] = None
168+
stats_mode: Optional[int] = None
169169

170170
def to_proto(self) -> ydb_query_pb2.ExecuteQueryRequest:
171171
return ydb_query_pb2.ExecuteQueryRequest(
172172
session_id=self.session_id,
173173
tx_control=self.tx_control.to_proto(),
174174
query_content=self.query_content.to_proto(),
175-
exec_mode=ydb_query_pb2.EXEC_MODE_EXECUTE,
175+
exec_mode=self.exec_mode,
176176
stats_mode=self.stats_mode,
177177
concurrent_result_sets=self.concurrent_result_sets,
178178
parameters=self.parameters,

ydb/query/__init__.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from .base import (
2+
IQueryClient,
3+
SupportedDriverType,
4+
QueryClientSettings,
5+
)
6+
7+
from .session import QuerySessionSync
8+
9+
from .._grpc.grpcwrapper.ydb_query_public_types import (
10+
QueryOnlineReadOnly,
11+
QuerySerializableReadWrite,
12+
QuerySnapshotReadOnly,
13+
QueryStaleReadOnly,
14+
)
15+
16+
from .pool import QuerySessionPool
17+
18+
19+
class QueryClientSync(IQueryClient):
20+
def __init__(self, driver: SupportedDriverType, query_client_settings: QueryClientSettings = None):
21+
self._driver = driver
22+
self._settings = query_client_settings
23+
24+
def session(self) -> QuerySessionSync:
25+
return QuerySessionSync(self._driver, self._settings)

ydb/query/base.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import abc
2+
import enum
23
import functools
34

45
from typing import (
@@ -63,7 +64,7 @@ def __init__(self, driver: SupportedDriverType, settings: QueryClientSettings =
6364
pass
6465

6566
@abc.abstractmethod
66-
def create(self) -> None:
67+
def create(self) -> "IQuerySession":
6768
pass
6869

6970
@abc.abstractmethod
@@ -117,7 +118,7 @@ def rollback(settings: QueryClientSettings = None):
117118
pass
118119

119120
@abc.abstractmethod
120-
def execute(query: str):
121+
def execute(query: str, commit_tx=False):
121122
pass
122123

123124

@@ -130,39 +131,73 @@ def session(self) -> IQuerySession:
130131
pass
131132

132133

134+
class QuerySyntax(enum.IntEnum):
135+
UNSPECIFIED = 0
136+
YQL_V1 = 1
137+
PG = 2
138+
139+
140+
class QueryExecMode(enum.IntEnum):
141+
UNSPECIFIED = 0
142+
PARSE = 10
143+
VALIDATE = 20
144+
EXPLAIN = 30
145+
EXECUTE = 50
146+
147+
133148
def create_execute_query_request(
134-
query: str, session_id: str, tx_id: str = None, commit_tx: bool = False, tx_mode: BaseQueryTxMode = None
149+
query: str,
150+
session_id: str,
151+
tx_id: str = None,
152+
commit_tx: bool = False,
153+
tx_mode: BaseQueryTxMode = None,
154+
syntax: QuerySyntax = None,
155+
exec_mode: QueryExecMode = None,
156+
parameters: dict = None,
157+
concurrent_result_sets: bool = False,
158+
135159
):
160+
syntax = QuerySyntax.YQL_V1 if not syntax else syntax
161+
exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode
136162
if tx_id:
137163
req = ydb_query.ExecuteQueryRequest(
138164
session_id=session_id,
139165
query_content=ydb_query.QueryContent.from_public(
140166
query=query,
167+
syntax=syntax,
141168
),
142169
tx_control=ydb_query.TransactionControl(
143170
tx_id=tx_id,
144171
commit_tx=commit_tx,
145172
),
173+
exec_mode=exec_mode,
174+
parameters=parameters,
175+
concurrent_result_sets=concurrent_result_sets,
146176
)
147177
else:
148178
tx_mode = tx_mode if tx_mode is not None else QuerySerializableReadWrite()
149179
req = ydb_query.ExecuteQueryRequest(
150180
session_id=session_id,
151181
query_content=ydb_query.QueryContent.from_public(
152182
query=query,
183+
syntax=syntax,
153184
),
154185
tx_control=ydb_query.TransactionControl(
155186
begin_tx=ydb_query.TransactionSettings(
156187
tx_mode=tx_mode,
157188
),
158189
commit_tx=commit_tx,
159190
),
191+
exec_mode=exec_mode,
192+
parameters=parameters,
193+
concurrent_result_sets=concurrent_result_sets,
160194
)
161195

162196
return req.to_proto()
163197

164198

165199
def wrap_execute_query_response(rpc_state, response_pb):
200+
issues._process_response(response_pb)
166201
return convert.ResultSet.from_message(response_pb.result_set)
167202

168203

ydb/query/pool.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import abc
2+
import time
3+
from typing import Callable
4+
5+
from . import base
6+
from .session import (
7+
QuerySessionSync,
8+
BaseQuerySession,
9+
)
10+
from .. import issues
11+
from .._errors import check_retriable_error
12+
13+
14+
class RetrySettings(object):
15+
def __init__(
16+
self,
17+
max_retries: int = 10,
18+
max_session_acquire_timeout: int = None,
19+
on_ydb_error_callback: Callable = None,
20+
idempotent: bool = False,
21+
):
22+
self.max_retries = max_retries
23+
self.max_session_acquire_timeout = max_session_acquire_timeout
24+
self.on_ydb_error_callback = (lambda e: None) if on_ydb_error_callback is None else on_ydb_error_callback
25+
self.retry_not_found = True
26+
self.idempotent = idempotent
27+
self.retry_internal_error = True
28+
self.unknown_error_handler = lambda e: None
29+
30+
31+
class YdbRetryOperationSleepOpt:
32+
def __init__(self, timeout):
33+
self.timeout = timeout
34+
35+
def __eq__(self, other):
36+
return type(self) == type(other) and self.timeout == other.timeout
37+
38+
def __repr__(self):
39+
return "YdbRetryOperationSleepOpt(%s)" % self.timeout
40+
41+
42+
class YdbRetryOperationFinalResult:
43+
def __init__(self, result):
44+
self.result = result
45+
self.exc = None
46+
47+
def __eq__(self, other):
48+
return type(self) == type(other) and self.result == other.result and self.exc == other.exc
49+
50+
def __repr__(self):
51+
return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc)
52+
53+
def set_exception(self, exc):
54+
self.exc = exc
55+
56+
57+
def retry_operation_impl(callee: Callable, retry_settings: RetrySettings = None, *args, **kwargs):
58+
retry_settings = RetrySettings() if retry_settings is None else retry_settings
59+
status = None
60+
61+
for attempt in range(retry_settings.max_retries + 1):
62+
try:
63+
result = YdbRetryOperationFinalResult(callee(*args, **kwargs))
64+
yield result
65+
66+
if result.exc is not None:
67+
raise result.exc
68+
69+
except issues.Error as e:
70+
status = e
71+
retry_settings.on_ydb_error_callback(e)
72+
73+
retriable_info = check_retriable_error(e, retry_settings, attempt)
74+
if not retriable_info.is_retriable:
75+
raise
76+
77+
skip_yield_error_types = [
78+
issues.Aborted,
79+
issues.BadSession,
80+
issues.NotFound,
81+
issues.InternalError,
82+
]
83+
84+
yield_sleep = True
85+
for t in skip_yield_error_types:
86+
if isinstance(e, t):
87+
yield_sleep = False
88+
89+
if yield_sleep:
90+
yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds)
91+
92+
except Exception as e:
93+
# you should provide your own handler you want
94+
retry_settings.unknown_error_handler(e)
95+
raise
96+
97+
raise status
98+
99+
100+
class QuerySessionPool:
101+
def __init__(self, driver: base.SupportedDriverType):
102+
self._driver = driver
103+
104+
def checkout(self):
105+
return SimpleQuerySessionCheckout(self)
106+
107+
def retry_operation_sync(self, callee: Callable, retry_settings: RetrySettings = None, *args, **kwargs):
108+
retry_settings = RetrySettings() if retry_settings is None else retry_settings
109+
110+
def wrapped_callee():
111+
with self.checkout() as session:
112+
return callee(session, *args, **kwargs)
113+
114+
opt_generator = retry_operation_impl(wrapped_callee, retry_settings, *args, **kwargs)
115+
for next_opt in opt_generator:
116+
if isinstance(next_opt, YdbRetryOperationSleepOpt):
117+
time.sleep(next_opt.timeout)
118+
else:
119+
return next_opt.result
120+
121+
122+
class SimpleQuerySessionCheckout:
123+
def __init__(self, pool: QuerySessionPool):
124+
self._pool = pool
125+
self._session = QuerySessionSync(pool._driver)
126+
127+
def __enter__(self):
128+
self._session.create()
129+
return self._session
130+
131+
def __exit__(self, exc_type, exc_val, exc_tb):
132+
self._session.delete()

ydb/query/session.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,14 +205,16 @@ def delete(self) -> None:
205205
self._delete_call()
206206
self._stream.cancel()
207207

208-
def create(self) -> None:
208+
def create(self) -> "QuerySessionSync":
209209
if self._state._already_in(QuerySessionStateEnum.CREATED):
210210
return
211211

212212
self._state._check_invalid_transition(QuerySessionStateEnum.CREATED)
213213
self._create_call()
214214
self._attach()
215215

216+
return self
217+
216218
def transaction(self, tx_mode: base.BaseQueryTxMode = None) -> base.IQueryTxContext:
217219
self._state._check_session_ready_to_use()
218220

0 commit comments

Comments
 (0)