Skip to content

Commit ae607c5

Browse files
committed
major overhaul to make workflow easier to follow
1 parent b60b4a4 commit ae607c5

File tree

1 file changed

+166
-101
lines changed

1 file changed

+166
-101
lines changed

quixstreams/sources/community/influxdb3/influxdb3.py

Lines changed: 166 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import time
55
from datetime import datetime, timedelta, timezone
66
from functools import wraps
7-
from typing import Optional
7+
from typing import Optional, Union
88

99
from influxdb_client_3 import InfluxDBClient3
1010

@@ -15,23 +15,27 @@
1515
Source,
1616
)
1717

18+
__all__ = ("InfluxDB3Source",)
19+
20+
1821
logger = logging.getLogger(__name__)
1922

2023
TIME_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
2124

22-
__all__ = ("InfluxDB3Source",)
25+
26+
class NoMeasurementsFound(Exception): ...
2327

2428

2529
# retry decorator
2630
def with_retry(func):
2731
@wraps(func)
2832
def wrapper(self, *args, **kwargs):
29-
max_attempts = self.max_attempts
30-
attempts_remaining = self.max_attempts
33+
max_attempts = self._max_attempts
34+
attempts_remaining = self._max_attempts
3135
backoff = 1 # Initial backoff in seconds
3236
while attempts_remaining:
3337
try:
34-
return func(*args, **kwargs)
38+
return func(self, *args, **kwargs)
3539
except Exception as e:
3640
logger.debug(f"{func.__name__} encountered an error: {e}")
3741
attempts_remaining -= 1
@@ -50,16 +54,43 @@ def wrapper(self, *args, **kwargs):
5054
return wrapper
5155

5256

