From 0da893fdd16cc4278b0dc7cfa37a62a041c873ef Mon Sep 17 00:00:00 2001
From: Sai Shree Pradhan
Date: Tue, 1 Jul 2025 12:02:05 +0530
Subject: [PATCH 1/4] batch size configurable, flush interval

Signed-off-by: Sai Shree Pradhan
---
 src/databricks/sql/client.py                  |  2 +
 .../sql/telemetry/telemetry_client.py         | 45 ++++++++++++++++++-
 tests/unit/test_telemetry.py                  | 12 +++--
 3 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index c137306a..36b13554 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -250,6 +250,7 @@ def read(self) -> Optional[OAuthToken]:
         self.telemetry_enabled = (
             self.client_telemetry_enabled and self.server_telemetry_enabled
         )
+        self.telemetry_batch_size = kwargs.get("telemetry_batch_size")
 
         user_agent_entry = kwargs.get("user_agent_entry")
         if user_agent_entry is None:
@@ -311,6 +312,7 @@ def read(self) -> Optional[OAuthToken]:
             session_id_hex=self.get_session_id_hex(),
             auth_provider=auth_provider,
             host_url=self.host,
+            batch_size=self.telemetry_batch_size,
         )
 
         self._telemetry_client = TelemetryClientFactory.get_telemetry_client(
diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py
index 10aa04ef..4f090864 100644
--- a/src/databricks/sql/telemetry/telemetry_client.py
+++ b/src/databricks/sql/telemetry/telemetry_client.py
@@ -150,6 +150,9 @@ class TelemetryClient(BaseTelemetryClient):
     TELEMETRY_AUTHENTICATED_PATH = "/telemetry-ext"
     TELEMETRY_UNAUTHENTICATED_PATH = "/telemetry-unauth"
 
+    DEFAULT_BATCH_SIZE = 100
+    DEFAULT_FLUSH_INTERVAL_SECONDS = 90
+
     def __init__(
         self,
         telemetry_enabled,
@@ -157,10 +160,12 @@ def __init__(
         auth_provider,
         host_url,
         executor,
+        batch_size=None,
     ):
         logger.debug("Initializing TelemetryClient for connection: %s", session_id_hex)
         self._telemetry_enabled = telemetry_enabled
-        self._batch_size = 10  # TODO: Decide on batch size
+        self._batch_size = batch_size if batch_size is not None else self.DEFAULT_BATCH_SIZE
+        self._flush_interval_seconds = self.DEFAULT_FLUSH_INTERVAL_SECONDS
         self._session_id_hex = session_id_hex
         self._auth_provider = auth_provider
         self._user_agent = None
@@ -169,9 +174,41 @@ def __init__(
         self._driver_connection_params = None
         self._host_url = host_url
         self._executor = executor
+        self._flush_timer = None
+
+        # Start the periodic flush timer
+        self._start_flush_timer()
+
+    def _start_flush_timer(self):
+        """Start the periodic flush timer"""
+
+        self._flush_timer = threading.Timer(
+            self._flush_interval_seconds,
+            self._periodic_flush
+        )
+        self._flush_timer.daemon = True  # Don't prevent program exit
+        self._flush_timer.start()
+        logger.debug("Started flush timer for connection %s (interval: %d seconds)",
+            self._session_id_hex, self._flush_interval_seconds)
+
+    def _periodic_flush(self):
+        """Periodic flush callback - flushes events and reschedules the timer"""
+
+        logger.debug("Performing periodic flush for connection %s", self._session_id_hex)
+        self._flush()
+        # Reschedule the next flush
+        self._start_flush_timer()
+
+    def _stop_flush_timer(self):
+        """Stop the periodic flush timer"""
+        if self._flush_timer is not None:
+            self._flush_timer.cancel()
+            self._flush_timer = None
+        logger.debug("Stopped flush timer for connection %s", self._session_id_hex)
 
     def _export_event(self, event):
         """Add an event to the batch queue and flush if batch is full"""
+
         logger.debug("Exporting event for connection %s", self._session_id_hex)
         with self._lock:
             self._events_batch.append(event)
@@ -183,6 +220,7 @@ def _export_event(self, event):
 
     def _flush(self):
         """Flush the current batch of events to the server"""
+
         with self._lock:
             events_to_flush = self._events_batch.copy()
             self._events_batch = []
@@ -300,8 +338,9 @@ def export_failure_log(self, error_name, error_message):
             logger.debug("Failed to export failure log: %s", e)
 
     def close(self):
-        """Flush remaining events before closing"""
+        """Flush remaining events and stop timer before closing"""
         logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
+        self._stop_flush_timer()
         self._flush()
 
 
@@ -364,6 +403,7 @@ def initialize_telemetry_client(
         session_id_hex,
         auth_provider,
         host_url,
+        batch_size=None,
     ):
         """Initialize a telemetry client for a specific connection if telemetry is enabled"""
         try:
@@ -385,6 +425,7 @@ def initialize_telemetry_client(
                 auth_provider=auth_provider,
                 host_url=host_url,
                 executor=TelemetryClientFactory._executor,
+                batch_size=batch_size,
             )
         else:
             TelemetryClientFactory._clients[
diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py
index 699480bb..b83ac4b4 100644
--- a/tests/unit/test_telemetry.py
+++ b/tests/unit/test_telemetry.py
@@ -181,22 +181,20 @@ def test_export_failure_log(
 
         client._export_event.assert_called_once_with(mock_frontend_log.return_value)
 
-    def test_export_event(self, telemetry_client_setup):
-        """Test exporting an event."""
+    def test_batch_size_flush(self, telemetry_client_setup):
+        """Test batch size flush."""
         client = telemetry_client_setup["client"]
         client._flush = MagicMock()
 
-        for i in range(5):
+        for i in range(TelemetryClient._batch_size-1):
             client._export_event(f"event-{i}")
 
         client._flush.assert_not_called()
-        assert len(client._events_batch) == 5
+        assert len(client._events_batch) == TelemetryClient._batch_size-1
 
-        for i in range(5, 10):
-            client._export_event(f"event-{i}")
+        client._export_event(f"event-{TelemetryClient._batch_size - 1}")
 
         client._flush.assert_called_once()
-        assert len(client._events_batch) == 10
 
     @patch("requests.post")
     def test_send_telemetry_authenticated(self, mock_post, telemetry_client_setup):

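Usage sketch (illustrative, not part of the patch): assuming the public databricks.sql.connect() entry point forwards extra keyword arguments to Connection, as the client.py hunk above suggests, the new knob would be passed like this; the hostname, HTTP path, and token values are placeholders.

    from databricks import sql

    # telemetry_batch_size is read via kwargs.get("telemetry_batch_size") in Connection
    # and plumbed through to TelemetryClient; when omitted, the client falls back to
    # DEFAULT_BATCH_SIZE (100 events per flush).
    connection = sql.connect(
        server_hostname="example.cloud.databricks.com",  # placeholder
        http_path="/sql/1.0/warehouses/example",         # placeholder
        access_token="dapi-example-token",               # placeholder
        telemetry_batch_size=50,  # flush telemetry after every 50 events
    )
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT 1")
        cursor.fetchall()
        cursor.close()
    finally:
        connection.close()

Note that at this point in the series the flush interval is still the fixed DEFAULT_FLUSH_INTERVAL_SECONDS (90 seconds); only the batch size is exposed to callers.
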
From ef14caae52689da3bd3da85ae12379f2fa28616d Mon Sep 17 00:00:00 2001
From: Sai Shree Pradhan
Date: Tue, 1 Jul 2025 12:09:26 +0530
Subject: [PATCH 2/4] formatting

Signed-off-by: Sai Shree Pradhan
---
 .../sql/telemetry/telemetry_client.py         | 28 +++++++++++--------
 tests/unit/test_telemetry.py                  |  8 ++++--
 2 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py
index 4f090864..279a2792 100644
--- a/src/databricks/sql/telemetry/telemetry_client.py
+++ b/src/databricks/sql/telemetry/telemetry_client.py
@@ -164,7 +164,9 @@ def __init__(
     ):
         logger.debug("Initializing TelemetryClient for connection: %s", session_id_hex)
         self._telemetry_enabled = telemetry_enabled
-        self._batch_size = batch_size if batch_size is not None else self.DEFAULT_BATCH_SIZE
+        self._batch_size = (
+            batch_size if batch_size is not None else self.DEFAULT_BATCH_SIZE
+        )
         self._flush_interval_seconds = self.DEFAULT_FLUSH_INTERVAL_SECONDS
         self._session_id_hex = session_id_hex
         self._auth_provider = auth_provider
@@ -175,26 +177,30 @@ def __init__(
         self._host_url = host_url
         self._executor = executor
         self._flush_timer = None
-        
+
         # Start the periodic flush timer
         self._start_flush_timer()
 
     def _start_flush_timer(self):
         """Start the periodic flush timer"""
-        
+
         self._flush_timer = threading.Timer(
-            self._flush_interval_seconds,
-            self._periodic_flush
+            self._flush_interval_seconds, self._periodic_flush
         )
         self._flush_timer.daemon = True  # Don't prevent program exit
         self._flush_timer.start()
-        logger.debug("Started flush timer for connection %s (interval: %d seconds)",
-            self._session_id_hex, self._flush_interval_seconds)
+        logger.debug(
+            "Started flush timer for connection %s (interval: %d seconds)",
+            self._session_id_hex,
+            self._flush_interval_seconds,
+        )
 
     def _periodic_flush(self):
         """Periodic flush callback - flushes events and reschedules the timer"""
-        
-        logger.debug("Performing periodic flush for connection %s", self._session_id_hex)
+
+        logger.debug(
+            "Performing periodic flush for connection %s", self._session_id_hex
+        )
         self._flush()
         # Reschedule the next flush
         self._start_flush_timer()
@@ -208,7 +214,7 @@ def _stop_flush_timer(self):
 
     def _export_event(self, event):
         """Add an event to the batch queue and flush if batch is full"""
-        
+
         logger.debug("Exporting event for connection %s", self._session_id_hex)
         with self._lock:
             self._events_batch.append(event)
@@ -220,7 +226,7 @@ def _export_event(self, event):
 
     def _flush(self):
         """Flush the current batch of events to the server"""
-        
+
         with self._lock:
             events_to_flush = self._events_batch.copy()
             self._events_batch = []
diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py
index b83ac4b4..129c8ebd 100644
--- a/tests/unit/test_telemetry.py
+++ b/tests/unit/test_telemetry.py
@@ -186,13 +186,15 @@ def test_batch_size_flush(self, telemetry_client_setup):
         client = telemetry_client_setup["client"]
         client._flush = MagicMock()
 
-        for i in range(TelemetryClient._batch_size-1):
+        batch_size = client._batch_size
+
+        for i in range(batch_size - 1):
             client._export_event(f"event-{i}")
 
         client._flush.assert_not_called()
-        assert len(client._events_batch) == TelemetryClient._batch_size-1
+        assert len(client._events_batch) == batch_size - 1
 
-        client._export_event(f"event-{TelemetryClient._batch_size - 1}")
+        client._export_event(f"event-{batch_size - 1}")
 
         client._flush.assert_called_once()

From 9ac4bc769134933b581388817fb9fa96a1347d82 Mon Sep 17 00:00:00 2001
From: Sai Shree Pradhan
Date: Tue, 1 Jul 2025 14:30:25 +0530
Subject: [PATCH 3/4] changed flush timer to background thread approach

Signed-off-by: Sai Shree Pradhan
---
 .../sql/telemetry/telemetry_client.py         | 54 +++++++++----------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py
index 279a2792..aa1da8d6 100644
--- a/src/databricks/sql/telemetry/telemetry_client.py
+++ b/src/databricks/sql/telemetry/telemetry_client.py
@@ -176,41 +176,41 @@ def __init__(
         self._driver_connection_params = None
         self._host_url = host_url
         self._executor = executor
-        self._flush_timer = None
 
-        # Start the periodic flush timer
-        self._start_flush_timer()
+        # Background thread for periodic flushing
+        self._flush_stop_event = threading.Event()
+        self._flush_thread = None
 
-    def _start_flush_timer(self):
-        """Start the periodic flush timer"""
+        # Start the periodic flush thread
+        self._start_flush_thread()
 
-        self._flush_timer = threading.Timer(
-            self._flush_interval_seconds, self._periodic_flush
-        )
-        self._flush_timer.daemon = True  # Don't prevent program exit
-        self._flush_timer.start()
+    def _start_flush_thread(self):
+        """Start the background thread for periodic flushing"""
+        self._flush_thread = threading.Thread(target=self._flush_worker, daemon=True)
+        self._flush_thread.start()
         logger.debug(
-            "Started flush timer for connection %s (interval: %d seconds)",
+            "Started flush thread for connection %s (interval: %d seconds)",
             self._session_id_hex,
             self._flush_interval_seconds,
         )
 
-    def _periodic_flush(self):
-        """Periodic flush callback - flushes events and reschedules the timer"""
-
-        logger.debug(
-            "Performing periodic flush for connection %s", self._session_id_hex
-        )
-        self._flush()
-        # Reschedule the next flush
-        self._start_flush_timer()
+    def _flush_worker(self):
+        """Background worker thread for periodic flushing"""
+        while not self._flush_stop_event.wait(self._flush_interval_seconds):
+            logger.debug(
+                "Performing periodic flush for connection %s", self._session_id_hex
+            )
+            self._flush()
 
-    def _stop_flush_timer(self):
-        """Stop the periodic flush timer"""
-        if self._flush_timer is not None:
-            self._flush_timer.cancel()
-            self._flush_timer = None
-        logger.debug("Stopped flush timer for connection %s", self._session_id_hex)
+    def _stop_flush_thread(self):
+        """Stop the background flush thread"""
+        if self._flush_thread is not None:
+            self._flush_stop_event.set()
+            self._flush_thread.join(
+                timeout=1.0
+            )  # Wait up to 1 second for graceful shutdown
+            self._flush_thread = None
+        logger.debug("Stopped flush thread for connection %s", self._session_id_hex)
 
     def _export_event(self, event):
         """Add an event to the batch queue and flush if batch is full"""
@@ -346,7 +346,7 @@ def export_failure_log(self, error_name, error_message):
 
     def close(self):
         """Flush remaining events and stop timer before closing"""
         logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
-        self._stop_flush_timer()
+        self._stop_flush_thread()
         self._flush()
 
From e8ccf56cda64ba659bf2050303c97f316e19ca31 Mon Sep 17 00:00:00 2001
From: Sai Shree Pradhan
Date: Wed, 2 Jul 2025 16:50:59 +0530
Subject: [PATCH 4/4] changed from one flush timer thread per telemetry client
 to a single shared timer thread in TelemetryClientFactory

Signed-off-by: Sai Shree Pradhan
---
 .../sql/telemetry/telemetry_client.py         | 98 ++++++++++---------
 tests/unit/test_telemetry.py                  | 12 +--
 2 files changed, 58 insertions(+), 52 deletions(-)

diff --git a/src/databricks/sql/telemetry/telemetry_client.py b/src/databricks/sql/telemetry/telemetry_client.py
index aa1da8d6..a90a4bd6 100644
--- a/src/databricks/sql/telemetry/telemetry_client.py
+++ b/src/databricks/sql/telemetry/telemetry_client.py
@@ -116,6 +116,10 @@ def export_failure_log(self, error_name, error_message):
     def close(self):
         raise NotImplementedError("Subclasses must implement close")
 
+    @abstractmethod
+    def flush(self):
+        raise NotImplementedError("Subclasses must implement flush")
+
 
 class NoopTelemetryClient(BaseTelemetryClient):
     """
@@ -139,6 +143,9 @@ def export_failure_log(self, error_name, error_message):
     def close(self):
         pass
 
+    def flush(self):
+        pass
+
 
 class TelemetryClient(BaseTelemetryClient):
     """
@@ -151,7 +158,6 @@ class TelemetryClient(BaseTelemetryClient):
     TELEMETRY_UNAUTHENTICATED_PATH = "/telemetry-unauth"
 
     DEFAULT_BATCH_SIZE = 100
-    DEFAULT_FLUSH_INTERVAL_SECONDS = 90
 
     def __init__(
         self,
@@ -167,7 +173,6 @@ def __init__(
         self._batch_size = (
             batch_size if batch_size is not None else self.DEFAULT_BATCH_SIZE
         )
-        self._flush_interval_seconds = self.DEFAULT_FLUSH_INTERVAL_SECONDS
         self._session_id_hex = session_id_hex
         self._auth_provider = auth_provider
         self._user_agent = None
@@ -177,41 +182,6 @@ def __init__(
         self._driver_connection_params = None
         self._host_url = host_url
         self._executor = executor
 
-        # Background thread for periodic flushing
-        self._flush_stop_event = threading.Event()
-        self._flush_thread = None
-
-        # Start the periodic flush thread
-        self._start_flush_thread()
-
-    def _start_flush_thread(self):
-        """Start the background thread for periodic flushing"""
-        self._flush_thread = threading.Thread(target=self._flush_worker, daemon=True)
-        self._flush_thread.start()
-        logger.debug(
-            "Started flush thread for connection %s (interval: %d seconds)",
-            self._session_id_hex,
-            self._flush_interval_seconds,
-        )
-
-    def _flush_worker(self):
-        """Background worker thread for periodic flushing"""
-        while not self._flush_stop_event.wait(self._flush_interval_seconds):
-            logger.debug(
-                "Performing periodic flush for connection %s", self._session_id_hex
-            )
-            self._flush()
-
-    def _stop_flush_thread(self):
-        """Stop the background flush thread"""
-        if self._flush_thread is not None:
-            self._flush_stop_event.set()
-            self._flush_thread.join(
-                timeout=1.0
-            )  # Wait up to 1 second for graceful shutdown
-            self._flush_thread = None
-        logger.debug("Stopped flush thread for connection %s", self._session_id_hex)
-
     def _export_event(self, event):
         """Add an event to the batch queue and flush if batch is full"""
@@ -222,9 +192,9 @@ def _export_event(self, event):
                 logger.debug(
                     "Batch size limit reached (%s), flushing events", self._batch_size
                 )
-                self._flush()
+                self.flush()
 
-    def _flush(self):
+    def flush(self):
         """Flush the current batch of events to the server"""
 
         with self._lock:
@@ -344,16 +314,15 @@ def export_failure_log(self, error_name, error_message):
             logger.debug("Failed to export failure log: %s", e)
 
     def close(self):
-        """Flush remaining events and stop timer before closing"""
+        """Flush remaining events before closing"""
         logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
-        self._stop_flush_thread()
-        self._flush()
+        self.flush()
 
 
 class TelemetryClientFactory:
     """
     Static factory class for creating and managing telemetry clients.
-    It uses a thread pool to handle asynchronous operations.
+    It uses a thread pool to handle asynchronous operations and a single flush thread for all clients.
     """
 
     _clients: Dict[
@@ -366,6 +335,11 @@ class TelemetryClientFactory:
     _original_excepthook = None
     _excepthook_installed = False
 
+    # Shared flush thread for all clients
+    _flush_thread = None
+    _flush_event = threading.Event()
+    _flush_interval_seconds = 90
+
     @classmethod
     def _initialize(cls):
         """Initialize the factory if not already initialized"""
@@ -376,11 +350,42 @@ def _initialize(cls):
                 max_workers=10
             )  # Thread pool for async operations TODO: Decide on max workers
             cls._install_exception_hook()
+            cls._start_flush_thread()
             cls._initialized = True
             logger.debug(
-                "TelemetryClientFactory initialized with thread pool (max_workers=10)"
+                "TelemetryClientFactory initialized with thread pool (max_workers=10) and shared flush thread"
             )
 
+    @classmethod
+    def _start_flush_thread(cls):
+        """Start the shared background thread for periodic flushing of all clients"""
+        cls._flush_event.clear()
+        cls._flush_thread = threading.Thread(target=cls._flush_worker, daemon=True)
+        cls._flush_thread.start()
+
+    @classmethod
+    def _flush_worker(cls):
+        """Background worker thread for periodic flushing of all clients"""
+        while not cls._flush_event.wait(cls._flush_interval_seconds):
+            logger.debug("Performing periodic flush for all telemetry clients")
+
+            with cls._lock:
+                clients_to_flush = list(cls._clients.values())
+
+            for client in clients_to_flush:
+                try:
+                    client.flush()
+                except Exception as e:
+                    logger.debug("Failed to flush telemetry client: %s", e)
+
+    @classmethod
+    def _stop_flush_thread(cls):
+        """Stop the shared background flush thread"""
+        if cls._flush_thread is not None:
+            cls._flush_event.set()
+            cls._flush_thread.join(timeout=1.0)
+            cls._flush_thread = None
+
     @classmethod
     def _install_exception_hook(cls):
         """Install global exception handler for unhandled exceptions"""
@@ -473,11 +478,12 @@ def close(session_id_hex):
             )
             telemetry_client.close()
 
-            # Shutdown executor if no more clients
+            # Shutdown executor and flush thread if no more clients
             if not TelemetryClientFactory._clients and TelemetryClientFactory._executor:
                 logger.debug(
-                    "No more telemetry clients, shutting down thread pool executor"
+                    "No more telemetry clients, shutting down thread pool executor and flush thread"
                 )
+                TelemetryClientFactory._stop_flush_thread()
                 TelemetryClientFactory._executor.shutdown(wait=True)
                 TelemetryClientFactory._executor = None
                 TelemetryClientFactory._initialized = False
diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py
index 129c8ebd..418d3927 100644
--- a/tests/unit/test_telemetry.py
+++ b/tests/unit/test_telemetry.py
@@ -184,19 +184,19 @@ def test_export_failure_log(
     def test_batch_size_flush(self, telemetry_client_setup):
         """Test batch size flush."""
         client = telemetry_client_setup["client"]
-        client._flush = MagicMock()
+        client.flush = MagicMock()
 
         batch_size = client._batch_size
 
         for i in range(batch_size - 1):
             client._export_event(f"event-{i}")
 
-        client._flush.assert_not_called()
+        client.flush.assert_not_called()
         assert len(client._events_batch) == batch_size - 1
 
         client._export_event(f"event-{batch_size - 1}")
 
-        client._flush.assert_called_once()
+        client.flush.assert_called_once()
 
     @patch("requests.post")
     def test_send_telemetry_authenticated(self, mock_post, telemetry_client_setup):
@@ -251,7 +251,7 @@ def test_flush(self, telemetry_client_setup):
         client._events_batch = ["event1", "event2"]
         client._send_telemetry = MagicMock()
 
-        client._flush()
+        client.flush()
 
         client._send_telemetry.assert_called_once_with(["event1", "event2"])
         assert client._events_batch == []
@@ -259,11 +259,11 @@ def test_flush(self, telemetry_client_setup):
     def test_close(self, telemetry_client_setup):
         """Test closing the client."""
         client = telemetry_client_setup["client"]
-        client._flush = MagicMock()
+        client.flush = MagicMock()
 
         client.close()
 
-        client._flush.assert_called_once()
+        client.flush.assert_called_once()
 
     @patch("requests.post")
     def test_telemetry_request_callback_success(self, mock_post, telemetry_client_setup):
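
Test sketch (illustrative, not part of the series): the final patch moves periodic flushing into a single factory-level thread but does not add a unit test for it. A possible sketch, assuming pytest's monkeypatch fixture and only the TelemetryClientFactory internals shown above (_flush_interval_seconds, _clients, _start_flush_thread, _stop_flush_thread), is below; the shortened interval and sleep durations are arbitrary illustration values.

    import time
    from unittest.mock import MagicMock

    from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory

    def test_shared_flush_thread_flushes_registered_clients(monkeypatch):
        # Shrink the class-level interval so the shared worker wakes up quickly.
        monkeypatch.setattr(TelemetryClientFactory, "_flush_interval_seconds", 0.05)
        mock_client = MagicMock()
        # Register a fake client directly; the worker calls flush() on every
        # value currently stored in TelemetryClientFactory._clients.
        TelemetryClientFactory._clients["test-session"] = mock_client
        try:
            TelemetryClientFactory._start_flush_thread()
            time.sleep(0.2)  # allow at least one periodic wake-up
            assert mock_client.flush.call_count >= 1
        finally:
            TelemetryClientFactory._stop_flush_thread()
            TelemetryClientFactory._clients.pop("test-session", None)

The Event.wait loop used by the worker is what makes this shutdown clean: setting the event interrupts the sleep immediately, whereas the earlier Timer-based approach had to cancel and reschedule a new timer on every tick.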