Skip to content

fix: address issues from the post-merge review of #106 (#151)

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions nominal/core/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import itertools
import logging
from dataclasses import dataclass, field
from datetime import timedelta
from itertools import groupby
from typing import Iterable, Mapping, Protocol, Sequence

Expand All @@ -18,7 +19,7 @@
from nominal.core._clientsbunch import HasAuthHeader
from nominal.core._utils import HasRid
from nominal.core.channel import Channel
from nominal.core.stream import BatchItem, NominalWriteStream
from nominal.core.stream import BatchItem, NominalWriteStream, WriteStream
from nominal.ts import _SecondsNanos


Expand Down Expand Up @@ -138,26 +139,40 @@ def get_channel(self, name: str, tags: dict[str, str] | None = None) -> Channel:
return Channel._from_conjure_logicalseries_api(self._clients, series)

def get_nominal_write_stream(self, batch_size: int = 10, max_wait_sec: int = 5) -> NominalWriteStream:
"""Nominal Stream to write non-blocking messages to a datasource.
"""get_nominal_write_stream is deprecated and will be removed in a future version,
use get_write_stream instead.
"""
import warnings

warnings.warn(
"get_nominal_write_stream is deprecated and will be removed in a future version,"
"use get_write_stream instead.",
UserWarning,
stacklevel=2,
)
return self.get_write_stream(batch_size, timedelta(seconds=max_wait_sec))

def get_write_stream(self, batch_size: int = 10, max_wait: timedelta = timedelta(seconds=5)) -> WriteStream:
"""Stream to write non-blocking messages to a datasource.

Args:
----
batch_size (int): How big the batch can get before writing to Nominal. Default 10
max_wait_sec (int): How long a batch can exist before being flushed to Nominal. Default 5
max_wait (timedelta): How long a batch can exist before being flushed to Nominal. Default 5 seconds

Examples:
--------
Standard Usage:
```py
with connection.get_nominal_write_stream() as stream:
with connection.get_write_stream() as stream:
stream.enqueue("my_channel_name", "2021-01-01T00:00:00Z", 42.0)
stream.enqueue("my_channel_name2", "2021-01-01T00:00:01Z", 43.0, {"tag1": "value1"})
...
```

