Skip to content

Commit f6232d7

Browse files
committed
initial push, may touch up more later
1 parent fd9688b commit f6232d7

File tree

2 files changed

+218
-0
lines changed

2 files changed

+218
-0
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# ruff: noqa: F403
2+
from .influxdb3 import *
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
import json
2+
import logging
3+
import random
4+
import time
5+
from datetime import datetime, timedelta, timezone
6+
from functools import wraps
7+
from typing import Optional
8+
9+
from influxdb_client_3 import InfluxDBClient3
10+
11+
from quixstreams.models.topics import Topic
12+
from quixstreams.sources import (
13+
ClientConnectFailureCallback,
14+
ClientConnectSuccessCallback,
15+
Source,
16+
)
17+
18+
logger = logging.getLogger(__name__)
19+
20+
TIME_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
21+
22+
__all__ = ("InfluxDB3Source",)
23+
24+
25+
# retry decorator
def with_retry(func):
    """
    Retry an instance method up to ``self.max_retries`` times, with
    exponential backoff (1s, 2s, 4s, ...) between attempts.

    The decorated method must be called on an instance exposing a
    ``max_retries`` attribute. The last exception is re-raised once all
    attempts are exhausted.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # Read the attribute actually set in InfluxDB3Source.__init__
        # (the original read `self.max_attempts`, which is never set).
        max_attempts = self.max_retries
        attempts_remaining = max_attempts
        backoff = 1  # Initial backoff in seconds
        while attempts_remaining:
            try:
                # Forward `self` to the wrapped method; the original called
                # func(*args, **kwargs) and dropped it, so every decorated
                # call raised a TypeError.
                return func(self, *args, **kwargs)
            except Exception as e:
                logger.debug(f"{func.__name__} encountered an error: {e}")
                attempts_remaining -= 1
                if attempts_remaining:
                    logger.warning(
                        f"{func.__name__} failed and is retrying; "
                        f"backing off for {backoff}s (attempt "
                        f"{max_attempts - attempts_remaining}/{max_attempts})"
                    )
                    time.sleep(backoff)
                    backoff *= 2  # Exponential backoff
                else:
                    logger.error(f"Maximum retries reached for {func.__name__}: {e}")
                    raise

    return wrapper
51+
52+
53+
class InfluxDB3Source(Source):
    """
    A Quix Streams Source that reads data from an InfluxDB v3 instance and
    produces each resulting row as a JSON message.

    Data is queried in ``time_delta``-sized windows starting at
    ``start_date``. With an ``end_date`` set, the source performs a bounded
    backfill and stops; with ``end_date=None`` it runs indefinitely, sleeping
    until each window has fully elapsed before querying it.
    """

    def __init__(
        self,
        influxdb_host: str,
        influxdb_token: str,
        influxdb_org: str,
        influxdb_database: str,
        start_date: datetime,
        end_date: Optional[datetime] = None,
        measurement: Optional[str] = None,
        sql_query: Optional[str] = None,
        time_delta: str = "5m",
        delay: float = 0,
        max_retries: int = 5,
        name: Optional[str] = None,
        shutdown_timeout: float = 10,
        on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
        on_client_connect_failure: Optional[ClientConnectFailureCallback] = None,
    ) -> None:
        """
        :param influxdb_host: Host URL of the InfluxDB instance.
        :param influxdb_token: Authentication token for InfluxDB.
        :param influxdb_org: Organization name in InfluxDB.
        :param influxdb_database: Database name in InfluxDB.
        :param start_date: The start datetime for querying InfluxDB.
        :param end_date: The end datetime for querying InfluxDB; None means
            run forever, following live data.
        :param measurement: The measurement to query. If None, all measurements will be processed.
        :param sql_query: Custom SQL query for retrieving data. If provided, it overrides the default logic.
            May contain `{measurement_name}`, `{start_time}` and `{end_time}`
            placeholders, filled via str.format.
        :param time_delta: Time interval for batching queries, e.g., "5m" for 5 minutes.
        :param delay: An optional delay between producing batches.
        :param name: A unique name for the Source, used as part of the topic name.
        :param shutdown_timeout: Time in seconds to wait for graceful shutdown.
        :param max_retries: Maximum number of retries for querying or producing.
        :param on_client_connect_success: An optional callback made after successful
            client authentication, primarily for additional logging.
        :param on_client_connect_failure: An optional callback made after failed
            client authentication (which should raise an Exception).
            Callback should accept the raised Exception as an argument.
            Callback must resolve (or propagate/re-raise) the Exception.
        """
        super().__init__(
            name=name or f"influxdb_{influxdb_database}_{measurement}",
            shutdown_timeout=shutdown_timeout,
            on_client_connect_success=on_client_connect_success,
            on_client_connect_failure=on_client_connect_failure,
        )

        self._client_kwargs = {
            "host": influxdb_host,
            "token": influxdb_token,
            "org": influxdb_org,
            "database": influxdb_database,
        }
        self.measurement = measurement
        self.sql_query = sql_query  # Custom SQL query
        self.start_date = start_date
        self.end_date = end_date
        self.time_delta = time_delta
        self.delay = delay
        self.max_retries = max_retries
        # Alias read by the `with_retry` decorator, which looks up
        # `self.max_attempts`; kept in sync with `max_retries`.
        self.max_attempts = max_retries

        self._client: Optional[InfluxDBClient3] = None

    def setup(self):
        """Create the InfluxDB client and verify authentication."""
        self._client = InfluxDBClient3(**self._client_kwargs)
        try:
            # We cannot safely parameterize the table (measurement) selection, so
            # the best we can do is confirm authentication was successful
            self._client.query("")
        except Exception as e:
            # An empty query fails with this specific message once auth has
            # succeeded; anything else is a real connection/auth error.
            if "No SQL statements were provided in the query string" not in str(e):
                raise

    @with_retry
    def produce_records(self, records: list[dict], measurement_name: str):
        """
        Serialize and produce a batch of records, then flush the producer.

        Keys are the measurement name plus a random suffix — presumably to
        spread records across topic partitions (TODO confirm intent).
        """
        for record in records:
            msg = self.serialize(
                key=f"{measurement_name}_{random.randint(1, 1000)}",  # noqa: S311
                value=record,
            )
            self.produce(value=msg.value, key=msg.key)
        self.producer.flush()

    def run(self):
        """Iterate measurements and stream each one window-by-window."""
        measurements = (
            [self.measurement] if self.measurement else self.get_measurements()
        )
        # Loop-invariant: parse the window size once.
        window_seconds = interval_to_seconds(self.time_delta)

        for measurement_name in measurements:
            logger.info(f"Processing measurement: {measurement_name}")
            _start_date = self.start_date
            _end_date = self.end_date

            # Re-evaluate the stop condition every iteration. The original
            # computed it once before the loop, so bounded backfills never
            # terminated and shutdown (`self.running`) was ignored.
            while self.running and (_end_date is None or _start_date < _end_date):
                end_time = _start_date + timedelta(seconds=window_seconds)
                # Never query past the requested end of a bounded backfill.
                if _end_date is not None and end_time > _end_date:
                    end_time = _end_date

                # In live mode, wait until the window has fully elapsed so the
                # query sees complete data.
                wait_time = (end_time - datetime.now(timezone.utc)).total_seconds()
                if _end_date is None and wait_time > 0:
                    logger.info(f"Sleeping for {wait_time}s")
                    time.sleep(wait_time)

                data = self.query_data(measurement_name, _start_date, end_time)
                if data is not None and not data.empty:
                    if "iox::measurement" in data.columns:
                        data = data.drop(columns=["iox::measurement"])
                    self.produce_records(
                        json.loads(data.to_json(orient="records", date_format="iso")),
                        # Pass the measurement name itself; the original passed
                        # data["measurement_name"] (a column lookup that would
                        # raise KeyError or hand a Series to the key format).
                        measurement_name,
                    )

                _start_date = end_time
                if self.delay > 0:
                    time.sleep(self.delay)

    def get_measurements(self):
        """Return all measurement names in the database ([] on failure)."""
        try:
            query = "SHOW MEASUREMENTS"
            result = self._client.query(query=query, mode="pandas", language="influxql")
            return result["name"].tolist() if not result.empty else []
        except Exception as e:
            # Best-effort: a failure here degrades to "no measurements"
            # rather than crashing the source.
            logger.error(f"Failed to retrieve measurements: {e}")
            return []

    @with_retry
    def query_data(self, measurement_name, start_time, end_time):
        """
        Query one [start_time, end_time) window for a measurement.

        Returns a pandas DataFrame; raises (and is retried by `with_retry`)
        on query failure.
        """
        try:
            if self.sql_query:
                query = self.sql_query.format(
                    measurement_name=measurement_name,
                    start_time=start_time.isoformat(),
                    end_time=end_time.isoformat(),
                )
            else:
                query = (
                    f'SELECT * FROM "{measurement_name}" '  # noqa: S608
                    f"WHERE time >= '{start_time.isoformat()}' "
                    f"AND time < '{end_time.isoformat()}'"
                )

            logger.info(f"Executing query: {query}")
            return self._client.query(query=query, mode="pandas", language="influxql")
        except Exception as e:
            logger.error(f"Query failed for measurement {measurement_name}: {e}")
            raise

    def default_topic(self) -> Topic:
        """Topic named after the source; string keys, JSON values."""
        return Topic(
            name=self.name,
            key_serializer="string",
            key_deserializer="string",
            value_deserializer="json",
            value_serializer="json",
        )
213+
214+
215+
def interval_to_seconds(interval: str) -> int:
    """
    Convert an interval string such as "30s", "5m", "2h" or "1d" to seconds.

    :param interval: An integer followed by a one-letter unit suffix
        (s=seconds, m=minutes, h=hours, d=days), as used by ``time_delta``.
    :return: The interval length in seconds.
    :raises ValueError: If the string is empty, the unit suffix is unknown,
        or the numeric part is not an integer.
    """
    if not interval:
        raise ValueError("interval must be a non-empty string such as '5m'")
    unit = interval[-1]
    if unit not in TIME_UNITS:
        # Raise a descriptive ValueError instead of the original's bare
        # KeyError on an unknown unit suffix.
        raise ValueError(
            f"Unknown time unit {unit!r} in interval {interval!r}; "
            f"expected one of {sorted(TIME_UNITS)}"
        )
    return int(interval[:-1]) * TIME_UNITS[unit]

0 commit comments

Comments
 (0)