Skip to content

Commit 12ee56f

Browse files
madhav-dbvarun-edachali-dbx
authored andcommitted
Enhance Cursor close handling and context manager exception management to prevent server side resource leaks (#554)
* Enhance Cursor close handling and context manager exception management * tests * fmt * Fix Cursor.close() to properly handle CursorAlreadyClosedError * Remove specific test message from Cursor.close() error handling * Improve error handling in connection and cursor context managers to ensure proper closure during exceptions, including KeyboardInterrupt. Add tests for nested cursor management and verify operation closure on server-side errors. * add * add Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 4f8b54c commit 12ee56f

File tree

3 files changed

+238
-6
lines changed

3 files changed

+238
-6
lines changed

src/databricks/sql/client.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,13 @@ def __enter__(self) -> "Connection":
278278
return self
279279

280280
def __exit__(self, exc_type, exc_value, traceback):
281-
self.close()
281+
try:
282+
self.close()
283+
except BaseException as e:
284+
logger.warning(f"Exception during connection close in __exit__: {e}")
285+
if exc_type is None:
286+
raise
287+
return False
282288

283289
def __del__(self):
284290
if self.open:
@@ -401,7 +407,14 @@ def __enter__(self) -> "Cursor":
401407
return self
402408

403409
def __exit__(self, exc_type, exc_value, traceback):
404-
self.close()
410+
try:
411+
logger.debug("Cursor context manager exiting, calling close()")
412+
self.close()
413+
except BaseException as e:
414+
logger.warning(f"Exception during cursor close in __exit__: {e}")
415+
if exc_type is None:
416+
raise
417+
return False
405418

406419
def __iter__(self):
407420
if self.active_result_set:
@@ -1066,7 +1079,21 @@ def cancel(self) -> None:
10661079
def close(self) -> None:
10671080
"""Close cursor"""
10681081
self.open = False
1069-
self.active_op_handle = None
1082+
1083+
# Close active operation handle if it exists
1084+
if self.active_op_handle:
1085+
try:
1086+
self.thrift_backend.close_command(self.active_op_handle)
1087+
except RequestError as e:
1088+
if isinstance(e.args[1], CursorAlreadyClosedError):
1089+
logger.info("Operation was canceled by a prior request")
1090+
else:
1091+
logging.warning(f"Error closing operation handle: {e}")
1092+
except Exception as e:
1093+
logging.warning(f"Error closing operation handle: {e}")
1094+
finally:
1095+
self.active_op_handle = None
1096+
10701097
if self.active_result_set:
10711098
self._close_and_clear_active_result_set()
10721099

tests/e2e/test_driver.py

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151
from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin
5252

53-
from databricks.sql.exc import SessionAlreadyClosedError
53+
from databricks.sql.exc import SessionAlreadyClosedError, CursorAlreadyClosedError
5454

5555
log = logging.getLogger(__name__)
5656

@@ -820,7 +820,6 @@ def test_close_connection_closes_cursors(self):
820820
ars = cursor.active_result_set
821821

822822
# We must manually run this check because thrift_backend always forces `has_been_closed_server_side` to True
823-
824823
# Cursor op state should be open before connection is closed
825824
status_request = ttypes.TGetOperationStatusReq(
826825
operationHandle=ars.command_id, getProgressUpdate=False
@@ -847,9 +846,104 @@ def test_closing_a_closed_connection_doesnt_fail(self, caplog):
847846
with self.connection() as conn:
848847
# First .close() call is explicit here
849848
conn.close()
850-
851849
assert "Session appears to have been closed already" in caplog.text
852850

851+
conn = None
852+
try:
853+
with pytest.raises(KeyboardInterrupt):
854+
with self.connection() as c:
855+
conn = c
856+
raise KeyboardInterrupt("Simulated interrupt")
857+
finally:
858+
if conn is not None:
859+
assert not conn.open, "Connection should be closed after KeyboardInterrupt"
860+
861+
def test_cursor_close_properly_closes_operation(self):
862+
"""Test that Cursor.close() properly closes the active operation handle on the server."""
863+
with self.connection() as conn:
864+
cursor = conn.cursor()
865+
try:
866+
cursor.execute("SELECT 1 AS test")
867+
assert cursor.active_op_handle is not None
868+
cursor.close()
869+
assert cursor.active_op_handle is None
870+
assert not cursor.open
871+
finally:
872+
if cursor.open:
873+
cursor.close()
874+
875+
conn = None
876+
cursor = None
877+
try:
878+
with self.connection() as c:
879+
conn = c
880+
with pytest.raises(KeyboardInterrupt):
881+
with conn.cursor() as cur:
882+
cursor = cur
883+
raise KeyboardInterrupt("Simulated interrupt")
884+
finally:
885+
if cursor is not None:
886+
assert not cursor.open, "Cursor should be closed after KeyboardInterrupt"
887+
888+
def test_nested_cursor_context_managers(self):
889+
"""Test that nested cursor context managers properly close operations on the server."""
890+
with self.connection() as conn:
891+
with conn.cursor() as cursor1:
892+
cursor1.execute("SELECT 1 AS test1")
893+
assert cursor1.active_op_handle is not None
894+
895+
with conn.cursor() as cursor2:
896+
cursor2.execute("SELECT 2 AS test2")
897+
assert cursor2.active_op_handle is not None
898+
899+
# After inner context manager exit, cursor2 should be not open
900+
assert not cursor2.open
901+
assert cursor2.active_op_handle is None
902+
903+
# After outer context manager exit, cursor1 should be not open
904+
assert not cursor1.open
905+
assert cursor1.active_op_handle is None
906+
907+
def test_cursor_error_handling(self):
908+
"""Test that cursor close handles errors properly to prevent orphaned operations."""
909+
with self.connection() as conn:
910+
cursor = conn.cursor()
911+
912+
cursor.execute("SELECT 1 AS test")
913+
914+
op_handle = cursor.active_op_handle
915+
916+
assert op_handle is not None
917+
918+
# Manually close the operation to simulate server-side closure
919+
conn.thrift_backend.close_command(op_handle)
920+
921+
cursor.close()
922+
923+
assert not cursor.open
924+
925+
def test_result_set_close(self):
926+
"""Test that ResultSet.close() properly closes operations on the server and handles state correctly."""
927+
with self.connection() as conn:
928+
cursor = conn.cursor()
929+
try:
930+
cursor.execute("SELECT * FROM RANGE(10)")
931+
932+
result_set = cursor.active_result_set
933+
assert result_set is not None
934+
935+
initial_op_state = result_set.op_state
936+
937+
result_set.close()
938+
939+
assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE
940+
assert result_set.op_state != initial_op_state
941+
942+
# Closing the result set again should be a no-op and not raise exceptions
943+
result_set.close()
944+
finally:
945+
cursor.close()
946+
853947

854948
# use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
855949
# the 429/503 subsuites separate since they execute under different circumstances.

tests/unit/test_client.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import databricks.sql
2121
import databricks.sql.client as client
2222
from databricks.sql import InterfaceError, DatabaseError, Error, NotSupportedError
23+
from databricks.sql.exc import RequestError, CursorAlreadyClosedError
2324
from databricks.sql.types import Row
2425
from databricks.sql.result_set import ResultSet, ThriftResultSet
2526

@@ -540,6 +541,116 @@ def test_access_current_query_id(self):
540541
cursor.close()
541542
self.assertIsNone(cursor.query_id)
542543

544+
def test_cursor_close_handles_exception(self):
545+
"""Test that Cursor.close() handles exceptions from close_command properly."""
546+
mock_backend = Mock()
547+
mock_connection = Mock()
548+
mock_op_handle = Mock()
549+
550+
mock_backend.close_command.side_effect = Exception("Test error")
551+
552+
cursor = client.Cursor(mock_connection, mock_backend)
553+
cursor.active_op_handle = mock_op_handle
554+
555+
cursor.close()
556+
557+
mock_backend.close_command.assert_called_once_with(mock_op_handle)
558+
559+
self.assertIsNone(cursor.active_op_handle)
560+
561+
self.assertFalse(cursor.open)
562+
563+
def test_cursor_context_manager_handles_exit_exception(self):
564+
"""Test that cursor's context manager handles exceptions during __exit__."""
565+
mock_backend = Mock()
566+
mock_connection = Mock()
567+
568+
cursor = client.Cursor(mock_connection, mock_backend)
569+
original_close = cursor.close
570+
cursor.close = Mock(side_effect=Exception("Test error during close"))
571+
572+
try:
573+
with cursor:
574+
raise ValueError("Test error inside context")
575+
except ValueError:
576+
pass
577+
578+
cursor.close.assert_called_once()
579+
580+
def test_connection_close_handles_cursor_close_exception(self):
581+
"""Test that _close handles exceptions from cursor.close() properly."""
582+
cursors_closed = []
583+
584+
def mock_close_with_exception():
585+
cursors_closed.append(1)
586+
raise Exception("Test error during close")
587+
588+
cursor1 = Mock()
589+
cursor1.close = mock_close_with_exception
590+
591+
def mock_close_normal():
592+
cursors_closed.append(2)
593+
594+
cursor2 = Mock()
595+
cursor2.close = mock_close_normal
596+
597+
mock_backend = Mock()
598+
mock_session_handle = Mock()
599+
600+
try:
601+
for cursor in [cursor1, cursor2]:
602+
try:
603+
cursor.close()
604+
except Exception:
605+
pass
606+
607+
mock_backend.close_session(mock_session_handle)
608+
except Exception as e:
609+
self.fail(f"Connection close should handle exceptions: {e}")
610+
611+
self.assertEqual(cursors_closed, [1, 2], "Both cursors should have close called")
612+
613+
def test_resultset_close_handles_cursor_already_closed_error(self):
614+
"""Test that ResultSet.close() handles CursorAlreadyClosedError properly."""
615+
result_set = client.ResultSet.__new__(client.ResultSet)
616+
result_set.thrift_backend = Mock()
617+
result_set.thrift_backend.CLOSED_OP_STATE = 'CLOSED'
618+
result_set.connection = Mock()
619+
result_set.connection.open = True
620+
result_set.op_state = 'RUNNING'
621+
result_set.has_been_closed_server_side = False
622+
result_set.command_id = Mock()
623+
624+
class MockRequestError(Exception):
625+
def __init__(self):
626+
self.args = ["Error message", CursorAlreadyClosedError()]
627+
628+
result_set.thrift_backend.close_command.side_effect = MockRequestError()
629+
630+
original_close = client.ResultSet.close
631+
try:
632+
try:
633+
if (
634+
result_set.op_state != result_set.thrift_backend.CLOSED_OP_STATE
635+
and not result_set.has_been_closed_server_side
636+
and result_set.connection.open
637+
):
638+
result_set.thrift_backend.close_command(result_set.command_id)
639+
except MockRequestError as e:
640+
if isinstance(e.args[1], CursorAlreadyClosedError):
641+
pass
642+
finally:
643+
result_set.has_been_closed_server_side = True
644+
result_set.op_state = result_set.thrift_backend.CLOSED_OP_STATE
645+
646+
result_set.thrift_backend.close_command.assert_called_once_with(result_set.command_id)
647+
648+
assert result_set.has_been_closed_server_side is True
649+
650+
assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE
651+
finally:
652+
pass
653+
543654

544655
if __name__ == "__main__":
545656
suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])

0 commit comments

Comments
 (0)