From 64bb29653aee6c7678db0f1b41407a958124e6e3 Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Mon, 25 Nov 2024 17:49:40 +0100
Subject: [PATCH 01/12] fix: streaming connection is not stopping on CTRL-C

---
 nominal/core/stream.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index 050327db..97245762 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -128,7 +128,7 @@ def process_future(fut: concurrent.futures.Future) -> None:  # type: ignore[type
 
     def _process_timeout_batches(self) -> None:
         while self._running:
-            time.sleep(self.max_wait_sec / 10)
+            time.sleep(1)
             with self._batch_lock:
                 if self._batch and (time.time() - self._last_batch_time) >= self.max_wait_sec:
                     self.flush()

From 3867d9a24a46993a4ae92d8292d04d4cc3cba7b0 Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Tue, 26 Nov 2024 11:25:59 +0100
Subject: [PATCH 02/12] remove nominal from method and class name

---
 nominal/core/connection.py | 24 +++++++++++++++++++-----
 nominal/core/stream.py | 9 +++++++--
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/nominal/core/connection.py b/nominal/core/connection.py
index c2fe39bd..f8dc3ffb 100644
--- a/nominal/core/connection.py
+++ b/nominal/core/connection.py
@@ -18,7 +18,7 @@
 from nominal.core._clientsbunch import HasAuthHeader
 from nominal.core._utils import HasRid
 from nominal.core.channel import Channel
-from nominal.core.stream import BatchItem, NominalWriteStream
+from nominal.core.stream import BatchItem, NominalWriteStream, WriteStream
 from nominal.ts import _SecondsNanos
@@ -138,7 +138,21 @@ def get_channel(self, name: str, tags: dict[str, str] | None = None) -> Channel:
         return Channel._from_conjure_logicalseries_api(self._clients, series)
 
     def get_nominal_write_stream(self, batch_size: int = 10, max_wait_sec: int = 5) -> NominalWriteStream:
