Commit c95e59b: add influxdb1sink (#936)
1 parent f8f3cab

3 files changed: +360 −1 lines changed

conda/post-link.sh

Lines changed: 1 addition & 1 deletion

@@ -7,5 +7,5 @@ $PREFIX/bin/pip install \
     'pyiceberg[pyarrow,glue]>=0.7' \
     'redis[hiredis]>=5.2.0,<6' \
     'confluent-kafka[avro,json,protobuf,schemaregistry]>=2.8.2,<2.10' \
-    'influxdb3-python[pandas]>=0.7,<1.0' \
+    'influxdb>=5.3,<6' \
     'jsonpath_ng>=1.7.0,<2'
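
This script installs the pip-only dependencies into the conda environment after the package itself is installed; as the hunk above shows, it replaces the pinned influxdb3-python client with the v1 influxdb client in that list.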

pyproject.toml

Lines changed: 2 additions & 0 deletions

@@ -43,10 +43,12 @@ all = [
     "pymongo>=4.11,<5",
     "pandas>=1.0.0,<3.0",
     "elasticsearch>=8.17,<9",
+    "influxdb>=5.3,<6"
 ]

 avro = ["fastavro>=1.8,<2.0"]
 protobuf = ["protobuf>=5.27.2,<7.0"]
+influxdb1 = ["influxdb>=5.3,<6"]
 influxdb3 = ["influxdb3-python[pandas]>=0.7,<1.0"]
 iceberg = ["pyiceberg[pyarrow]>=0.7"]
 iceberg_aws = ["pyiceberg[pyarrow,glue]>=0.7"]
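
Together, these two additions expose the v1 client through a dedicated `influxdb1` extra and fold it into the `all` group, so it can be installed with `pip install quixstreams[influxdb1]`, the same command suggested by the sink's ImportError hint below.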

(new file: the InfluxDB v1 sink module; the diff does not show its path)

Lines changed: 357 additions & 0 deletions

@@ -0,0 +1,357 @@
import logging
import sys
import time
from datetime import datetime, timezone
from typing import Any, Callable, Iterable, Literal, Mapping, Optional, Union, get_args

from quixstreams.models import HeadersTuples

try:
    from influxdb import InfluxDBClient
    from influxdb.exceptions import InfluxDBClientError
except ImportError as exc:
    raise ImportError(
        'Package "influxdb" is missing: '
        "run pip install quixstreams[influxdb1] to fix it"
    ) from exc

from quixstreams.sinks.base import (
    BatchingSink,
    ClientConnectFailureCallback,
    ClientConnectSuccessCallback,
    SinkBackpressureError,
    SinkBatch,
)

logger = logging.getLogger(__name__)


TimePrecision = Literal["ms", "ns", "us", "s"]

# Expected digit count of an integer epoch timestamp for each precision
# (e.g., a millisecond epoch is 13 digits, a nanosecond epoch is 19).
TIME_PRECISION_LEN = {
    "s": 10,
    "ms": 13,
    "ns": 19,
    "us": 16,
}

InfluxDBValueMap = dict[str, Union[str, int, float, bool]]

FieldsCallable = Callable[[InfluxDBValueMap], Iterable[str]]
MeasurementCallable = Callable[[InfluxDBValueMap], str]
TagsCallable = Callable[[InfluxDBValueMap], Iterable[str]]
TimeCallable = Callable[[InfluxDBValueMap], Optional[Union[str, int, datetime]]]

FieldsSetter = Union[Iterable[str], FieldsCallable]
MeasurementSetter = Union[str, MeasurementCallable]
TagsSetter = Union[Iterable[str], TagsCallable]
TimeSetter = Union[str, TimeCallable]
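
# Example (hypothetical) time setter: read a 13-digit millisecond epoch from the
# payload, or fall back to the record's kafka timestamp by returning None:
#     time_setter = lambda value: value.get("event_time_ms")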


class InfluxDB1Sink(BatchingSink):
    # Maps this sink's precision names to the codes accepted by the
    # influxdb v1 client ("n" = nanoseconds, "u" = microseconds)
    _TIME_PRECISIONS = {
        "s": "s",
        "ms": "ms",
        "ns": "n",
        "us": "u",
    }

    def __init__(
        self,
        host: str,
        database: str,
        measurement: MeasurementSetter,
        port: int = 8086,
        username: Optional[str] = None,
        password: Optional[str] = None,
        fields_keys: FieldsSetter = (),
        tags_keys: TagsSetter = (),
        time_setter: Optional[TimeSetter] = None,
        time_precision: TimePrecision = "ms",
        allow_missing_fields: bool = False,
        include_metadata_tags: bool = False,
        convert_ints_to_floats: bool = False,
        batch_size: int = 1000,
        request_timeout_ms: int = 10_000,
        on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
        on_client_connect_failure: Optional[ClientConnectFailureCallback] = None,
    ):
        """
        A connector to sink processed data to InfluxDB v1.

        It batches the processed records in memory per topic partition, converts
        them to the InfluxDB format, and flushes them to InfluxDB at the checkpoint.

        The InfluxDB sink transparently handles backpressure if the destination instance
        cannot accept more data at the moment
        (e.g., when InfluxDB returns an HTTP 429 error with the "retry_after" header set).
        When this happens, the sink will notify the Application to pause consuming
        from the backpressured topic partition until the "retry_after" timeout elapses.

        >***NOTE***: InfluxDB1Sink can accept only dictionaries.
        > If the record values are not dicts, you need to convert them to dicts before
        > sinking.

        :param host: InfluxDB host address, e.g. "localhost"
        :param database: database name
        :param measurement: measurement name as a string.
            Also accepts a single-argument callable that receives the current message
            data as a dict and returns a string.
        :param port: InfluxDB port. Default - `8086`.
        :param username: database username
        :param password: database password
        :param fields_keys: an iterable (list) of strings used as InfluxDB "fields".
            Also accepts a single-argument callable that receives the current message
            data as a dict and returns an iterable of strings.
            - If present, it must not overlap with "tags_keys".
            - If empty, the whole record value will be used.
            >***NOTE*** The fields' values can only be strings, floats, integers, or booleans.
            Default - `()`.
        :param tags_keys: an iterable (list) of strings used as InfluxDB "tags".
            Also accepts a single-argument callable that receives the current message
            data as a dict and returns an iterable of strings.
            - If present, it must not overlap with "fields_keys".
            - Given keys are popped from the value dictionary since the same key
            cannot be both a tag and a field.
            - If empty, no tags will be sent.
            >***NOTE***: InfluxDB client always converts tag values to strings.
            Default - `()`.
        :param time_setter: an optional column name to use as "time" for InfluxDB.
            Also accepts a callable which receives the current message data and
            returns either the desired time or `None` (to use the default).
            The time can be an `int`, `string` (RFC3339 format), or `datetime`.
            If the time is an `int`, its digit length must match the `time_precision`
            argument, else a `ValueError` is raised.
            By default, the record's kafka timestamp with "ms" time precision is used.
        :param time_precision: a time precision to use when writing to InfluxDB.
            Possible values: "ms", "ns", "us", "s".
            Default - `"ms"`.
        :param allow_missing_fields: if `True`, skip missing field keys, else raise `KeyError`.
            Default - `False`.
        :param include_metadata_tags: if True, includes the record's key, topic,
            and partition as tags.
            Default - `False`.
        :param convert_ints_to_floats: if True, converts all integer values to floats.
            Default - `False`.
        :param batch_size: how many records to write to InfluxDB in one request.
            Note that it only affects the size of one write request, not the number
            of records flushed on each checkpoint.
            Default - `1000`.
        :param request_timeout_ms: an HTTP request timeout in milliseconds.
            Default - `10000`.
        :param on_client_connect_success: An optional callback made after successful
            client authentication, primarily for additional logging.
        :param on_client_connect_failure: An optional callback made after failed
            client authentication (which should raise an Exception).
            Callback should accept the raised Exception as an argument.
            Callback must resolve (or propagate/re-raise) the Exception.
        """

        super().__init__(
            on_client_connect_success=on_client_connect_success,
            on_client_connect_failure=on_client_connect_failure,
        )
        self._request_timeout_ms = request_timeout_ms

        if time_precision not in (time_args := get_args(TimePrecision)):
            raise ValueError(
                f"Invalid 'time_precision' argument {time_precision}; "
                f"valid options: {time_args}"
            )
        if not callable(fields_keys) and not callable(tags_keys):
            fields_tags_keys_overlap = set(fields_keys) & set(tags_keys)
            if fields_tags_keys_overlap:
                overlap_str = ",".join(str(k) for k in fields_tags_keys_overlap)
                raise ValueError(
                    f'Keys {overlap_str} are present in both "fields_keys" and "tags_keys"'
                )

        self._client_args = {
            "host": host,
            "port": port,
            "database": database,
            "username": username,
            "password": password,
            # the influxdb v1 client expects the timeout in seconds
            "timeout": self._request_timeout_ms / 1000,
        }
        self._client: Optional[InfluxDBClient] = None
        self._measurement = _measurement_callable(measurement)
        self._fields_keys = _fields_callable(fields_keys)
        self._tags_keys = _tags_callable(tags_keys)
        self._time_setter = _time_callable(time_setter)
        self._include_metadata_tags = include_metadata_tags
        # Keep the user-facing precision ("ms", "ns", ...) for validation and
        # logging; it is translated to the client's code only in `write()`,
        # so lookups into TIME_PRECISION_LEN keep working.
        self._time_precision = time_precision
        self._batch_size = batch_size
        self._allow_missing_fields = allow_missing_fields
        self._convert_ints_to_floats = convert_ints_to_floats

    def setup(self):
        # Connect and ensure the target database exists, creating it if missing
        self._client = InfluxDBClient(**self._client_args)
        existing_dbs = [db["name"] for db in self._client.get_list_database()]
        if (db := self._client_args["database"]) not in existing_dbs:
            logger.info(f"Database '{db}' not found. Creating it...")
            self._client.create_database(db)
        else:
            logger.debug(f"Database '{db}' already exists.")

    def add(
        self,
        value: Any,
        key: Any,
        timestamp: int,
        headers: HeadersTuples,
        topic: str,
        partition: int,
        offset: int,
    ):
        if not isinstance(value, Mapping):
            raise TypeError(
                f'Sink "{self.__class__.__name__}" supports only dictionaries,'
                f" got {type(value)}"
            )
        return super().add(
            value=value,
            key=key,
            timestamp=timestamp,
            headers=headers,
            topic=topic,
            partition=partition,
            offset=offset,
        )

    def write(self, batch: SinkBatch):
        measurement = self._measurement
        fields_keys = self._fields_keys
        tags_keys = self._tags_keys
        time_setter = self._time_setter
        for write_batch in batch.iter_chunks(n=self._batch_size):
            records = []

            min_timestamp = None
            max_timestamp = None

            for item in write_batch:
                value = item.value
                # Evaluate these before we alter the value
                _measurement = measurement(value)
                _tags_keys = tags_keys(value)
                _fields_keys = fields_keys(value)
                ts = time_setter(value)

                tags = {}
                for tag_key in _tags_keys:
                    # TODO: InfluxDB client always converts tag values to strings
                    # by doing str().
                    # We may add some extra validation here in the future to prevent
                    # unwanted conversion.
                    if tag_key in value:
                        tag = value.pop(tag_key)
                        tags[tag_key] = tag

                if ts is None:
                    ts = item.timestamp
                # Note: currently NOT validating the timestamp itself is valid
                elif not isinstance(ts, valid := (str, int, datetime)):
                    raise TypeError(
                        f'InfluxDB1 "time" field expects: {valid}, got {type(ts)}'
                    )

                if isinstance(ts, int):
                    time_len = len(str(ts))
                    expected = TIME_PRECISION_LEN[self._time_precision]
                    if time_len != expected:
                        raise ValueError(
                            f'`time_precision` of "{self._time_precision}" '
                            f"expects a {expected}-digit integer epoch, "
                            f"got {time_len} (timestamp: {ts})."
                        )

                if self._include_metadata_tags:
                    tags["__key"] = item.key
                    tags["__topic"] = batch.topic
                    tags["__partition"] = batch.partition

                if _fields_keys:
                    # With allow_missing_fields=False, missing keys still reach the
                    # `value[f]` lookup and raise a KeyError, as documented
                    fields = {
                        f: value[f]
                        for f in _fields_keys
                        if f in value or not self._allow_missing_fields
                    }
                else:
                    fields = value

                if self._convert_ints_to_floats:
                    fields = {
                        k: float(v) if type(v) is int else v  # avoids bool matching
                        for k, v in fields.items()
                    }

                record = {
                    "measurement": _measurement,
                    "tags": tags,
                    "fields": fields,
                    "time": ts,
                }
                records.append(record)
                min_timestamp = min(ts, min_timestamp or _ts_min_default(ts))
                max_timestamp = max(ts, max_timestamp or _ts_max_default(ts))
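                # Example of a constructed point (hypothetical values):
                # {
                #     "measurement": "sensor_data",
                #     "tags": {"location": "berlin", "__topic": "sensor-readings"},
                #     "fields": {"temperature": 21.5, "humidity": 60},
                #     "time": 1717171717000,  # 13 digits => "ms" precision
                # }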

            try:
                _start = time.monotonic()
                self._client.write_points(
                    points=records,
                    # translate "ns"/"us" to the client's "n"/"u" codes
                    time_precision=self._TIME_PRECISIONS[self._time_precision],
                )
                elapsed = round(time.monotonic() - _start, 2)
                logger.info(
                    f"Sent data to InfluxDB; "
                    f"total_records={len(records)} "
                    f"min_timestamp={min_timestamp} "
                    f"max_timestamp={max_timestamp} "
                    f"time_elapsed={elapsed}s"
                )
            except InfluxDBClientError as exc:
                if exc.content and exc.code == 429:
                    # The write limit is exceeded, raise a SinkBackpressureError
                    # to pause the partition for a certain period of time.
                    raise SinkBackpressureError(retry_after=15) from exc
                raise


def _ts_min_default(timestamp: Union[int, str, datetime]):
    if isinstance(timestamp, int):
        return sys.maxsize
    elif isinstance(timestamp, str):
        return "~"  # lexicographically largest ASCII char
    elif isinstance(timestamp, datetime):
        return datetime.max.replace(tzinfo=timezone.utc)


def _ts_max_default(timestamp: Union[int, str, datetime]):
    if isinstance(timestamp, int):
        return -1
    elif isinstance(timestamp, str):
        return ""
    elif isinstance(timestamp, datetime):
        return datetime.min.replace(tzinfo=timezone.utc)


def _measurement_callable(setter: MeasurementSetter) -> MeasurementCallable:
    if callable(setter):
        return setter
    return lambda value: setter


def _fields_callable(setter: FieldsSetter) -> FieldsCallable:
    if callable(setter):
        return setter
    return lambda value: setter


def _tags_callable(setter: TagsSetter) -> TagsCallable:
    if callable(setter):
        return setter
    return lambda value: setter


def _time_callable(setter: Optional[TimeSetter]) -> TimeCallable:
    if callable(setter):
        return setter
    if isinstance(setter, str):
        return lambda value: value[setter]  # type: ignore
    return lambda value: None  # the kafka timestamp will be used
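
Below is a minimal usage sketch, not part of this commit, showing how the sink could be wired into a quixstreams Application. The broker address, topic, field/tag names, and the import path are assumptions (the diff does not show where the new module lives); the Application/StreamingDataFrame API comes from the rest of the quixstreams package.

# Hedged usage sketch; names and import path below are assumptions.
from quixstreams import Application
from quixstreams.sinks.community.influxdb1 import InfluxDB1Sink  # assumed path

app = Application(broker_address="localhost:9092")
topic = app.topic("sensor-readings", value_deserializer="json")

influx_sink = InfluxDB1Sink(
    host="localhost",
    database="metrics",
    measurement="sensor_data",  # or a callable, e.g. lambda value: value["type"]
    fields_keys=["temperature", "humidity"],
    tags_keys=["location"],  # popped from the value and written as a tag
    time_precision="ms",  # kafka timestamps are 13-digit ms epochs
)

sdf = app.dataframe(topic)
sdf.sink(influx_sink)

if __name__ == "__main__":
    app.run()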
