
Commit c1c9007

Merge remote-tracking branch 'origin/main' into user-agent-param
2 parents: 0eb56f1 + fbaa451

File tree: 11 files changed (+123, -43 lines)

.github/CODEOWNERS

Lines changed: 1 addition & 1 deletion

@@ -2,4 +2,4 @@
 # the repo. Unless a later match takes precedence, these
 # users will be requested for review when someone opens a
 # pull request.
-* @rcypher-databricks @yunbodeng-db @andrefurlan-db @jackyhu-db @benc-db @kravets-levko
+* @deeksha-db @samikshya-db @jprakash-db @yunbodeng-db @jackyhu-db @benc-db

.github/workflows/code-quality-checks.yml

Lines changed: 1 addition & 1 deletion

@@ -89,7 +89,7 @@ jobs:
       #----------------------------------------------
       - name: Load cached venv
         id: cached-poetry-dependencies
-        uses: actions/cache@v2
+        uses: actions/cache@v4
        with:
          path: .venv-pyarrow
          key: venv-pyarrow-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}

CHANGELOG.md

Lines changed: 10 additions & 0 deletions

@@ -5,6 +5,16 @@
 - Split the connector into two separate packages: `databricks-sql-connector` and `databricks-sqlalchemy`. The `databricks-sql-connector` package contains the core functionality of the connector, while the `databricks-sqlalchemy` package contains the SQLAlchemy dialect for the connector.
 - Pyarrow dependency is now optional in `databricks-sql-connector`. Users needing arrow are supposed to explicitly install pyarrow
 
+# 3.7.3 (2025-03-28)
+
+- Fix: Unable to poll small results in execute_async function (databricks/databricks-sql-python#515 by @jprakash-db)
+- Updated log messages to show the status code and error messages of requests (databricks/databricks-sql-python#511 by @jprakash-db)
+- Fix: Incorrect metadata was fetched in case of queries with the same alias (databricks/databricks-sql-python#505 by @jprakash-db)
+
+# 3.7.2 (2025-01-31)
+
+- Updated the retry_delay_max and retry_timeout (databricks/databricks-sql-python#497 by @jprakash-db)
+
 # 3.7.1 (2025-01-07)
 
 - Relaxed the number of Http retry attempts (databricks/databricks-sql-python#486 by @jprakash-db)
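Since pyarrow is now optional, code that touches the connector's Arrow paths should guard for its absence. A minimal sketch, assuming the public `fetchall_arrow()` cursor method; the fallback choice is illustrative:

```python
# pyarrow is now an optional dependency of databricks-sql-connector,
# so guard the import before using any Arrow-backed fetch path.
try:
    import pyarrow  # noqa: F401

    HAS_PYARROW = True
except ImportError:
    HAS_PYARROW = False


def fetch_rows(cursor):
    # fetchall_arrow() needs pyarrow; plain fetchall() always works.
    return cursor.fetchall_arrow() if HAS_PYARROW else cursor.fetchall()
```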

src/databricks/sql/auth/retry.py

Lines changed: 12 additions & 9 deletions

@@ -290,8 +290,9 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool:
         else:
             proposed_wait = self.get_backoff_time()
 
-        proposed_wait = min(proposed_wait, self.delay_max)
+        proposed_wait = max(proposed_wait, self.delay_max)
         self.check_proposed_wait(proposed_wait)
+        logger.debug(f"Retrying after {proposed_wait} seconds")
         time.sleep(proposed_wait)
         return True
 
@@ -344,23 +345,24 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
         if a retry would violate the configured policy.
         """
 
+        logger.info(f"Received status code {status_code} for {method} request")
+
         # Request succeeded. Don't retry.
         if status_code == 200:
             return False, "200 codes are not retried"
 
         if status_code == 401:
-            raise NonRecoverableNetworkError(
-                "Received 401 - UNAUTHORIZED. Confirm your authentication credentials."
+            return (
+                False,
+                "Received 401 - UNAUTHORIZED. Confirm your authentication credentials.",
             )
 
         if status_code == 403:
-            raise NonRecoverableNetworkError(
-                "Received 403 - FORBIDDEN. Confirm your authentication credentials."
-            )
+            return False, "403 codes are not retried"
 
         # Request failed and server said NotImplemented. This isn't recoverable. Don't retry.
         if status_code == 501:
-            raise NonRecoverableNetworkError("Received code 501 from server.")
+            return False, "Received code 501 from server."
 
         # Request failed and this method is not retryable. We only retry POST requests.
         if not self._is_method_retryable(method):

@@ -399,8 +401,9 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
             and status_code not in self.status_forcelist
             and status_code not in self.force_dangerous_codes
         ):
-            raise UnsafeToRetryError(
-                "ExecuteStatement command can only be retried for codes 429 and 503. Received code: {status_code}"
+            return (
+                False,
+                "ExecuteStatement command can only be retried for codes 429 and 503",
             )
 
         # Request failed with a dangerous code, was an ExecuteStatement, but user forced retries for this
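After this change, `should_retry` reports non-retryable statuses as a `(False, reason)` tuple instead of raising `NonRecoverableNetworkError` or `UnsafeToRetryError` from inside the policy, leaving the caller to surface the reason. A hedged sketch of that contract; `DummyPolicy` is illustrative, not the real `DatabricksRetryPolicy`:

```python
from typing import Tuple


class DummyPolicy:
    """Stand-in showing the (bool, reason) contract of should_retry."""

    def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
        if status_code == 200:
            return False, "200 codes are not retried"
        if status_code == 401:
            return False, "Received 401 - UNAUTHORIZED. Confirm your authentication credentials."
        if status_code in (429, 503):
            return True, f"{status_code} is a retryable status"
        return False, f"{status_code} is not retried"


ok, reason = DummyPolicy().should_retry("POST", 401)
if not ok:
    print(f"Giving up: {reason}")  # the reason propagates instead of an exception
```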

src/databricks/sql/auth/thrift_http_client.py

Lines changed: 6 additions & 0 deletions

@@ -198,6 +198,12 @@ def flush(self):
         self.message = self.__resp.reason
         self.headers = self.__resp.headers
 
+        logger.info(
+            "HTTP Response with status code {}, message: {}".format(
+                self.code, self.message
+            )
+        )
+
     @staticmethod
     def basic_proxy_auth_headers(proxy):
         if proxy is None or not proxy.username:
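The new message is logged at INFO level, so it only shows up when the application enables that level for the connector's loggers. A minimal sketch; the `databricks.sql` logger name is inferred from the module path:

```python
import logging

# Emit connector logs (including the new per-response status-code
# message) to stderr at INFO level.
logging.basicConfig(level=logging.INFO)
logging.getLogger("databricks.sql").setLevel(logging.INFO)
```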

src/databricks/sql/client.py

Lines changed: 4 additions & 0 deletions

@@ -747,6 +747,7 @@ def execute(
         self,
         operation: str,
         parameters: Optional[TParameterCollection] = None,
+        enforce_embedded_schema_correctness=False,
     ) -> "Cursor":
         """
         Execute a query and wait for execution to complete.

@@ -811,6 +812,7 @@ def execute(
             use_cloud_fetch=self.connection.use_cloud_fetch,
             parameters=prepared_params,
             async_op=False,
+            enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
         )
         self.active_result_set = ResultSet(
             self.connection,

@@ -832,6 +834,7 @@ def execute_async(
         self,
         operation: str,
         parameters: Optional[TParameterCollection] = None,
+        enforce_embedded_schema_correctness=False,
     ) -> "Cursor":
         """

@@ -872,6 +875,7 @@ def execute_async(
             use_cloud_fetch=self.connection.use_cloud_fetch,
             parameters=prepared_params,
             async_op=True,
+            enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
         )
 
         return self
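A short usage sketch of the new keyword. Connection values are placeholders, and server-side handling of the flag is assumed via the `enforceEmbeddedSchemaCorrectness` Thrift field wired up in thrift_backend.py below:

```python
from databricks import sql

# Placeholder credentials; substitute real workspace values.
with sql.connect(
    server_hostname="...", http_path="...", access_token="..."
) as conn:
    with conn.cursor() as cursor:
        # New keyword from this commit; defaults to False.
        cursor.execute("SELECT 1", enforce_embedded_schema_correctness=True)
        print(cursor.fetchall())
```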

src/databricks/sql/thrift_backend.py

Lines changed: 9 additions & 3 deletions

@@ -66,7 +66,7 @@
 # - 900s attempts-duration lines up w ODBC/JDBC drivers (for cluster startup > 10 mins)
 _retry_policy = {  # (type, default, min, max)
     "_retry_delay_min": (float, 1, 0.1, 60),
-    "_retry_delay_max": (float, 30, 5, 3600),
+    "_retry_delay_max": (float, 60, 5, 3600),
     "_retry_stop_after_attempts_count": (int, 30, 1, 60),
     "_retry_stop_after_attempts_duration": (float, 900, 1, 86400),
     "_retry_delay_default": (float, 5, 1, 60),

@@ -881,6 +881,7 @@ def execute_command(
         use_cloud_fetch=True,
         parameters=[],
         async_op=False,
+        enforce_embedded_schema_correctness=False,
     ):
         assert session_handle is not None
 
@@ -896,8 +897,12 @@ def execute_command(
             sessionHandle=session_handle,
             statement=operation,
             runAsync=True,
-            getDirectResults=ttypes.TSparkGetDirectResults(
-                maxRows=max_rows, maxBytes=max_bytes
+            # For async operation we don't want the direct results
+            getDirectResults=None
+            if async_op
+            else ttypes.TSparkGetDirectResults(
+                maxRows=max_rows,
+                maxBytes=max_bytes,
             ),
             canReadArrowResult=True if pyarrow else False,
             canDecompressLZ4Result=lz4_compression,

@@ -908,6 +913,7 @@ def execute_command(
             },
             useArrowNativeTypes=spark_arrow_types,
             parameters=parameters,
+            enforceEmbeddedSchemaCorrectness=enforce_embedded_schema_correctness,
         )
         resp = self.make_request(self._client.ExecuteStatement, req)
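The `_retry_policy` table above defines `(type, default, min, max)` clamps for per-connection overrides, so the new 60-second `_retry_delay_max` default can still be tuned at connect time. A sketch, assuming the underscore-prefixed keys pass straight through `sql.connect()` as the retry tests do:

```python
from databricks import sql

# Values outside the (min, max) bounds in _retry_policy are clamped;
# _retry_delay_max accepts 5-3600 seconds and now defaults to 60.
conn = sql.connect(
    server_hostname="...",  # placeholder credentials
    http_path="...",
    access_token="...",
    _retry_delay_min=1,
    _retry_delay_max=60,
    _retry_stop_after_attempts_duration=900,
)
```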

tests/e2e/common/large_queries_mixin.py

Lines changed: 1 addition & 1 deletion

@@ -94,7 +94,7 @@ def test_long_running_query(self):
         scale_factor = 1
         with self.cursor() as cursor:
             while duration < min_duration:
-                assert scale_factor < 512, "Detected infinite loop"
+                assert scale_factor < 1024, "Detected infinite loop"
                 start = time.time()
 
                 cursor.execute(

tests/e2e/common/retry_test_mixins.py

Lines changed: 10 additions & 10 deletions

@@ -121,9 +121,9 @@ class PySQLRetryTestsMixin:
     # For testing purposes
     _retry_policy = {
         "_retry_delay_min": 0.1,
-        "_retry_delay_max": 5,
+        "_retry_delay_max": 3,
         "_retry_stop_after_attempts_count": 5,
-        "_retry_stop_after_attempts_duration": 10,
+        "_retry_stop_after_attempts_duration": 30,
         "_retry_delay_default": 0.5,
     }
 
@@ -135,7 +135,7 @@ def test_retry_urllib3_settings_are_honored(self):
         urllib3_config = {"connect": 10, "read": 11, "redirect": 12}
         rp = DatabricksRetryPolicy(
             delay_min=0.1,
-            delay_max=10.0,
+            delay_max=3,
             stop_after_attempts_count=10,
             stop_after_attempts_duration=10.0,
             delay_default=1.0,

@@ -174,14 +174,14 @@ def test_retry_max_count_not_exceeded(self):
     def test_retry_exponential_backoff(self):
         """GIVEN the retry policy is configured for reasonable exponential backoff
         WHEN the server sends nothing but 429 responses with retry-afters
-        THEN the connector will use those retry-afters values as delay
+        THEN the connector will use those retry-afters values as floor
         """
         retry_policy = self._retry_policy.copy()
         retry_policy["_retry_delay_min"] = 1
 
         time_start = time.time()
         with mocked_server_response(
-            status=429, headers={"Retry-After": "3"}
+            status=429, headers={"Retry-After": "8"}
         ) as mock_obj:
             with pytest.raises(RequestError) as cm:
                 with self.connection(extra_params=retry_policy) as conn:

@@ -191,14 +191,14 @@ def test_retry_exponential_backoff(self):
             assert isinstance(cm.value.args[1], MaxRetryDurationError)
 
         # With setting delay_min to 1, the expected retry delays should be:
-        # 3, 3, 3, 3
+        # 8, 8, 8, 8
         # The first 3 retries are allowed, the 4th retry puts the total duration over the limit
-        # of 10 seconds
+        # of 30 seconds
         assert mock_obj.return_value.getresponse.call_count == 4
-        assert duration > 6
+        assert duration > 24
 
-        # Should be less than 7, but this is a safe margin for CI/CD slowness
-        assert duration < 10
+        # Should be less than 26, but this is a safe margin for CI/CD slowness
+        assert duration < 30
 
     def test_retry_max_duration_not_exceeded(self):
         """GIVEN the max attempt duration of 10 seconds

tests/e2e/test_driver.py

Lines changed: 59 additions & 7 deletions

@@ -177,19 +177,22 @@ def test_cloud_fetch(self):
         for i in range(len(cf_result)):
             assert cf_result[i] == noop_result[i]
 
-    def test_execute_async(self):
-        def isExecuting(operation_state):
-            return not operation_state or operation_state in [
-                ttypes.TOperationState.RUNNING_STATE,
-                ttypes.TOperationState.PENDING_STATE,
-            ]
+
+class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase):
+    def isExecuting(self, operation_state):
+        return not operation_state or operation_state in [
+            ttypes.TOperationState.RUNNING_STATE,
+            ttypes.TOperationState.PENDING_STATE,
+        ]
+
+    def test_execute_async__long_running(self):
 
         long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'"
         with self.cursor() as cursor:
             cursor.execute_async(long_running_query)
 
             ## Polling after every POLLING_INTERVAL seconds
-            while isExecuting(cursor.get_query_state()):
+            while self.isExecuting(cursor.get_query_state()):
                 time.sleep(self.POLLING_INTERVAL)
                 log.info("Polling the status in test_execute_async")
 
@@ -198,6 +201,55 @@ def isExecuting(operation_state):
 
         assert result[0].asDict() == {"count(1)": 0}
 
+    def test_execute_async__small_result(self):
+        small_result_query = "SELECT 1"
+
+        with self.cursor() as cursor:
+            cursor.execute_async(small_result_query)
+
+            ## Fake sleep for 5 secs
+            time.sleep(5)
+
+            ## Polling after every POLLING_INTERVAL seconds
+            while self.isExecuting(cursor.get_query_state()):
+                time.sleep(self.POLLING_INTERVAL)
+                log.info("Polling the status in test_execute_async")
+
+            cursor.get_async_execution_result()
+            result = cursor.fetchall()
+
+            assert result[0].asDict() == {"1": 1}
+
+    def test_execute_async__large_result(self):
+        x_dimension = 1000
+        y_dimension = 1000
+        large_result_query = f"""
+                SELECT
+                    x.id AS x_id,
+                    y.id AS y_id,
+                    FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') AS date
+                FROM
+                    RANGE({x_dimension}) x
+                JOIN
+                    RANGE({y_dimension}) y
+        """
+
+        with self.cursor() as cursor:
+            cursor.execute_async(large_result_query)
+
+            ## Fake sleep for 5 secs
+            time.sleep(5)
+
+            ## Polling after every POLLING_INTERVAL seconds
+            while self.isExecuting(cursor.get_query_state()):
+                time.sleep(self.POLLING_INTERVAL)
+                log.info("Polling the status in test_execute_async")
+
+            cursor.get_async_execution_result()
+            result = cursor.fetchall()
+
+            assert len(result) == x_dimension * y_dimension
+
 
 # Exclude Retry tests because they require specific setups, and LargeQueries too slow for core
 # tests
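Outside the test harness, the same polling flow applies to application code. A hedged sketch; connection values are placeholders and the two-second poll interval is arbitrary:

```python
import time

from databricks import sql
from databricks.sql.thrift_api.TCLIService import ttypes

STILL_RUNNING = (
    ttypes.TOperationState.RUNNING_STATE,
    ttypes.TOperationState.PENDING_STATE,
)

with sql.connect(
    server_hostname="...", http_path="...", access_token="..."
) as conn:
    with conn.cursor() as cursor:
        cursor.execute_async("SELECT 1")
        # Poll until the operation leaves the RUNNING/PENDING states.
        while (state := cursor.get_query_state()) is None or state in STILL_RUNNING:
            time.sleep(2)
        cursor.get_async_execution_result()  # attach the finished result set
        print(cursor.fetchall())
```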
