
Commit f70220b

InfluxDB3Source: new connector (#788)
1 parent e3d1c69 commit f70220b

6 files changed: +455 -3 lines changed

conda/meta.yaml

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ requirements:
 - pandas >=1.0.0,<3.0
 - elasticsearch >=8.17,<9
 - rich >=13,<15
+- influxdb3-python[pandas] >=0.7,<1.0

 test:
 imports:

conda/post-link.sh

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 $PREFIX/bin/pip install \
 'rocksdict>=0.3,<0.4' \
 'protobuf>=5.27.2,<7.0' \
-'influxdb3-python>=0.7,<1.0' \
+'influxdb3-python[pandas]>=0.7,<1.0' \
 'pyiceberg[pyarrow,glue]>=0.7' \
 'redis[hiredis]>=5.2.0,<6' \
 'confluent-kafka[avro,json,protobuf,schemaregistry]>=2.8.2,<2.10'
Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@
# InfluxDB v3 Source

!!! info

    This is a **Community** connector. Test it before using in production.

    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

InfluxDB is an open source time series database for metrics, events, and real-time analytics.

Quix Streams provides a source to extract "measurements" from InfluxDB v3 databases
and produce them to a Kafka topic.

>***NOTE***: This source only supports InfluxDB v3. Versions 1 and 2 are not supported.

## How to Install

Install Quix Streams with the following optional dependencies:

```bash
pip install quixstreams[influxdb3]
```

## How it Works

`InfluxDB3Source` extracts data from a specified set of measurements in a
database (or all available ones if none are specified).

It processes measurements sequentially: for each measurement, it queries and produces
tumbling `time_delta`-sized windows of data, starting from the specified `start_date` and
stopping once it reaches the specified `end_date`, which completes that measurement.

It then starts the next measurement, continuing until all are complete.

Note that `end_date` is optional; when it is not provided, the source runs indefinitely for a
single measurement (which means no other measurements will be processed!).
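
The windowing described above can be pictured with a small standalone sketch. This is not the connector's implementation: the helper name `tumbling_windows` is hypothetical, and clipping the last window to `end_date` is an assumption made for illustration only.

```python
from datetime import datetime, timedelta, timezone
from itertools import islice
from typing import Iterator, Optional, Tuple


def tumbling_windows(
    start: datetime, end: Optional[datetime], delta: timedelta
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive, non-overlapping (window_start, window_end) pairs.

    Hypothetical helper: with end=None the generator never stops, which
    mirrors the "runs indefinitely for a single measurement" behavior.
    """
    window_start = start
    while end is None or window_start < end:
        window_end = window_start + delta
        if end is not None:
            # Assumption: the final window is clipped to the end date.
            window_end = min(window_end, end)
        yield window_start, window_end
        window_start = window_end


start = datetime(2025, 1, 1, tzinfo=timezone.utc)
end = start + timedelta(minutes=12)

# Bounded case: 12 minutes split into 5-minute windows.
windows = list(tumbling_windows(start, end, timedelta(minutes=5)))

# Unbounded case: take only the first four windows of an endless stream.
first_four = list(islice(tumbling_windows(start, None, timedelta(minutes=5)), 4))
```

Because the unbounded generator never finishes, a loop that handles measurements one after another would never reach the second measurement, which is why an `end_date` matters when several measurements are configured.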

## How to Use

Import and instantiate an `InfluxDB3Source` instance and hand it to an Application using
`app.add_source(<InfluxDB3Source>)` or instead to a StreamingDataFrame with
`app.dataframe(source=<InfluxDB3Source>)` if further data manipulation is required.

For more details around various settings, see [configuration](#configuration).

```python
from datetime import datetime, timedelta, timezone

from quixstreams import Application
from quixstreams.sources.community.influxdb3 import InfluxDB3Source

# Create the Application once, so the topic and the dataframe share it.
app = Application(
    broker_address="<YOUR BROKER INFO>",  # e.g. "localhost:9092"
    consumer_group="<YOUR GROUP>",
)
topic = app.topic("influx-topic")

influx = InfluxDB3Source(
    token="<influxdb-access-token>",
    host="<influxdb-host>",
    organization_id="<influxdb-org>",
    database="<influxdb-database>",
    measurements="my-measurement",
    start_date=datetime.now(tz=timezone.utc) - timedelta(days=2),
    end_date=datetime.now(tz=timezone.utc),
)

sdf = app.dataframe(source=influx).print(metadata=True)
# YOUR LOGIC HERE!
sdf.to_topic(topic)

if __name__ == "__main__":
    app.run()
```

## Configuration

Here are the InfluxDB-related configurations to be aware of (see [InfluxDB3Source API](../../api-reference/sources.md#influxdb3source) for all parameters).

### Required:

- `host`: Host URL of the InfluxDB instance.
- `token`: Authentication token for InfluxDB.
- `organization_id`: Organization name in InfluxDB.
- `database`: Database name in InfluxDB.

### Optional:

- `key_setter`: Sets the Kafka message key for a measurement record.
  By default, the key is set to the measurement's name.
- `timestamp_setter`: Sets the Kafka message timestamp for a measurement record.
  By default, the timestamp is the Kafka default (Kafka produce time).
- `start_date`: The start datetime for querying InfluxDB.
  Uses the current time by default.
- `end_date`: The end datetime for querying InfluxDB.
  If not provided, the source runs indefinitely for a single measurement.
- `measurements`: The measurements to query.
  If `None`, all measurements will be processed.
- `measurement_column_name`: The column name used for appending the measurement name to the record.
  Default: `_measurement_name`.
- `sql_query`: Custom SQL query for retrieving data.
  The query is expected to contain `{start_time}`, `{end_time}`, and `{measurement_name}` placeholders for later formatting.
  If provided, it overrides the default window-query logic.
- `time_delta`: Time interval for batching queries, e.g. "5m" for 5 minutes.
  Default: `5m`.
- `delay`: Delay (in seconds) to add between producing batches.
  Default: `0`.
- `max_retries`: Maximum number of retries for querying or producing.
  Note that consecutive retries have a multiplicative backoff.
  Default: `5`.
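
Two of the options above, `sql_query` and `max_retries`, can be illustrated with a rough standalone sketch. Only the three placeholder names and the default of 5 retries come from the list above. The query text, the ISO 8601 time rendering, the helper names `render_query` and `backoff_delays`, and the base delay and factor are all assumptions for illustration, not the connector's actual behavior.

```python
from datetime import datetime, timedelta, timezone
from typing import List

# Hypothetical template: only the placeholder names are documented.
QUERY_TEMPLATE = (
    'SELECT * FROM "{measurement_name}" '
    "WHERE time >= '{start_time}' AND time < '{end_time}'"
)


def render_query(template: str, measurement: str, start: datetime, end: datetime) -> str:
    """Fill the three placeholders for one window of one measurement.

    Assumption: times are rendered as ISO 8601 strings; the connector
    may format them differently.
    """
    return template.format(
        measurement_name=measurement,
        start_time=start.isoformat(),
        end_time=end.isoformat(),
    )


def backoff_delays(max_retries: int, base: float = 1.0, factor: float = 2.0) -> List[float]:
    """Delays that grow multiplicatively with each consecutive retry.

    Assumption: base delay and factor are illustrative values only.
    """
    return [base * factor**attempt for attempt in range(max_retries)]


window_start = datetime(2025, 1, 1, tzinfo=timezone.utc)
window_end = window_start + timedelta(minutes=5)

query = render_query(QUERY_TEMPLATE, "my-measurement", window_start, window_end)
delays = backoff_delays(max_retries=5)
```

A custom `sql_query` that omits one of the placeholders could not be scoped to a window or a measurement this way, so it seems safest to keep all three in any custom query.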

## Testing Locally

Rather than connecting to a hosted InfluxDB3 instance, you can test your
application against a local InfluxDB3 instance running in Docker:

1. Execute in terminal:

    ```bash
    docker run --rm -d --name influxdb3 \
      -p 8181:8181 \
      quay.io/influxdb/influxdb3-core:latest \
      serve --node-id=host0 --object-store=memory
    ```

2. Use the following settings for `InfluxDB3Source` to connect:

    ```python
    InfluxDB3Source(
        host="http://localhost:8181",  # be sure to add http
        organization_id="local",  # unused, but required
        token="local",  # unused, but required
        database="<YOUR DB>",
    )
    ```

Note: the database must exist for this to successfully run.

pyproject.toml

Lines changed: 2 additions & 2 deletions
@@ -30,7 +30,7 @@ Homepage = "https://github.com/quixio/quix-streams"
 all = [
 "fastavro>=1.8,<2.0",
 "protobuf>=5.27.2,<7.0",
-"influxdb3-python>=0.7,<1.0",
+"influxdb3-python[pandas]>=0.7,<1.0",
 "pyiceberg[pyarrow,glue]>=0.7",
 "google-cloud-bigquery>=3.26.0,<3.27",
 "google-cloud-pubsub>=2.23.1,<3",
@@ -47,7 +47,7 @@ all = [

 avro = ["fastavro>=1.8,<2.0"]
 protobuf = ["protobuf>=5.27.2,<7.0"]
-influxdb3 = ["influxdb3-python>=0.7,<1.0"]
+influxdb3 = ["influxdb3-python[pandas]>=0.7,<1.0"]
 iceberg = ["pyiceberg[pyarrow]>=0.7"]
 iceberg_aws = ["pyiceberg[pyarrow,glue]>=0.7"]
 bigquery = ["google-cloud-bigquery>=3.26.0,<3.27"]
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
# ruff: noqa: F403
from .influxdb3 import *
