Skip to content

Merge SeaHttpClient into THttpClient #603

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a60c387
remove SeaHttpClient and integrate with THttpClient
varun-edachali-dbx Jun 17, 2025
a4db4e8
introduce unit tests for added methods in `THttpClient`
varun-edachali-dbx Jun 18, 2025
02e5421
add more unit tests
varun-edachali-dbx Jun 18, 2025
f411cf6
increase logging
varun-edachali-dbx Jun 19, 2025
ad17007
add minimal retry func
varun-edachali-dbx Jun 19, 2025
e00e39b
allow passage of MaxRetryError
varun-edachali-dbx Jun 19, 2025
b44d3f1
more retry stuff (to review)
varun-edachali-dbx Jun 19, 2025
ea0c060
Revert "more retry stuff (to review)"
varun-edachali-dbx Jun 20, 2025
46d4850
Revert "allow passage of MaxRetryError"
varun-edachali-dbx Jun 20, 2025
0af5a75
Revert "add minimal retry func"
varun-edachali-dbx Jun 20, 2025
947fcbf
decode body bytes in logging
varun-edachali-dbx Jun 20, 2025
c200ad0
preliminary reetries
varun-edachali-dbx Jun 20, 2025
eb2dd79
simplify error handling
varun-edachali-dbx Jun 20, 2025
8efea35
Revert "simplify error handling"
varun-edachali-dbx Jun 20, 2025
6bf5995
Revert "preliminary reetries"
varun-edachali-dbx Jun 20, 2025
0b2ef6c
integrate simple retries in Sea client
varun-edachali-dbx Jun 20, 2025
00fc119
some unit tests
varun-edachali-dbx Jun 24, 2025
a9f2409
Revert "some unit tests"
varun-edachali-dbx Jun 24, 2025
f3bc8a0
potential working code dump
varun-edachali-dbx Jun 25, 2025
e32c858
Revert "potential working code dump"
varun-edachali-dbx Jun 25, 2025
8d8f730
nearly working SEA retries
varun-edachali-dbx Jun 25, 2025
8576f31
allow DELETE in retries
varun-edachali-dbx Jun 25, 2025
92a9260
allow GET (untested)
varun-edachali-dbx Jun 25, 2025
329c9af
Merge branch 'ext-links-sea' into merge-http-client
varun-edachali-dbx Jun 25, 2025
bb5cf17
minor fixes
varun-edachali-dbx Jun 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/databricks/sql/auth/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def __init__(
total=_attempts_remaining,
respect_retry_after_header=True,
backoff_factor=self.delay_min,
allowed_methods=["POST"],
allowed_methods=["POST", "DELETE", "GET"],
status_forcelist=[429, 503, *self.force_dangerous_codes],
)

Expand Down
108 changes: 106 additions & 2 deletions src/databricks/sql/auth/thrift_http_client.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import base64
import json
import logging
import urllib.parse
from typing import Dict, Union, Optional
from typing import Dict, Union, Optional, Any

import six
import thrift
import thrift.transport.THttpClient

import ssl
import warnings
from http.client import HTTPResponse
from io import BytesIO

import urllib3
from urllib3 import HTTPConnectionPool, HTTPSConnectionPool, ProxyManager
from urllib3.util import make_headers
from databricks.sql.auth.retry import CommandType, DatabricksRetryPolicy
Expand Down Expand Up @@ -222,3 +224,105 @@ def set_retry_command_type(self, value: CommandType):
logger.warning(
"DatabricksRetryPolicy is currently bypassed. The CommandType cannot be set."
)

def make_rest_request(
    self,
    method: str,
    endpoint_path: str,
    data: Optional[Dict[str, Any]] = None,
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """
    Make a REST API request using the existing connection pool.

    Args:
        method (str): HTTP method (GET, POST, DELETE, etc.)
        endpoint_path (str): API endpoint path (e.g., "sessions" or "statements/123")
        data (dict, optional): Request payload data, serialized as JSON.
            NOTE: a falsy value (None or {}) sends no request body at all.
        params (dict, optional): Query parameters
        headers (dict, optional): Additional headers; override defaults and
            auth-provider headers on key collision.

    Returns:
        dict: Response data parsed from JSON ({} when the response body is empty)

    Raises:
        ValueError: If no response is received from the server
    """
    # Ensure the transport is open before touching the pool.
    if not self.isOpen():
        self.open()

    # Base headers, then auth-provider headers, then caller overrides.
    request_headers = {
        "Content-Type": "application/json",
    }

    auth_headers: Dict[str, str] = {}
    self.__auth_provider.add_headers(auth_headers)
    request_headers.update(auth_headers)

    if headers:
        request_headers.update(headers)

    # Serialize the payload. Preserves the original falsy-check semantics:
    # an empty dict produces no body, not b"{}".
    body = json.dumps(data).encode("utf-8") if data else None

    # Build query string for params.
    query_string = "?" + urllib.parse.urlencode(params) if params else ""

    # Join the transport's base path with the endpoint path.
    full_path = (
        self.path.rstrip("/") + "/" + endpoint_path.lstrip("/") + query_string
    )

    # Log the request exactly once, redacting credential-bearing headers so
    # tokens never land in log files, and using lazy %-formatting so the
    # work only happens when DEBUG is enabled.
    safe_headers = {
        k: "<redacted>"
        if k.lower() in ("authorization", "proxy-authorization")
        else v
        for k, v in request_headers.items()
    }
    logger.debug(
        "Making %s request to %s (headers=%s, params=%s, body_bytes=%s)",
        method,
        full_path,
        safe_headers,
        params,
        len(body) if body else 0,
    )

    # Make request using the connection pool - let urllib3 exceptions propagate.
    self.__resp = self.__pool.request(
        method,
        url=full_path,
        body=body,
        headers=request_headers,
        preload_content=False,
        timeout=self.__timeout,
        retries=self.retry_policy,
    )

    # Guard clause instead of a trailing else-branch.
    if self.__resp is None:
        raise ValueError("No response received from server")

    # Mirror the thrift client's bookkeeping of the last response.
    self.code = self.__resp.status
    self.message = self.__resp.reason
    self.headers = self.__resp.headers

    logger.debug("Response status: %s, message: %s", self.code, self.message)

    # Read and parse response data.
    # Note: urllib3's HTTPResponse has a data attribute, but it's not in the type stubs.
    response_data = getattr(self.__resp, "data", None)

    # Parse JSON response if there is content; empty body maps to {}.
    if not response_data:
        return {}

    result = json.loads(response_data.decode("utf-8"))
    logger.debug("Response content: %s", json.dumps(result))
    return result
10 changes: 7 additions & 3 deletions src/databricks/sql/backend/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
cast,
)

from databricks.sql.backend.sea.backend import SeaDatabricksClient
from databricks.sql.backend.types import ExecuteResponse
from databricks.sql.backend.types import ExecuteResponse, CommandId
from databricks.sql.backend.sea.models.base import ResultData

from databricks.sql.result_set import ResultSet, SeaResultSet
if TYPE_CHECKING:
from databricks.sql.backend.sea.backend import SeaDatabricksClient
from databricks.sql.result_set import ResultSet, SeaResultSet

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,6 +72,8 @@ def _filter_sea_result_set(
from databricks.sql.result_set import SeaResultSet

# Create a new SeaResultSet with the filtered data
from databricks.sql.backend.sea.backend import SeaDatabricksClient

filtered_result_set = SeaResultSet(
connection=result_set.connection,
execute_response=execute_response,
Expand Down
Loading
Loading