57+
def _interval_to_seconds(interval: str) -> int:
58+
return int(interval[:-1]) * TIME_UNITS[interval[-1]]
59+
60+
61+
def _set_sql_query(sql_query: str) -> str:
62+
return (
63+
sql_query.format(start_time="$start_time", end_time="$end_time")
64+
or "SELECT * FROM {measurement_name} " # noqa: S608
65+
"WHERE time >= $start_time "
66+
"AND time < $end_time"
67+
)
68+
69+
5370
class InfluxDB3Source(Source):
71+
"""
72+
InfluxDB3Source extracts data from a specified set of measurements in a
73+
database (or all available ones if none are specified).
74+
75+
It processes measurements sequentially by gathering/producing a tumbling
76+
"time_delta"-sized window of data, starting from 'start_date' and eventually
77+
stopping at 'end_date'.
78+
79+
Once stopped, it processes the next measurement, until all are complete.
80+
81+
If no 'end_date' is provided, it will run indefinitely for a single
82+
measurement (which means no other measurements will be processed!).
83+
"""
84+
5485
def __init__(
5586
self,
56-
influxdb_host: str,
57-
influxdb_token: str,
58-
influxdb_org: str,
59-
influxdb_database: str,
60-
start_date: datetime,
87+
host: str,
88+
token: str,
89+
organization_id: str,
90+
database: str,
91+
start_date: datetime = datetime.now(tz=timezone.utc),
6192
end_date: Optional[datetime] = None,
62-
measurement: Optional[str] = None,
93+
measurements: Optional[Union[str, list[str]]] = None,
6394
sql_query: Optional[str] = None,
6495
time_delta: str = "5m",
6596
delay: float = 0,
@@ -70,13 +101,13 @@ def __init__(
70101
on_client_connect_failure: Optional[ClientConnectFailureCallback] = None,
71102
) -> None:
72103
"""
73-
:param influxdb_host: Host URL of the InfluxDB instance.
74-
:param influxdb_token: Authentication token for InfluxDB.
75-
:param influxdb_org: Organization name in InfluxDB.
76-
:param influxdb_database: Database name in InfluxDB.
77-
:param start_date: The start datetime for querying InfluxDB.
78-
:param end_date: The end datetime for querying InfluxDB.
79-
:param measurement: The measurement to query. If None, all measurements will be processed.
104+
:param host: Host URL of the InfluxDB instance.
105+
:param token: Authentication token for InfluxDB.
106+
:param organization_id: Organization name in InfluxDB.
107+
:param database: Database name in InfluxDB.
108+
:param start_date: The start datetime for querying InfluxDB. Uses current time by default.
109+
:param end_date: The end datetime for querying InfluxDB. If none provided, runs indefinitely for a single measurement.
110+
:param measurements: The measurements to query. If None, all measurements will be processed.
80111
:param sql_query: Custom SQL query for retrieving data. If provided, it overrides the default logic.
81112
:param time_delta: Time interval for batching queries, e.g., "5m" for 5 minutes.
82113
:param delay: An optional delay between producing batches.
@@ -90,117 +121,155 @@ def __init__(
90121
Callback should accept the raised Exception as an argument.
91122
Callback must resolve (or propagate/re-raise) the Exception.
92123
"""
124+
if isinstance(measurements, str):
125+
measurements = [measurements]
126+
measurements = measurements or []
93127
super().__init__(
94-
name=name or f"influxdb_{influxdb_database}_{measurement}",
128+
name=name
129+
or f"influxdb3_{database}_{'-'.join(measurements) or 'measurements'}",
95130
shutdown_timeout=shutdown_timeout,
96131
on_client_connect_success=on_client_connect_success,
97132
on_client_connect_failure=on_client_connect_failure,
98133
)
99134

100135
self._client_kwargs = {
101-
"host": influxdb_host,
102-
"token": influxdb_token,
103-
"org": influxdb_org,
104-
"database": influxdb_database,
136+
"host": host,
137+
"token": token,
138+
"org": organization_id,
139+
"database": database,
105140
}
106-
self.measurement = measurement
107-
self.sql_query = sql_query # Custom SQL query
108-
self.start_date = start_date
109-
self.end_date = end_date
110-
self.time_delta = time_delta
111-
self.delay = delay
112-
self.max_retries = max_retries
141+
self._measurements = measurements
142+
self._sql_query = _set_sql_query(sql_query or "")
143+
self._start_date = start_date
144+
self._end_date = end_date
145+
self._time_delta_seconds = _interval_to_seconds(time_delta)
146+
self._delay = delay
147+
self._max_attempts = max_retries + 1
113148

114149
self._client: Optional[InfluxDBClient3] = None
115150

116151
def setup(self):
117-
self._client = InfluxDBClient3(**self._client_kwargs)
118-
try:
119-
# We cannot safely parameterize the table (measurement) selection, so
120-
# the best we can do is confirm authentication was successful
121-
self._client.query("")
122-
except Exception as e:
123-
if "No SQL statements were provided in the query string" not in str(e):
124-
raise
152+
if not self._client:
153+
self._client = InfluxDBClient3(**self._client_kwargs)
154+
self._client.query(
155+
query="SHOW MEASUREMENTS", mode="pandas", language="influxql"
156+
)
157+
158+
def _close_client(self):
159+
if self._client:
160+
self._client.close()
161+
self._client = None
162+
163+
@property
164+
def _measurement_names(self) -> list[str]:
165+
if not self._measurements:
166+
self._measurements = self._get_measurements()
167+
return self._measurements
125168

126169
@with_retry
127-
def produce_records(self, records: list[dict], measurement_name: str):
170+
def _produce_records(self, records: list[dict], measurement_name: str):
128171
for record in records:
172+
# TODO: a key, value, and timestamp setter
129173
msg = self.serialize(
130174
key=f"{measurement_name}_{random.randint(1, 1000)}", # noqa: S311
131175
value=record,
132176
)
133177
self.produce(value=msg.value, key=msg.key)
134178
self.producer.flush()
135179

136-
def run(self):
137-
measurements = (
138-
[self.measurement] if self.measurement else self.get_measurements()
180+
def _get_measurements(self) -> list[str]:
181+
try:
182+
result = self._client.query(
183+
query="SHOW MEASUREMENTS", mode="pandas", language="influxql"
184+
)
185+
except Exception as e:
186+
logger.error(f"Failed to retrieve measurements: {e}")
187+
raise
188+
else:
189+
if result.empty:
190+
raise NoMeasurementsFound(
191+
"query 'SHOW MEASUREMENTS' returned an empty result set"
192+
)
193+
return result["name"].tolist()
194+
195+
@with_retry
196+
def _query_data(self, measurement_name, start_time, end_time):
197+
logger.info(
198+
f"Executing query for {measurement_name} FROM '{start_time}' TO '{end_time}'"
139199
)
200+
try:
201+
return self._client.query(
202+
query=self._sql_query.format(measurement_name=measurement_name),
203+
mode="pandas",
204+
language="influxql",
205+
query_parameters={
206+
"start_time": start_time.isoformat(),
207+
"end_time": end_time.isoformat(),
208+
},
209+
)
210+
except Exception as e:
211+
logger.error(f"Query failed for measurement {measurement_name}: {e}")
212+
raise
140213

141-
for measurement_name in measurements:
142-
logger.info(f"Processing measurement: {measurement_name}")
143-
_start_date = self.start_date
144-
_end_date = self.end_date
214+
def _do_measurement_processing(self, current_time: datetime) -> bool:
215+
if not self.running:
216+
logger.info("Stopping all measurement processing...")
217+
return False
218+
if not self._end_date or (current_time < self._end_date):
219+
return True
220+
logger.info(f"Measurement is now at defined end_date: {self._end_date}")
221+
return False
145222

146-
is_running = (
147-
True
148-
if _end_date is None
149-
else (self.running and _start_date < _end_date)
150-
)
223+
def _process_measurement(self, measurement_name):
224+
logger.info(f"Processing measurement: {measurement_name}")
225+
start_time = self._start_date
151226

152-
while is_running:
153-
end_time = _start_date + timedelta(
154-
seconds=interval_to_seconds(self.time_delta)
155-
)
156-
wait_time = (end_time - datetime.now(timezone.utc)).total_seconds()
157-
if _end_date is None and wait_time > 0:
158-
logger.info(f"Sleeping for {wait_time}s")
159-
time.sleep(wait_time)
227+
while self._do_measurement_processing(start_time):
228+
end_time = start_time + timedelta(seconds=self._time_delta_seconds)
160229

161-
data = self.query_data(measurement_name, _start_date, end_time)
162-
if data is not None and not data.empty:
163-
if "iox::measurement" in data.columns:
164-
data = data.drop(columns=["iox::measurement"])
165-
self.produce_records(
166-
json.loads(data.to_json(orient="records", date_format="iso")),
167-
data["measurement_name"],
230+
# TODO: maybe allow querying more frequently once "caught up"?
231+
if self._end_date is None:
232+
if wait_time := max(
233+
0.0, (end_time - datetime.now(timezone.utc)).total_seconds()
234+
):
235+
logger.info(
236+
f"At current time; sleeping for {wait_time}s "
237+
f"to allow a {self._time_delta_seconds}s query window"
168238
)
239+
time.sleep(wait_time)
169240

170-
_start_date = end_time
171-
if self.delay > 0:
172-
time.sleep(self.delay)
241+
data = self._query_data(measurement_name, start_time, end_time)
242+
if data is not None and not data.empty:
243+
if "iox::measurement" in data.columns:
244+
data = data.drop(columns=["iox::measurement"])
245+
data["measurement_name"] = measurement_name
246+
self._produce_records(
247+
json.loads(data.to_json(orient="records", date_format="iso")),
248+
data,
249+
)
173250

174-
def get_measurements(self):
175-
try:
176-
query = "SHOW MEASUREMENTS"
177-
result = self._client.query(query=query, mode="pandas", language="influxql")
178-
return result["name"].tolist() if not result.empty else []
179-
except Exception as e:
180-
logger.error(f"Failed to retrieve measurements: {e}")
181-
return []
251+
start_time = end_time
252+
if self._delay > 0:
253+
logger.debug(f"Applying query delay of {self._delay}s")
254+
time.sleep(self._delay)
255+
logger.debug(f"Ended processing for {measurement_name}.")
182256

183-
@with_retry
184-
def query_data(self, measurement_name, start_time, end_time):
257+
def run(self):
258+
if len(self._measurement_names) > 1 and not self._end_date:
259+
logger.warning(
260+
"More than one measurement was found and no end_date "
261+
f"was specified; only measurement '{self._measurement_names[0]}' "
262+
f"will be processed!"
263+
)
264+
measurement_names = iter(self._measurement_names)
185265
try:
186-
if self.sql_query:
187-
query = self.sql_query.format(
188-
measurement_name=measurement_name,
189-
start_time=start_time.isoformat(),
190-
end_time=end_time.isoformat(),
191-
)
192-
else:
193-
query = (
194-
f'SELECT * FROM "{measurement_name}" ' # noqa: S608
195-
f"WHERE time >= '{start_time.isoformat()}' "
196-
f"AND time < '{end_time.isoformat()}'"
197-
)
198-
199-
logger.info(f"Executing query: {query}")
200-
return self._client.query(query=query, mode="pandas", language="influxql")
201-
except Exception as e:
202-
logger.error(f"Query failed for measurement {measurement_name}: {e}")
203-
raise
266+
while self.running:
267+
self._process_measurement(next(measurement_names))
268+
except StopIteration:
269+
logger.info("Finished processing all measurements.")
270+
finally:
271+
logger.info("Stopping InfluxDB3 client...")
272+
self._close_client()
204273

205274
def default_topic(self) -> Topic:
206275
return Topic(
@@ -210,7 +279,3 @@ def default_topic(self) -> Topic:
210279
value_deserializer="json",
211280
value_serializer="json",
212281
)
213-
214-
215-
def interval_to_seconds(interval: str) -> int:
216-
return int(interval[:-1]) * TIME_UNITS[interval[-1]]

0 commit comments

Comments
 (0)