Skip to content

Commit 1ac441a

Browse files
committed
Changed the flow
1 parent 8e759db commit 1ac441a

File tree

5 files changed

+29
-23
lines changed

5 files changed

+29
-23
lines changed

src/databricks/sql/auth/retry.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ class CommandType(Enum):
3232
CLOSE_SESSION = "CloseSession"
3333
CLOSE_OPERATION = "CloseOperation"
3434
GET_OPERATION_STATUS = "GetOperationStatus"
35-
FETCH_RESULTS_INLINE_FETCH_NEXT = "FetchResultsInline_FETCH_NEXT"
3635
OTHER = "Other"
3736

3837
@classmethod
@@ -242,6 +241,14 @@ def command_type(self) -> Optional[CommandType]:
242241
def command_type(self, value: CommandType) -> None:
243242
self._command_type = value
244243

244+
@property
245+
def is_retryable(self) -> bool:
246+
return self._is_retryable
247+
248+
@is_retryable.setter
249+
def is_retryable(self, value: bool) -> None:
250+
self._is_retryable = value
251+
245252
@property
246253
def delay_default(self) -> float:
247254
"""Time in seconds the connector will wait between requests polling a GetOperationStatus Request
@@ -363,11 +370,8 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
363370
if status_code == 501:
364371
raise NonRecoverableNetworkError("Received code 501 from server.")
365372

366-
if self.command_type == CommandType.FETCH_RESULTS_INLINE_FETCH_NEXT:
367-
return (
368-
False,
369-
"FetchResults in INLINE mode with FETCH_NEXT orientation are not idempotent and is not retried",
370-
)
373+
if self.is_retryable == False:
374+
return False, "Request is not retryable"
371375

372376
# Request failed and this method is not retryable. We only retry POST requests.
373377
if not self._is_method_retryable(method):

src/databricks/sql/auth/thrift_http_client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,3 +216,12 @@ def set_retry_command_type(self, value: CommandType):
216216
logger.warning(
217217
"DatabricksRetryPolicy is currently bypassed. The CommandType cannot be set."
218218
)
219+
220+
def set_is_retryable(self, retryable: bool):
221+
"""Pass the provided retryable flag to the retry policy"""
222+
if isinstance(self.retry_policy, DatabricksRetryPolicy):
223+
self.retry_policy.is_retryable = retryable
224+
else:
225+
logger.warning(
226+
"DatabricksRetryPolicy is currently bypassed. The is_retryable flag cannot be set."
227+
)

src/databricks/sql/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,7 @@ def execute(
808808
self.thrift_backend,
809809
self.buffer_size_bytes,
810810
self.arraysize,
811+
self.connection.use_cloud_fetch,
811812
)
812813

813814
if execute_response.is_staging_operation:
@@ -1202,6 +1203,7 @@ def __init__(
12021203
thrift_backend: ThriftBackend,
12031204
result_buffer_size_bytes: int = DEFAULT_RESULT_BUFFER_SIZE_BYTES,
12041205
arraysize: int = 10000,
1206+
use_cloud_fetch: bool = True,
12051207
):
12061208
"""
12071209
A ResultSet manages the results of a single command.
@@ -1223,6 +1225,7 @@ def __init__(
12231225
self.description = execute_response.description
12241226
self._arrow_schema_bytes = execute_response.arrow_schema_bytes
12251227
self._next_row_index = 0
1228+
self.use_cloud_fetch = use_cloud_fetch
12261229

12271230
if execute_response.arrow_queue:
12281231
# In this case the server has taken the fast path and returned an initial batch of
@@ -1250,6 +1253,7 @@ def _fill_results_buffer(self):
12501253
lz4_compressed=self.lz4_compressed,
12511254
arrow_schema_bytes=self._arrow_schema_bytes,
12521255
description=self.description,
1256+
use_cloud_fetch=self.use_cloud_fetch,
12531257
)
12541258
self.results = results
12551259
self.has_more_rows = has_more_rows

src/databricks/sql/thrift_backend.py

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ def _handle_request_error(self, error_info, attempt, elapsed):
321321

322322
# FUTURE: Consider moving to https://github.com/litl/backoff or
323323
# https://github.com/jd/tenacity for retry logic.
324-
def make_request(self, method, request):
324+
def make_request(self, method, request, retryable=True):
325325
"""Execute given request, attempting retries when
326326
1. Receiving HTTP 429/503 from server
327327
2. OSError is raised during a GetOperationStatus
@@ -374,20 +374,9 @@ def attempt_request(attempt):
374374

375375
# These lines are no-ops if the v3 retry policy is not in use
376376
if self.enable_v3_retries:
377-
# Not to retry when FetchResults in INLINE mode when it has orientation as FETCH_NEXT as it is not idempotent
378-
if (
379-
this_method_name == "FetchResults"
380-
and self._use_cloud_fetch == False
381-
):
382-
this_method_name += (
383-
"Inline_"
384-
+ ttypes.TFetchOrientation._VALUES_TO_NAMES[
385-
request.orientation
386-
]
387-
)
388-
389377
this_command_type = CommandType.get(this_method_name)
390378
self._transport.set_retry_command_type(this_command_type)
379+
self._transport.set_is_retryable(retryable)
391380
self._transport.startRetryTimer()
392381

393382
response = method(request)
@@ -898,8 +887,6 @@ def execute_command(
898887
):
899888
assert session_handle is not None
900889

901-
self._use_cloud_fetch = use_cloud_fetch
902-
903890
spark_arrow_types = ttypes.TSparkArrowTypes(
904891
timestampAsArrow=self._use_arrow_native_timestamps,
905892
decimalAsArrow=self._use_arrow_native_decimals,
@@ -1042,6 +1029,7 @@ def fetch_results(
10421029
lz4_compressed,
10431030
arrow_schema_bytes,
10441031
description,
1032+
use_cloud_fetch=True,
10451033
):
10461034
assert op_handle is not None
10471035

@@ -1058,7 +1046,8 @@ def fetch_results(
10581046
includeResultSetMetadata=True,
10591047
)
10601048

1061-
resp = self.make_request(self._client.FetchResults, req)
1049+
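# FetchResults requests in INLINE mode with FETCH_NEXT orientation are not idempotent and hence are not retried; use_cloud_fetch is passed as the retryable flag, so only cloud-fetch requests are retried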
1050+
resp = self.make_request(self._client.FetchResults, req, use_cloud_fetch)
10621051
if resp.results.startRowOffset > expected_row_start_offset:
10631052
raise DataError(
10641053
"fetch_results failed due to inconsistency in the state between the client and the server. Expected results to start from {} but they instead start at {}, some result batches must have been skipped".format(

tests/unit/test_retry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,6 @@ def test_sleep__retry_after_present(self, t_mock, retry_policy, error_history):
8787

8888
def test_not_retryable__fetch_results_orientation_fetch_next(self, retry_policy):
8989
HTTP_STATUS_CODES = [200, 429, 503, 504]
90-
retry_policy.command_type = CommandType.FETCH_RESULTS_INLINE_FETCH_NEXT
90+
retry_policy.is_retryable = False
9191
for status_code in HTTP_STATUS_CODES:
9292
assert not retry_policy.is_retry("METHOD_NAME", status_code=status_code)

0 commit comments

Comments
 (0)