-        """Nominal Stream to write non-blocking messages to a datasource.
+        """get_nominal_write_stream is deprecated and will be removed in a future version,
+        use get_write_stream instead.
+        """
+        import warnings
+
+        warnings.warn(
+            "get_nominal_write_stream is deprecated and will be removed in a future version,"
+            " use get_write_stream instead.",
+            UserWarning,
+            stacklevel=2,
+        )
+        return self.get_write_stream(batch_size, max_wait_sec)
+
+    def get_write_stream(self, batch_size: int = 10, max_wait_sec: int = 5) -> WriteStream:
+        """Stream to write non-blocking messages to a datasource.
 
         Args:
         ----
             batch_size (int): How big the batch can get before writing to Nominal. Default 10
             max_wait_sec (int): How long a batch can exist before being flushed to Nominal. Default 5
 
         Examples:
         --------
         Standard Usage:
         ```py
-        with connection.get_nominal_write_stream() as stream:
+        with connection.get_write_stream() as stream:
             stream.enqueue("my_channel_name", "2021-01-01T00:00:00Z", 42.0)
             stream.enqueue("my_channel_name2", "2021-01-01T00:00:01Z", 43.0, {"tag1": "value1"})
             ...
         ```
 
         Without a context manager:
         ```py
-        stream = connection.get_nominal_write_stream()
+        stream = connection.get_write_stream()
         stream.enqueue("my_channel_name", "2021-01-01T00:00:00Z", 42.0)
         stream.enqueue("my_channel_name2", "2021-01-01T00:00:01Z", 43.0, {"tag1": "value1"})
         ...
@@ -166,7 +180,7 @@ def get_nominal_write_stream(self, batch_size: int = 10, max_wait_sec: int = 5)
         """
         if self._nominal_data_source_rid is not None:
-            return NominalWriteStream(self._process_batch, batch_size, max_wait_sec)
+            return WriteStream(self._process_batch, batch_size, max_wait_sec)
         else:
             raise ValueError("Writing not implemented for this connection type")
diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index 97245762..de66c357 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -9,6 +9,8 @@
 from types import TracebackType
 from typing import Callable, Dict, Sequence, Type
 
+from typing_extensions import TypeAlias
+
 from nominal.ts import IntegralNanosecondsUTC
 
 logger = logging.getLogger(__name__)
@@ -22,7 +24,10 @@ class BatchItem:
     tags: Dict[str, str] | None = None
 
 
-class NominalWriteStream:
+NominalWriteStream: TypeAlias = "WriteStream"
+
+
+class WriteStream:
     def __init__(
         self,
         process_batch: Callable[[Sequence[BatchItem]], None],
@@ -43,7 +48,7 @@ def __init__(
         self._timeout_thread = threading.Thread(target=self._process_timeout_batches, daemon=True)
         self._timeout_thread.start()
 
-    def __enter__(self) -> "NominalWriteStream":
+    def __enter__(self) -> "WriteStream":
         """Create the stream as a context manager."""
         return self

From df68c3387ee65d1a877b1f941342ee990dc3e3b5 Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Tue, 26 Nov 2024 11:34:34 +0100
Subject: [PATCH 03/12] don't do significant work in an initializer

---
 nominal/core/connection.py | 4 +++-
 nominal/core/stream.py | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/nominal/core/connection.py b/nominal/core/connection.py
index f8dc3ffb..295db5c2 100644
--- a/nominal/core/connection.py
+++ b/nominal/core/connection.py
@@ -180,7 +180,9 @@ def get_write_stream(self, batch_size: int = 10, max_wait_sec: int = 5) -> Write
         """
         if self._nominal_data_source_rid is not None:
-            return WriteStream(self._process_batch, batch_size, max_wait_sec)
+            write_stream = WriteStream(self._process_batch, batch_size, max_wait_sec)
+            write_stream.start()
+            return write_stream
         else:
             raise ValueError("Writing not implemented for this connection type")
diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index de66c357..1abe69b0 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -39,12 +39,14 @@ def __init__(
         self._process_batch = process_batch
         self.batch_size = batch_size
         self.max_wait_sec = max_wait_sec
-        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
+        self.max_workers = max_workers
         self._batch: list[BatchItem] = []
         self._batch_lock = threading.Lock()
         self._last_batch_time = time.time()
         self._running = True
 
+    def start(self) -> None:
+        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
         self._timeout_thread = threading.Thread(target=self._process_timeout_batches, daemon=True)
         self._timeout_thread.start()

From 4e667eebce702a9bcbdbe309fb35591a51e94a16 Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Tue, 26 Nov 2024 14:34:58 +0100
Subject: [PATCH 04/12] let timeout thread wait interruptibly

---
 nominal/core/stream.py | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index 1abe69b0..a9ec241b 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -44,6 +44,7 @@ def __init__(
         self._batch_lock = threading.Lock()
         self._last_batch_time = time.time()
         self._running = True
+        self._max_wait_event = threading.Event()
 
     def start(self) -> None:
         self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
@@ -108,6 +109,7 @@ def flush(self, wait: bool = False, timeout: float | None = None) -> None:
         """
         if not self._batch:
             logger.debug("Not flushing... no enqueued batch")
+            self._last_batch_time = time.time()
             return
 
         def process_future(fut: concurrent.futures.Future) -> None:  # type: ignore[type-arg]
@@ -135,10 +137,16 @@ def process_future(fut: concurrent.futures.Future) -> None:  # type: ignore[type
 
     def _process_timeout_batches(self) -> None:
         while self._running:
-            time.sleep(1)
+            now = time.time()
+            last_batch_time = self._last_batch_time
+            timeout = max(self.max_wait_sec - (now - last_batch_time), 0)
+            self._max_wait_event.wait(timeout=timeout)
+
             with self._batch_lock:
-                if self._batch and (time.time() - self._last_batch_time) >= self.max_wait_sec:
-                    self.flush()
+                # check if flush has been called in the mean time
+                if self._last_batch_time > last_batch_time:
+                    continue
+                self.flush()
 
     def close(self, wait: bool = True) -> None:
         """Close the Nominal Stream.
@@ -147,6 +155,8 @@ def close(self, wait: bool = True) -> None:
         Flush any remaining batches
         """
         self._running = False
+
+        self._max_wait_event.set()
         self._timeout_thread.join()
 
         with self._batch_lock:

From a441f5df0f2f7771b391e07147bc4aff27230aee Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Tue, 26 Nov 2024 16:58:53 +0100
Subject: [PATCH 05/12] add backpressure

---
 nominal/core/stream.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index a9ec241b..e579045a 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -45,6 +45,7 @@ def __init__(
         self._last_batch_time = time.time()
         self._running = True
         self._max_wait_event = threading.Event()
+        self._pending_jobs = threading.BoundedSemaphore(3)
 
     def start(self) -> None:
         self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
@@ -112,8 +113,11 @@ def flush(self, wait: bool = False, timeout: float | None = None) -> None:
             self._last_batch_time = time.time()
             return
 
+        self._pending_jobs.acquire()
+
         def process_future(fut: concurrent.futures.Future) -> None:  # type: ignore[type-arg]
             """Callback to print errors to the console if a batch upload fails."""
+            self._pending_jobs.release()
             maybe_ex = fut.exception()
             if maybe_ex is not None:
                 logger.error("Batched upload task failed with exception", exc_info=maybe_ex)

From 38c58935ff48877c515b17c998d2dace21a8c72d Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Tue, 26 Nov 2024 19:57:54 +0100
Subject: [PATCH 06/12] fix critical sections of flush

---
 nominal/core/stream.py | 30 ++++++++++++++++--------------
 1 file changed, 16 insertions(+), 14 deletions(-)

diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index e579045a..f2ed306a 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -108,10 +108,11 @@ def flush(self, wait: bool = False, timeout: float | None = None) -> None:
                 NOTE: If none, waits indefinitely.
 
         """
-        if not self._batch:
-            logger.debug("Not flushing... no enqueued batch")
-            self._last_batch_time = time.time()
-            return
+        with self._batch_lock:
+            if not self._batch:
+                logger.debug("Not flushing... no enqueued batch")
+                self._last_batch_time = time.time()
+                return
 
         self._pending_jobs.acquire()
@@ -124,13 +125,14 @@ def process_future(fut: concurrent.futures.Future) -> None:  # type: ignore[type
             else:
                 logger.debug("Batched upload task succeeded")
 
-        logger.debug(f"Starting flush with {len(self._batch)} records")
-        future = self._executor.submit(self._process_batch, self._batch)
-        future.add_done_callback(process_future)
+        with self._batch_lock:
+            logger.debug(f"Starting flush with {len(self._batch)} records")
+            future = self._executor.submit(self._process_batch, self._batch)
+            future.add_done_callback(process_future)
 
-        # Clear metadata
-        self._batch = []
-        self._last_batch_time = time.time()
+            # Clear metadata
+            self._batch = []
+            self._last_batch_time = time.time()
 
         # Synchronously wait, if requested
         if wait:
@@ -142,7 +144,8 @@ def process_future(fut: concurrent.futures.Future) -> None:  # type: ignore[type
     def _process_timeout_batches(self) -> None:
         while self._running:
             now = time.time()
-            last_batch_time = self._last_batch_time
+            with self._batch_lock:
+                last_batch_time = self._last_batch_time
             timeout = max(self.max_wait_sec - (now - last_batch_time), 0)
             self._max_wait_event.wait(timeout=timeout)
 
             with self._batch_lock:
                 # check if flush has been called in the mean time
                 if self._last_batch_time > last_batch_time:
                     continue
-                self.flush()
+            self.flush()
 
     def close(self, wait: bool = True) -> None:
         """Close the Nominal Stream.
@@ -163,7 +166,6 @@ def close(self, wait: bool = True) -> None:
         self._max_wait_event.set()
         self._timeout_thread.join()
 
-        with self._batch_lock:
-            self.flush()
+        self.flush()
 
         self._executor.shutdown(wait=wait, cancel_futures=not wait)

From 313a2419f2c2094cb6d67a0f533a30efe0d514e7 Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Tue, 26 Nov 2024 20:29:39 +0100
Subject: [PATCH 07/12] use dict instead of Dict

---
 nominal/core/stream.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index f2ed306a..4c904f94 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -7,7 +7,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 from types import TracebackType
-from typing import Callable, Dict, Sequence, Type
+from typing import Callable, Sequence, Type
 
 from typing_extensions import TypeAlias
 
@@ -21,7 +21,7 @@ class BatchItem:
     channel_name: str
     timestamp: str | datetime | IntegralNanosecondsUTC
     value: float
-    tags: Dict[str, str] | None = None
+    tags: dict[str, str] | None = None
 
 
 NominalWriteStream: TypeAlias = "WriteStream"
@@ -67,7 +67,7 @@ def enqueue(
         channel_name: str,
         timestamp: str | datetime | IntegralNanosecondsUTC,
         value: float,
-        tags: Dict[str, str] | None = None,
+        tags: dict[str, str] | None = None,
     ) -> None:
         """Add a message to the queue.
@@ -80,7 +80,7 @@ def enqueue_batch(
         channel_name: str,
         timestamps: Sequence[str | datetime | IntegralNanosecondsUTC],
         values: Sequence[float],
-        tags: Dict[str, str] | None = None,
+        tags: dict[str, str] | None = None,
     ) -> None:
         """Add a sequence of messages to the queue.
From de56df65ce716c26f02158ce0ec291dab892f4e8 Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Wed, 27 Nov 2024 13:27:37 +0100
Subject: [PATCH 08/12] use typeful duration for max_wait

---
 nominal/core/connection.py | 8 ++++----
 nominal/core/stream.py | 8 ++++----
 nominal/ts.py | 5 +++++
 3 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/nominal/core/connection.py b/nominal/core/connection.py
index 295db5c2..327e2a8c 100644
--- a/nominal/core/connection.py
+++ b/nominal/core/connection.py
@@ -19,7 +19,7 @@ from nominal.core._utils import HasRid
 from nominal.core.channel import Channel
 from nominal.core.stream import BatchItem, NominalWriteStream, WriteStream
-from nominal.ts import _SecondsNanos
+from nominal.ts import IntegralSecondsDuration, _SecondsNanos
 
 
 @dataclass(frozen=True)
@@ -151,13 +151,13 @@ def get_nominal_write_stream(self, batch_size: int = 10, max_wait_sec: int = 5)
         )
         return self.get_write_stream(batch_size, max_wait_sec)
 
-    def get_write_stream(self, batch_size: int = 10, max_wait_sec: int = 5) -> WriteStream:
+    def get_write_stream(self, batch_size: int = 10, max_wait: IntegralSecondsDuration = 5) -> WriteStream:
         """Stream to write non-blocking messages to a datasource.
 
         Args:
         ----
             batch_size (int): How big the batch can get before writing to Nominal. Default 10
-            max_wait_sec (int): How long a batch can exist before being flushed to Nominal. Default 5
+            max_wait (IntegralSecondsDuration): How long a batch can exist before being flushed to Nominal. Default 5
 
         Examples:
         --------
@@ -180,7 +180,7 @@ def get_write_stream(self, batch_size: int = 10, max_wait: IntegralSecondsDuration = 5) -> Write
         """
         if self._nominal_data_source_rid is not None:
-            write_stream = WriteStream(self._process_batch, batch_size, max_wait_sec)
+            write_stream = WriteStream(self._process_batch, batch_size, max_wait)
             write_stream.start()
             return write_stream
         else:
             raise ValueError("Writing not implemented for this connection type")
diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index 4c904f94..e644c20f 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -11,7 +11,7 @@
 from typing_extensions import TypeAlias
 
-from nominal.ts import IntegralNanosecondsUTC
+from nominal.ts import IntegralNanosecondsUTC, IntegralSecondsDuration
 
 logger = logging.getLogger(__name__)
@@ -32,13 +32,13 @@ def __init__(
         self,
         process_batch: Callable[[Sequence[BatchItem]], None],
         batch_size: int = 10,
-        max_wait_sec: int = 5,
+        max_wait: IntegralSecondsDuration = 5,
         max_workers: int | None = None,
     ):
         """Create the stream."""
         self._process_batch = process_batch
         self.batch_size = batch_size
-        self.max_wait_sec = max_wait_sec
+        self.max_wait = max_wait
         self.max_workers = max_workers
         self._batch: list[BatchItem] = []
         self._batch_lock = threading.Lock()
@@ -146,7 +146,7 @@ def _process_timeout_batches(self) -> None:
             now = time.time()
             with self._batch_lock:
                 last_batch_time = self._last_batch_time
-            timeout = max(self.max_wait_sec - (now - last_batch_time), 0)
+            timeout = max(self.max_wait - (now - last_batch_time), 0)
             self._max_wait_event.wait(timeout=timeout)
 
             with self._batch_lock:
diff --git a/nominal/ts.py b/nominal/ts.py
index 53c7c147..be16e2d4 100644
--- a/nominal/ts.py
+++ b/nominal/ts.py
@@ -224,12 +224,17 @@ class in Java: see java
     "TypedTimestampType",
     "IntegralNanosecondsUTC",
     "LogTimestampType",
+    "IntegralSecondsDuration",
 ]
 
 IntegralNanosecondsUTC: TypeAlias = int
 """Alias for an `int` used in the code for documentation purposes.
 This value is a timestamp in nanoseconds since the Unix epoch, UTC."""
 
+IntegralSecondsDuration: TypeAlias = int
+"""Alias for an `int` used in the code for documentation purposes.
+This value is a duration of the number of seconds."""
+
 LogTimestampType: TypeAlias = Literal["absolute", "relative"]

From dc4ca00ef9fc922e252ffb858cdca8eb2d6c6406 Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Tue, 3 Dec 2024 13:12:29 +0100
Subject: [PATCH 09/12] address issues from review

---
 nominal/core/connection.py | 9 +++++----
 nominal/core/stream.py | 27 +++++++++++++--------------
 nominal/ts.py | 5 -----
 3 files changed, 18 insertions(+), 23 deletions(-)

diff --git a/nominal/core/connection.py b/nominal/core/connection.py
index 327e2a8c..7929fbd3 100644
--- a/nominal/core/connection.py
+++ b/nominal/core/connection.py
@@ -3,6 +3,7 @@
 import itertools
 import logging
 from dataclasses import dataclass, field
+from datetime import timedelta
 from itertools import groupby
 from typing import Iterable, Mapping, Protocol, Sequence
@@ -19,7 +20,7 @@ from nominal.core._clientsbunch import HasAuthHeader
 from nominal.core._utils import HasRid
 from nominal.core.channel import Channel
 from nominal.core.stream import BatchItem, NominalWriteStream, WriteStream
-from nominal.ts import IntegralSecondsDuration, _SecondsNanos
+from nominal.ts import _SecondsNanos
 
 
 @dataclass(frozen=True)
@@ -149,15 +150,15 @@ def get_nominal_write_stream(self, batch_size: int = 10, max_wait_sec: int = 5)
             UserWarning,
             stacklevel=2,
         )
-        return self.get_write_stream(batch_size, max_wait_sec)
+        return self.get_write_stream(batch_size, timedelta(seconds=max_wait_sec))
 
-    def get_write_stream(self, batch_size: int = 10, max_wait: IntegralSecondsDuration = 5) -> WriteStream:
+    def get_write_stream(self, batch_size: int = 10, max_wait: timedelta = timedelta(seconds=5)) -> WriteStream:
         """Stream to write non-blocking messages to a datasource.
 
         Args:
         ----
             batch_size (int): How big the batch can get before writing to Nominal. Default 10
-            max_wait (IntegralSecondsDuration): How long a batch can exist before being flushed to Nominal. Default 5
+            max_wait (timedelta): How long a batch can exist before being flushed to Nominal. Default 5 seconds
 
         Examples:
         --------
diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index e644c20f..c48a57c8 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -5,13 +5,11 @@
 import threading
 import time
 from dataclasses import dataclass
-from datetime import datetime
+from datetime import datetime, timedelta
 from types import TracebackType
 from typing import Callable, Sequence, Type
 
-from typing_extensions import TypeAlias
-
 from nominal.ts import IntegralNanosecondsUTC
 
 logger = logging.getLogger(__name__)
@@ -24,15 +22,12 @@ class BatchItem:
     tags: dict[str, str] | None = None
 
 
-NominalWriteStream: TypeAlias = "WriteStream"
-
-
 class WriteStream:
     def __init__(
         self,
         process_batch: Callable[[Sequence[BatchItem]], None],
         batch_size: int = 10,
-        max_wait: IntegralSecondsDuration = 5,
+        max_wait: timedelta = timedelta(seconds=5),
         max_workers: int | None = None,
     ):
         """Create the stream."""
@@ -52,7 +47,7 @@ def start(self) -> None:
         self._timeout_thread = threading.Thread(target=self._process_timeout_batches, daemon=True)
         self._timeout_thread.start()
 
-    def __enter__(self) -> "WriteStream":
+    def __enter__(self) -> WriteStream:
         """Create the stream as a context manager."""
         return self
@@ -126,14 +121,15 @@ def process_future(fut: concurrent.futures.Future) -> None:  # type: ignore[type
                 logger.debug("Batched upload task succeeded")
 
         with self._batch_lock:
-            logger.debug(f"Starting flush with {len(self._batch)} records")
-            future = self._executor.submit(self._process_batch, self._batch)
-            future.add_done_callback(process_future)
-
+            batch = self._batch
             # Clear metadata
             self._batch = []
             self._last_batch_time = time.time()
 
+        logger.debug(f"Starting flush with {len(batch)} records")
+        future = self._executor.submit(self._process_batch, batch)
+        future.add_done_callback(process_future)
+
         # Synchronously wait, if requested
         if wait:
             # Warn user if timeout is too short
@@ -146,7 +142,7 @@ def _process_timeout_batches(self) -> None:
             now = time.time()
             with self._batch_lock:
                 last_batch_time = self._last_batch_time
-            timeout = max(self.max_wait - (now - last_batch_time), 0)
+            timeout = max(self.max_wait.seconds - (now - last_batch_time), 0)
             self._max_wait_event.wait(timeout=timeout)
 
             with self._batch_lock:
@@ -169,3 +165,6 @@ def close(self, wait: bool = True) -> None:
         self.flush()
 
         self._executor.shutdown(wait=wait, cancel_futures=not wait)
+
+
+NominalWriteStream = WriteStream
diff --git a/nominal/ts.py b/nominal/ts.py
index be16e2d4..53c7c147 100644
--- a/nominal/ts.py
+++ b/nominal/ts.py
@@ -224,12 +224,17 @@ class in Java: see java
     "TypedTimestampType",
     "IntegralNanosecondsUTC",
     "LogTimestampType",
-    "IntegralSecondsDuration",
 ]
 
 IntegralNanosecondsUTC: TypeAlias = int
 """Alias for an `int` used in the code for documentation purposes.
 This value is a timestamp in nanoseconds since the Unix epoch, UTC."""
 
-IntegralSecondsDuration: TypeAlias = int
-"""Alias for an `int` used in the code for documentation purposes.
-This value is a duration of the number of seconds."""
-
 LogTimestampType: TypeAlias = Literal["absolute", "relative"]

From 4d50df74925a1e8c0079c05e476c3393e72f59ae Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Tue, 3 Dec 2024 13:15:43 +0100
Subject: [PATCH 10/12] fix: use reentrant lock now that lock is used in flush()

---
 nominal/core/stream.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index c48a57c8..de7256d9 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -36,7 +36,7 @@ def __init__(
         self.max_wait = max_wait
         self.max_workers = max_workers
         self._batch: list[BatchItem] = []
-        self._batch_lock = threading.Lock()
+        self._batch_lock = threading.RLock()
         self._last_batch_time = time.time()
         self._running = True
         self._max_wait_event = threading.Event()

From caa8810801778bf965be2f48863dcf4bcd530958 Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Wed, 4 Dec 2024 22:02:47 +0100
Subject: [PATCH 11/12] use frozen dataclass

Also separate the thread-safe code into its own class; this also provides
a place for the mutable _last_time member that we can't have on the
frozen dataclass.
---
 nominal/core/connection.py | 4 +-
 nominal/core/stream.py | 158 +++++++++++++++++++++++--------------
 2 files changed, 98 insertions(+), 64 deletions(-)

diff --git a/nominal/core/connection.py b/nominal/core/connection.py
index 7929fbd3..1b7ac8d2 100644
--- a/nominal/core/connection.py
+++ b/nominal/core/connection.py
@@ -181,9 +181,7 @@ def get_write_stream(self, batch_size: int = 10, max_wait: timedelta = timedelta
         """
         if self._nominal_data_source_rid is not None:
-            write_stream = WriteStream(self._process_batch, batch_size, max_wait)
-            write_stream.start()
-            return write_stream
+            return WriteStream.create(batch_size, max_wait, self._process_batch)
         else:
             raise ValueError("Writing not implemented for this connection type")
diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index de7256d9..0aaf0f8c 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -9,6 +9,8 @@
 from types import TracebackType
 from typing import Callable, Sequence, Type
 
+from typing_extensions import Self
+
 from nominal.ts import IntegralNanosecondsUTC
 
 logger = logging.getLogger(__name__)
@@ -22,30 +24,39 @@ class BatchItem:
     tags: dict[str, str] | None = None
 
 
+@dataclass(frozen=True)
 class WriteStream:
-    def __init__(
-        self,
+    batch_size: int
+    max_wait: timedelta
+    _process_batch: Callable[[Sequence[BatchItem]], None]
+    _executor: concurrent.futures.ThreadPoolExecutor
+    _thread_safe_batch: ThreadSafeBatch
+    _stop: threading.Event
+    _pending_jobs: threading.BoundedSemaphore
+
+    @classmethod
+    def create(
+        cls,
+        batch_size: int,
+        max_wait: timedelta,
         process_batch: Callable[[Sequence[BatchItem]], None],
-        batch_size: int = 10,
-        max_wait: timedelta = timedelta(seconds=5),
-        max_workers: int | None = None,
-    ):
+    ) -> Self:
         """Create the stream."""
-        self._process_batch = process_batch
-        self.batch_size = batch_size
-        self.max_wait = max_wait
-        self.max_workers = max_workers
-        self._batch: list[BatchItem] = []
-        self._batch_lock = threading.RLock()
-        self._last_batch_time = time.time()
-        self._running = True
-        self._max_wait_event = threading.Event()
-        self._pending_jobs = threading.BoundedSemaphore(3)
+        executor = concurrent.futures.ThreadPoolExecutor()
+
+        instance = cls(
+            batch_size,
+            max_wait,
+            process_batch,
+            executor,
+            ThreadSafeBatch(),
+            threading.Event(),
+            threading.BoundedSemaphore(3),
+        )
+
+        executor.submit(instance._process_timeout_batches)
 
-    def start(self) -> None:
-        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
-        self._timeout_thread = threading.Thread(target=self._process_timeout_batches, daemon=True)
-        self._timeout_thread.start()
+        return instance
 
     def __enter__(self) -> WriteStream:
         """Create the stream as a context manager."""
         return self
@@ -86,28 +97,19 @@ def enqueue_batch(
                 f"Expected equal numbers of timestamps and values! Received: {len(timestamps)} vs. {len(values)}"
             )
 
-        with self._batch_lock:
-            for timestamp, value in zip(timestamps, values):
-                self._batch.append(BatchItem(channel_name, timestamp, value, tags))
+        self._thread_safe_batch.add(
+            [BatchItem(channel_name, timestamp, value, tags) for timestamp, value in zip(timestamps, values)]
+        )
+        self._flush(condition=lambda size: size >= self.batch_size)
 
-            if len(self._batch) >= self.batch_size:
-                self.flush()
-
-    def flush(self, wait: bool = False, timeout: float | None = None) -> None:
-        """Flush current batch of records to nominal in a background thread.
-
-        Args:
-        ----
-            wait: If true, wait for the batch to complete uploading before returning
-            timeout: If wait is true, the time to wait for flush completion.
-                NOTE: If none, waits indefinitely.
+    def _flush(self, condition: Callable[[int], bool] | None = None) -> concurrent.futures.Future[None] | None:
+        batch = self._thread_safe_batch.swap(condition)
 
-        """
-        with self._batch_lock:
-            if not self._batch:
-                logger.debug("Not flushing... no enqueued batch")
-                self._last_batch_time = time.time()
-                return
+        if batch is None:
+            return None
+        if not batch:
+            logger.debug("Not flushing... no enqueued batch")
+            return None
 
         self._pending_jobs.acquire()
 
         def process_future(fut: concurrent.futures.Future) -> None:  # type: ignore[type-arg]
             """Callback to print errors to the console if a batch upload fails."""
             self._pending_jobs.release()
             maybe_ex = fut.exception()
             if maybe_ex is not None:
                 logger.error("Batched upload task failed with exception", exc_info=maybe_ex)
             else:
                 logger.debug("Batched upload task succeeded")
 
-        with self._batch_lock:
-            batch = self._batch
-            # Clear metadata
-            self._batch = []
-            self._last_batch_time = time.time()
-
         logger.debug(f"Starting flush with {len(batch)} records")
         future = self._executor.submit(self._process_batch, batch)
         future.add_done_callback(process_future)
 
+        return future
+
+    def flush(self, wait: bool = False, timeout: float | None = None) -> None:
+        """Flush current batch of records to nominal in a background thread.
+
+        Args:
+        ----
+            wait: If true, wait for the batch to complete uploading before returning
+            timeout: If wait is true, the time to wait for flush completion.
+                NOTE: If none, waits indefinitely.
+
+        """
+        future = self._flush()
 
         # Synchronously wait, if requested
-        if wait:
+        if wait and future is not None:
             # Warn user if timeout is too short
             _, pending = concurrent.futures.wait([future], timeout)
             if pending:
                 logger.warning("Upload task still pending after flushing batch... increase timeout or setting to None")
 
     def _process_timeout_batches(self) -> None:
-        while self._running:
+        while not self._stop.is_set():
             now = time.time()
-            with self._batch_lock:
-                last_batch_time = self._last_batch_time
+
+            last_batch_time = self._thread_safe_batch.last_time
             timeout = max(self.max_wait.seconds - (now - last_batch_time), 0)
-            self._max_wait_event.wait(timeout=timeout)
+            self._stop.wait(timeout=timeout)
 
-            with self._batch_lock:
-                # check if flush has been called in the mean time
-                if self._last_batch_time > last_batch_time:
-                    continue
-            self.flush()
+            # check if flush has been called in the mean time
+            if self._thread_safe_batch.last_time > last_batch_time:
+                continue
+
+            self._flush()
 
     def close(self, wait: bool = True) -> None:
         """Close the Nominal Stream.
@@ -157,14 +166,41 @@ def close(self, wait: bool = True) -> None:
         Stop the process timeout thread
         Flush any remaining batches
         """
-        self._running = False
-
-        self._max_wait_event.set()
-        self._timeout_thread.join()
+        self._stop.set()
 
-        self.flush()
+        self._flush()
 
         self._executor.shutdown(wait=wait, cancel_futures=not wait)
 
 
 NominalWriteStream = WriteStream
+
+
+class ThreadSafeBatch:
+    def __init__(self) -> None:
+        """Thread-safe access to batch and last swap time."""
+        self._batch: list[BatchItem] = []
+        self._last_time = time.time()
+        self._lock = threading.Lock()
+
+    def swap(self, condition: Callable[[int], bool] | None = None) -> list[BatchItem] | None:
+        """Swap the current batch with an empty one and return the old batch.
+
+        If condition is provided, the swap will only occur if the condition is met, otherwise None is returned.
+        """
+        with self._lock:
+            if condition and not condition(len(self._batch)):
+                return None
+            batch = self._batch
+            self._batch = []
+            self._last_time = time.time()
+            return batch
+
+    def add(self, items: Sequence[BatchItem]) -> None:
+        with self._lock:
+            self._batch.extend(items)
+
+    @property
+    def last_time(self) -> float:
+        with self._lock:
+            return self._last_time

From b53efaf257b726d76b05c0ce72825010fc2eef10 Mon Sep 17 00:00:00 2001
From: Mario Buikhuizen
Date: Wed, 4 Dec 2024 22:22:59 +0100
Subject: [PATCH 12/12] add deprecation warning for NominalWriteStream

---
 nominal/core/connection.py | 4 ++--
 nominal/core/stream.py | 18 ++++++++++++++----
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/nominal/core/connection.py b/nominal/core/connection.py
index 1b7ac8d2..a6ea15cb 100644
--- a/nominal/core/connection.py
+++ b/nominal/core/connection.py
@@ -19,7 +19,7 @@ from nominal.core._clientsbunch import HasAuthHeader
 from nominal.core._utils import HasRid
 from nominal.core.channel import Channel
-from nominal.core.stream import BatchItem, NominalWriteStream, WriteStream
+from nominal.core.stream import BatchItem, WriteStream
 from nominal.ts import _SecondsNanos
@@ -138,7 +138,7 @@ def get_channel(self, name: str, tags: dict[str, str] | None = None) -> Channel:
         series = self._clients.logical_series.get_logical_series(self._clients.auth_header, resolved_series.rid)
         return Channel._from_conjure_logicalseries_api(self._clients, series)
 
-    def get_nominal_write_stream(self, batch_size: int = 10, max_wait_sec: int = 5) -> NominalWriteStream:
+    def get_nominal_write_stream(self, batch_size: int = 10, max_wait_sec: int = 5) -> WriteStream:
         """get_nominal_write_stream is deprecated and will be removed in a future version,
         use get_write_stream instead.
         """
diff --git a/nominal/core/stream.py b/nominal/core/stream.py
index 0aaf0f8c..22a9e83d 100644
--- a/nominal/core/stream.py
+++ b/nominal/core/stream.py
@@ -4,15 +4,28 @@
 import logging
 import threading
 import time
+import warnings
 from dataclasses import dataclass
 from datetime import datetime, timedelta
 from types import TracebackType
-from typing import Callable, Sequence, Type
+from typing import Any, Callable, Sequence, Type
 
 from typing_extensions import Self
 
 from nominal.ts import IntegralNanosecondsUTC
 
+
+def __getattr__(name: str) -> Any:
+    if name == "NominalWriteStream":
+        warnings.warn(
+            "NominalWriteStream is deprecated, use WriteStream instead",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return WriteStream
+    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
+
+
 logger = logging.getLogger(__name__)
@@ -173,9 +186,6 @@ def close(self, wait: bool = True) -> None:
 
         self._executor.shutdown(wait=wait, cancel_futures=not wait)
 
 
-NominalWriteStream = WriteStream
-
-
 class ThreadSafeBatch:
     def __init__(self) -> None:
         """Thread-safe access to batch and last swap time."""