Skip to content

Commit 05a9340

Browse files
committed
add new key and timestamp setter, ability to name the measurement name column
1 parent 1dca50f commit 05a9340

File tree

2 files changed

+45
-19
lines changed

2 files changed

+45
-19
lines changed

docs/connectors/sources/influxdb3-source.md

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,28 @@ Here are the InfluxDB-related configurations to be aware of (see [InfluxDB3Sourc
8686

8787

8888
### Optional:
89-
90-
- `start_date`: The start datetime for querying InfluxDB. Uses current time by default.
91-
- `end_date`: The end datetime for querying InfluxDB. If none provided, runs indefinitely for a single measurement.
92-
- `measurements`: The measurements to query. If None, all measurements will be processed.
89+
- `key_setter`: Sets the Kafka message key for a measurement record.
90+
By default, the key is set to the measurement's name.
91+
- `timestamp_setter`: Sets the Kafka message timestamp for a measurement record.
92+
By default, the timestamp will be the Kafka default (Kafka produce time).
93+
- `start_date`: The start datetime for querying InfluxDB.
94+
Uses current time by default.
95+
- `end_date`: The end datetime for querying InfluxDB.
96+
If none is provided, runs indefinitely for a single measurement.
97+
- `measurements`: The measurements to query.
98+
If None, all measurements will be processed.
99+
- `measurement_column_name`: The column name used for appending the measurement name to the record.
100+
Default: `_measurement_name`.
93101
- `sql_query`: Custom SQL query for retrieving data.
94-
Query expects a `{start_time}`, `{end_time}`, and `{measurement_name}` for later formatting.
95-
If provided, it overrides the default window-query logic.
96-
- `time_delta`: Time interval for batching queries, e.g. "5m" for 5 minutes.
97-
- `delay`: Add a delay (in seconds) between producing batches.
98-
- `max_retries`: Maximum number of retries for querying or producing;
99-
Note that consecutive retries have a multiplicative backoff.
102+
Query expects a `{start_time}`, `{end_time}`, and `{measurement_name}` for later formatting.
103+
If provided, it overrides the default window-query logic.
104+
- `time_delta`: Time interval for batching queries, e.g. "5m" for 5 minutes.
105+
Default: `5m`.
106+
- `delay`: Add a delay (in seconds) between producing batches.
107+
Default: `0`.
108+
- `max_retries`: Maximum number of retries for querying or producing.
109+
Note that consecutive retries have a multiplicative backoff.
110+
Default: `5`.
100111

101112

102113
## Testing Locally

quixstreams/sources/community/influxdb3/influxdb3.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import json
22
import logging
3-
import random
43
import time
54
from datetime import datetime, timedelta, timezone
65
from functools import wraps
7-
from typing import Optional, Union
6+
from typing import Callable, Optional, Union
87

98
from influxdb_client_3 import InfluxDBClient3
109

@@ -92,9 +91,12 @@ def __init__(
9291
token: str,
9392
organization_id: str,
9493
database: str,
94+
key_setter: Optional[Callable[[object], object]] = None,
95+
timestamp_setter: Optional[Callable[[object], int]] = None,
9596
start_date: datetime = datetime.now(tz=timezone.utc),
9697
end_date: Optional[datetime] = None,
9798
measurements: Optional[Union[str, list[str]]] = None,
99+
measurement_column_name: str = "_measurement_name",
98100
sql_query: Optional[str] = None,
99101
time_delta: str = "5m",
100102
delay: float = 0,
@@ -109,9 +111,15 @@ def __init__(
109111
:param token: Authentication token for InfluxDB.
110112
:param organization_id: Organization name in InfluxDB.
111113
:param database: Database name in InfluxDB.
114+
:param key_setter: sets the kafka message key for a measurement record.
115+
By default, will set the key to the measurement's name.
116+
:param timestamp_setter: sets the kafka message timestamp for a measurement record.
117+
By default, the timestamp will be the Kafka default (Kafka produce time).
112118
:param start_date: The start datetime for querying InfluxDB. Uses current time by default.
113-
:param end_date: The end datetime for querying InfluxDB. If none provided, runs indefinitely for a single measurement.
119+
:param end_date: The end datetime for querying InfluxDB.
120+
If none provided, runs indefinitely for a single measurement.
114121
:param measurements: The measurements to query. If None, all measurements will be processed.
122+
:param measurement_column_name: The column name used for appending the measurement name to the record.
115123
:param sql_query: Custom SQL query for retrieving data.
116124
Query expects a `{start_time}`, `{end_time}`, and `{measurement_name}` for later formatting.
117125
If provided, it overrides the default window-query logic.
@@ -145,6 +153,9 @@ def __init__(
145153
"org": organization_id,
146154
"database": database,
147155
}
156+
self._measurement_column_name = measurement_column_name
157+
self._key_setter = key_setter or self._default_key_setter
158+
self._timestamp_setter = timestamp_setter
148159
self._measurements = measurements
149160
self._sql_query = _set_sql_query(sql_query or "")
150161
self._start_date = start_date
@@ -167,6 +178,9 @@ def _close_client(self):
167178
self._client.close()
168179
self._client = None
169180

181+
def _default_key_setter(self, record: dict):
182+
return record[self._measurement_column_name]
183+
170184
@property
171185
def _measurement_names(self) -> list[str]:
172186
if not self._measurements:
@@ -189,14 +203,16 @@ def _get_measurements(self) -> list[str]:
189203
return result["name"].tolist()
190204

191205
@with_retry
192-
def _produce_records(self, records: list[dict], measurement_name: str):
206+
def _produce_records(self, records: list[dict]):
193207
for record in records:
194-
# TODO: a key, value, and timestamp setter
195208
msg = self.serialize(
196-
key=f"{measurement_name}_{random.randint(1, 1000)}", # noqa: S311
209+
key=self._key_setter(record),
197210
value=record,
211+
timestamp_ms=self._timestamp_setter(record)
212+
if self._timestamp_setter
213+
else None,
198214
)
199-
self.produce(value=msg.value, key=msg.key)
215+
self.produce(value=msg.value, key=msg.key, timestamp=msg.timestamp)
200216
self.producer.flush()
201217

202218
@with_retry
@@ -249,10 +265,9 @@ def _process_measurement(self, measurement_name):
249265
if data is not None and not data.empty:
250266
if "iox::measurement" in data.columns:
251267
data = data.drop(columns=["iox::measurement"])
252-
data["measurement_name"] = measurement_name
268+
data[self._measurement_column_name] = measurement_name
253269
self._produce_records(
254270
json.loads(data.to_json(orient="records", date_format="iso")),
255-
data,
256271
)
257272

258273
start_time = end_time

0 commit comments

Comments
 (0)