Without a context manager:
```py
stream = connection.get_nominal_write_stream()
stream = connection.get_write_stream()
stream.enqueue("my_channel_name", "2021-01-01T00:00:00Z", 42.0)
stream.enqueue("my_channel_name2", "2021-01-01T00:00:01Z", 43.0, {"tag1": "value1"})
...
Expand All @@ -166,7 +181,9 @@ def get_nominal_write_stream(self, batch_size: int = 10, max_wait_sec: int = 5)

"""
if self._nominal_data_source_rid is not None:
return NominalWriteStream(self._process_batch, batch_size, max_wait_sec)
write_stream = WriteStream(self._process_batch, batch_size, max_wait)
write_stream.start()
return write_stream
else:
raise ValueError("Writing not implemented for this connection type")

Expand Down
72 changes: 47 additions & 25 deletions nominal/core/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timedelta
from types import TracebackType
from typing import Callable, Dict, Sequence, Type
from typing import Callable, Sequence, Type

from nominal.ts import IntegralNanosecondsUTC

Expand All @@ -19,31 +19,35 @@ class BatchItem:
channel_name: str
timestamp: str | datetime | IntegralNanosecondsUTC
value: float
tags: Dict[str, str] | None = None
tags: dict[str, str] | None = None


class NominalWriteStream:
class WriteStream:
def __init__(
self,
process_batch: Callable[[Sequence[BatchItem]], None],
batch_size: int = 10,
max_wait_sec: int = 5,
max_wait: timedelta = timedelta(seconds=5),
max_workers: int | None = None,
):
"""Create the stream."""
self._process_batch = process_batch
self.batch_size = batch_size
self.max_wait_sec = max_wait_sec
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
self.max_wait = max_wait
self.max_workers = max_workers
self._batch: list[BatchItem] = []
self._batch_lock = threading.Lock()
self._batch_lock = threading.RLock()
self._last_batch_time = time.time()
self._running = True
self._max_wait_event = threading.Event()
self._pending_jobs = threading.BoundedSemaphore(3)

def start(self) -> None:
    """Begin background processing: create the upload worker pool and launch the timeout-flush thread."""
    # Pool that runs the actual batch-upload jobs submitted by flush().
    self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
    # Daemon thread so a stream that is never close()d cannot block interpreter shutdown.
    flush_worker = threading.Thread(target=self._process_timeout_batches, daemon=True)
    self._timeout_thread = flush_worker
    flush_worker.start()

def __enter__(self) -> "NominalWriteStream":
def __enter__(self) -> WriteStream:
    """Support use as a context manager; the stream object itself is the managed resource."""
    # No acquisition needed here — setup happens in __init__/start().
    return self

Expand All @@ -58,7 +62,7 @@ def enqueue(
channel_name: str,
timestamp: str | datetime | IntegralNanosecondsUTC,
value: float,
tags: Dict[str, str] | None = None,
tags: dict[str, str] | None = None,
) -> None:
"""Add a message to the queue.

Expand All @@ -71,7 +75,7 @@ def enqueue_batch(
channel_name: str,
timestamps: Sequence[str | datetime | IntegralNanosecondsUTC],
values: Sequence[float],
tags: Dict[str, str] | None = None,
tags: dict[str, str] | None = None,
) -> None:
"""Add a sequence of messages to the queue.

Expand Down Expand Up @@ -99,25 +103,32 @@ def flush(self, wait: bool = False, timeout: float | None = None) -> None:
NOTE: If none, waits indefinitely.

"""
if not self._batch:
logger.debug("Not flushing... no enqueued batch")
return
with self._batch_lock:
if not self._batch:
logger.debug("Not flushing... no enqueued batch")
self._last_batch_time = time.time()
return

self._pending_jobs.acquire()

def process_future(fut: concurrent.futures.Future) -> None: # type: ignore[type-arg]
"""Callback to print errors to the console if a batch upload fails."""
self._pending_jobs.release()
maybe_ex = fut.exception()
if maybe_ex is not None:
logger.error("Batched upload task failed with exception", exc_info=maybe_ex)
else:
logger.debug("Batched upload task succeeded")

logger.debug(f"Starting flush with {len(self._batch)} records")
future = self._executor.submit(self._process_batch, self._batch)
future.add_done_callback(process_future)
with self._batch_lock:
batch = self._batch
# Clear metadata
self._batch = []
self._last_batch_time = time.time()

# Clear metadata
self._batch = []
self._last_batch_time = time.time()
logger.debug(f"Starting flush with {len(batch)} records")
future = self._executor.submit(self._process_batch, batch)
future.add_done_callback(process_future)

# Synchronously wait, if requested
if wait:
Expand All @@ -128,10 +139,17 @@ def process_future(fut: concurrent.futures.Future) -> None: # type: ignore[type

def _process_timeout_batches(self) -> None:
while self._running:
time.sleep(self.max_wait_sec / 10)
now = time.time()
with self._batch_lock:
last_batch_time = self._last_batch_time
timeout = max(self.max_wait.seconds - (now - last_batch_time), 0)
self._max_wait_event.wait(timeout=timeout)

with self._batch_lock:
if self._batch and (time.time() - self._last_batch_time) >= self.max_wait_sec:
self.flush()
# check if flush has been called in the mean time
if self._last_batch_time > last_batch_time:
continue
self.flush()

def close(self, wait: bool = True) -> None:
"""Close the Nominal Stream.
Expand All @@ -140,9 +158,13 @@ def close(self, wait: bool = True) -> None:
Flush any remaining batches
"""
self._running = False

self._max_wait_event.set()
self._timeout_thread.join()

with self._batch_lock:
self.flush()
self.flush()

self._executor.shutdown(wait=wait, cancel_futures=not wait)


NominalWriteStream = WriteStream
Loading