From ae26fc51c49f30aeaa7103e118ff1d1ddcf3ff9b Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Fri, 4 Jul 2025 09:55:33 +0530 Subject: [PATCH 1/3] add close() for Queue, add ResultSet invocation Signed-off-by: varun-edachali-dbx --- src/databricks/sql/client.py | 1 + src/databricks/sql/utils.py | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index b81416e15..aed785ad2 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1505,6 +1505,7 @@ def close(self) -> None: and self.connection.open ): self.thrift_backend.close_command(self.command_id) + self.results.close() except RequestError as e: if isinstance(e.args[1], CursorAlreadyClosedError): logger.info("Operation was canceled by a prior request") diff --git a/src/databricks/sql/utils.py b/src/databricks/sql/utils.py index 0ce2fa169..233808777 100644 --- a/src/databricks/sql/utils.py +++ b/src/databricks/sql/utils.py @@ -46,6 +46,10 @@ def next_n_rows(self, num_rows: int): def remaining_rows(self): pass + @abstractmethod + def close(self): + pass + class ResultSetQueueFactory(ABC): @staticmethod @@ -157,6 +161,9 @@ def remaining_rows(self): self.cur_row_index += slice.num_rows return slice + def close(self): + return + class ArrowQueue(ResultSetQueue): def __init__( @@ -192,6 +199,9 @@ def remaining_rows(self) -> "pyarrow.Table": self.cur_row_index += slice.num_rows return slice + def close(self): + return + class CloudFetchQueue(ResultSetQueue): def __init__( @@ -341,6 +351,9 @@ def _create_empty_table(self) -> "pyarrow.Table": # Create a 0-row table with just the schema bytes return create_arrow_table_from_arrow_file(self.schema_bytes, self.description) + def close(self): + self.download_manager._shutdown_manager() + ExecuteResponse = namedtuple( "ExecuteResponse", From a5269bcd010ca7d4919c74a90348aa73be0c3f06 Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Fri, 4 Jul 2025 10:01:41 +0530 Subject: [PATCH 2/3] move Queue closure to finally: block to ensure client-side cleanup regardless of server side state Signed-off-by: varun-edachali-dbx --- src/databricks/sql/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py index aed785ad2..1e1094059 100755 --- a/src/databricks/sql/client.py +++ b/src/databricks/sql/client.py @@ -1505,11 +1505,11 @@ def close(self) -> None: and self.connection.open ): self.thrift_backend.close_command(self.command_id) - self.results.close() except RequestError as e: if isinstance(e.args[1], CursorAlreadyClosedError): logger.info("Operation was canceled by a prior request") finally: + self.results.close() self.has_been_closed_server_side = True self.op_state = self.thrift_backend.CLOSED_OP_STATE From 316f6dd86f0d2c278e8defaa6d7689c1ef6fb592 Mon Sep 17 00:00:00 2001 From: varun-edachali-dbx Date: Fri, 4 Jul 2025 10:02:06 +0530 Subject: [PATCH 3/3] add unit test assertions to ensure Queue closure Signed-off-by: varun-edachali-dbx --- tests/unit/test_client.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 91e426c64..44c84d790 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -267,33 +267,39 @@ def test_arraysize_buffer_size_passthrough( def test_closing_result_set_with_closed_connection_soft_closes_commands(self): mock_connection = Mock() mock_backend = Mock() + mock_results = Mock() result_set = client.ResultSet( connection=mock_connection, thrift_backend=mock_backend, execute_response=Mock(), ) + result_set.results = mock_results mock_connection.open = False result_set.close() self.assertFalse(mock_backend.close_command.called) self.assertTrue(result_set.has_been_closed_server_side) + mock_results.close.assert_called_once() def test_closing_result_set_hard_closes_commands(self): mock_results_response = Mock() mock_results_response.has_been_closed_server_side = False mock_connection = Mock() mock_thrift_backend = Mock() + mock_results = Mock() mock_connection.open = True result_set = client.ResultSet( mock_connection, mock_results_response, mock_thrift_backend ) + result_set.results = mock_results result_set.close() mock_thrift_backend.close_command.assert_called_once_with( mock_results_response.command_handle ) + mock_results.close.assert_called_once() @patch("%s.client.ResultSet" % PACKAGE_NAME) def test_executing_multiple_commands_uses_the_most_recent_command(