Skip to content

Commit 4566cb1

Browse files
move SeaResultSetQueueFactory and JsonQueue into separate SEA module
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 7035098 commit 4566cb1

File tree

5 files changed

+74
-67
lines changed

5 files changed

+74
-67
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
from __future__ import annotations
2+
3+
from abc import ABC
4+
from typing import List, Optional, Tuple
5+
6+
from databricks.sql.backend.sea.backend import SeaDatabricksClient
7+
from databricks.sql.backend.sea.models.base import ResultData, ResultManifest
8+
from databricks.sql.utils import ResultSetQueue
9+
10+
11+
class SeaResultSetQueueFactory(ABC):
12+
@staticmethod
13+
def build_queue(
14+
sea_result_data: ResultData,
15+
manifest: Optional[ResultManifest],
16+
statement_id: str,
17+
description: List[Tuple] = [],
18+
max_download_threads: Optional[int] = None,
19+
sea_client: Optional[SeaDatabricksClient] = None,
20+
lz4_compressed: bool = False,
21+
) -> ResultSetQueue:
22+
"""
23+
Factory method to build a result set queue for SEA backend.
24+
25+
Args:
26+
sea_result_data (ResultData): Result data from SEA response
27+
manifest (ResultManifest): Manifest from SEA response
28+
statement_id (str): Statement ID for the query
29+
description (List[List[Any]]): Column descriptions
30+
schema_bytes (bytes): Arrow schema bytes
31+
max_download_threads (int): Maximum number of download threads
32+
ssl_options (SSLOptions): SSL options for downloads
33+
sea_client (SeaDatabricksClient): SEA client for fetching additional links
34+
lz4_compressed (bool): Whether the data is LZ4 compressed
35+
36+
Returns:
37+
ResultSetQueue: The appropriate queue for the result data
38+
"""
39+
40+
if sea_result_data.data is not None:
41+
# INLINE disposition with JSON_ARRAY format
42+
return JsonQueue(sea_result_data.data)
43+
elif sea_result_data.external_links is not None:
44+
# EXTERNAL_LINKS disposition
45+
raise NotImplementedError(
46+
"EXTERNAL_LINKS disposition is not implemented for SEA backend"
47+
)
48+
return JsonQueue([])
49+
50+
51+
class JsonQueue(ResultSetQueue):
52+
"""Queue implementation for JSON_ARRAY format data."""
53+
54+
def __init__(self, data_array):
55+
"""Initialize with JSON array data."""
56+
self.data_array = data_array
57+
self.cur_row_index = 0
58+
self.num_rows = len(data_array)
59+
60+
def next_n_rows(self, num_rows):
61+
"""Get the next n rows from the data array."""
62+
length = min(num_rows, self.num_rows - self.cur_row_index)
63+
slice = self.data_array[self.cur_row_index : self.cur_row_index + length]
64+
self.cur_row_index += length
65+
return slice
66+
67+
def remaining_rows(self):
68+
"""Get all remaining rows from the data array."""
69+
slice = self.data_array[self.cur_row_index :]
70+
self.cur_row_index += len(slice)
71+
return slice

src/databricks/sql/result_set.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@
2424
from databricks.sql.utils import (
2525
ColumnTable,
2626
ColumnQueue,
27-
JsonQueue,
28-
SeaResultSetQueueFactory,
2927
)
28+
from databricks.sql.backend.sea.queue import JsonQueue, SeaResultSetQueueFactory
3029
from databricks.sql.backend.types import CommandId, CommandState, ExecuteResponse
3130

3231
logger = logging.getLogger(__name__)

src/databricks/sql/utils.py

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -111,69 +111,6 @@ def build_queue(
111111
raise AssertionError("Row set type is not valid")
112112

113113

114-
class SeaResultSetQueueFactory(ABC):
115-
@staticmethod
116-
def build_queue(
117-
sea_result_data: ResultData,
118-
manifest: Optional[ResultManifest],
119-
statement_id: str,
120-
description: List[Tuple] = [],
121-
max_download_threads: Optional[int] = None,
122-
sea_client: Optional[SeaDatabricksClient] = None,
123-
lz4_compressed: bool = False,
124-
) -> ResultSetQueue:
125-
"""
126-
Factory method to build a result set queue for SEA backend.
127-
128-
Args:
129-
sea_result_data (ResultData): Result data from SEA response
130-
manifest (ResultManifest): Manifest from SEA response
131-
statement_id (str): Statement ID for the query
132-
description (List[List[Any]]): Column descriptions
133-
schema_bytes (bytes): Arrow schema bytes
134-
max_download_threads (int): Maximum number of download threads
135-
ssl_options (SSLOptions): SSL options for downloads
136-
sea_client (SeaDatabricksClient): SEA client for fetching additional links
137-
lz4_compressed (bool): Whether the data is LZ4 compressed
138-
139-
Returns:
140-
ResultSetQueue: The appropriate queue for the result data
141-
"""
142-
143-
if sea_result_data.data is not None:
144-
# INLINE disposition with JSON_ARRAY format
145-
return JsonQueue(sea_result_data.data)
146-
elif sea_result_data.external_links is not None:
147-
# EXTERNAL_LINKS disposition
148-
raise NotImplementedError(
149-
"EXTERNAL_LINKS disposition is not implemented for SEA backend"
150-
)
151-
return JsonQueue([])
152-
153-
154-
class JsonQueue(ResultSetQueue):
155-
"""Queue implementation for JSON_ARRAY format data."""
156-
157-
def __init__(self, data_array):
158-
"""Initialize with JSON array data."""
159-
self.data_array = data_array
160-
self.cur_row_index = 0
161-
self.num_rows = len(data_array)
162-
163-
def next_n_rows(self, num_rows):
164-
"""Get the next n rows from the data array."""
165-
length = min(num_rows, self.num_rows - self.cur_row_index)
166-
slice = self.data_array[self.cur_row_index : self.cur_row_index + length]
167-
self.cur_row_index += length
168-
return slice
169-
170-
def remaining_rows(self):
171-
"""Get all remaining rows from the data array."""
172-
slice = self.data_array[self.cur_row_index :]
173-
self.cur_row_index += len(slice)
174-
return slice
175-
176-
177114
class ColumnTable:
178115
def __init__(self, column_table, column_names):
179116
self.column_table = column_table

tests/unit/test_sea_queue.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import pytest
88
from unittest.mock import Mock, MagicMock, patch
99

10-
from databricks.sql.utils import JsonQueue, SeaResultSetQueueFactory
10+
from databricks.sql.backend.sea.queue import JsonQueue, SeaResultSetQueueFactory
1111
from databricks.sql.backend.sea.models.base import ResultData, ResultManifest
1212

1313

tests/unit/test_sea_result_set.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from unittest.mock import Mock
1010

1111
from databricks.sql.result_set import SeaResultSet, Row
12-
from databricks.sql.utils import JsonQueue
12+
from databricks.sql.backend.sea.queue import JsonQueue
1313
from databricks.sql.backend.types import CommandId, CommandState
1414
from databricks.sql.backend.sea.models.base import ResultData, ResultManifest
1515

0 commit comments

Comments
 (0)