Skip to content

Commit 1dca50f

Browse files
committed
update documentation
1 parent 56a065b commit 1dca50f

File tree

2 files changed

+130
-1
lines changed

2 files changed

+130
-1
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# InfluxDB v3 Source
2+
3+
!!! info
4+
5+
This is a **Community** connector. Test it before using in production.
6+
7+
To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.
8+
9+
10+
InfluxDB is an open source time series database for metrics, events, and real-time analytics.
11+
12+
Quix Streams provides a source to extract "measurements" from InfluxDB v3 databases
13+
and dump them to a Kafka topic.
14+
15+
>***NOTE***: This source only supports InfluxDB v3. Versions 1 and 2 are not supported.
16+
17+
## How to Install
18+
19+
Install Quix Streams with the following optional dependencies:
20+
21+
```bash
22+
pip install quixstreams[influxdb3]
23+
```
24+
25+
## How it Works
26+
27+
`InfluxDB3Source` extracts data from a specified set of measurements in a
28+
database (or all available ones if none are specified).
29+
30+
It processes measurements sequentially by gathering/producing a tumbling
31+
"time_delta"-sized window of data, starting from a specified 'start_date' and
32+
eventually stopping at a specified 'end_date', completing that measurement.
33+
34+
It then starts the next measurement, continuing until all are complete.
35+
36+
Note that 'end_date' is optional; when not provided, it will run indefinitely for a
37+
single measurement (which means no other measurements will be processed!).
38+
39+
## How to Use
40+
41+
To use Kinesis Source, hand `InfluxDB3Source` to `app.dataframe()`.
42+
43+
For more details around various settings, see [configuration](#configuration).
44+
45+
```python
46+
from quixstreams import Application
47+
from quixstreams.sources.community.influxdb3 import InfluxDB3Source
48+
from datetime import datetime, timedelta, timezone
49+
50+
app = Application(broker_address="localhost:9092")
51+
topic = app.topic("influx-topic")
52+
53+
influx = InfluxDB3Source(
54+
token="<influxdb-access-token>",
55+
host="<influxdb-host>",
56+
organization_id="<influxdb-org>",
57+
database="<influxdb-database>",
58+
measurements="my-measurement",
59+
start_date=datetime.now(tz=timezone.utc) - timedelta(days=2),
60+
end_date=datetime.now(tz=timezone.utc),
61+
)
62+
63+
app = Application(
64+
broker_address="<YOUR BROKER INFO>",
65+
consumer_group="<YOUR GROUP>",
66+
)
67+
68+
sdf = app.dataframe(source=influx).print(metadata=True)
69+
# YOUR LOGIC HERE!
70+
sdf.to_topic(topic)
71+
72+
if __name__ == "__main__":
73+
app.run()
74+
```
75+
76+
77+
## Configuration
78+
Here are the InfluxDB-related configurations to be aware of (see [InfluxDB3Source API](../../api-reference/sources.md#influxdb3source) for all parameters).
79+
80+
### Required:
81+
82+
- `host`: Host URL of the InfluxDB instance.
83+
- `token`: Authentication token for InfluxDB.
84+
- `organization_id`: Organization name in InfluxDB.
85+
- `database`: Database name in InfluxDB.
86+
87+
88+
### Optional:
89+
90+
- `start_date`: The start datetime for querying InfluxDB. Uses current time by default.
91+
- `end_date`: The end datetime for querying InfluxDB. If none provided, runs indefinitely for a single measurement.
92+
- `measurements`: The measurements to query. If None, all measurements will be processed.
93+
- `sql_query`: Custom SQL query for retrieving data.
94+
Query expects a `{start_time}`, `{end_time}`, and `{measurement_name}` for later formatting.
95+
If provided, it overrides the default window-query logic.
96+
- `time_delta`: Time interval for batching queries, e.g. "5m" for 5 minutes.
97+
- `delay`: Add a delay (in seconds) between producing batches.
98+
- `max_retries`: Maximum number of retries for querying or producing;
99+
Note that consecutive retries have a multiplicative backoff.
100+
101+
102+
## Testing Locally
103+
104+
Rather than connect to a hosted InfluxDB3 instance, you can alternatively test your
105+
application using a local instance of Influxdb3 using Docker:
106+
107+
1. Execute in terminal:
108+
109+
```bash
110+
docker run --rm -d --name influxdb3 \
111+
-p 8181:8181 \
112+
quay.io/influxdb/influxdb3-core:latest \
113+
serve --node-id=host0 --object-store=memory
114+
```
115+
116+
2. Use the following settings for `InfluxDB3Source` to connect:
117+
118+
```python
119+
InfluxDB3Source(
120+
host="http://localhost:8181", # be sure to add http
121+
organization_id="local", # unused, but required
122+
token="local", # unused, but required
123+
)
124+
```
125+
126+
The `database` you provide will be auto-created for you.

quixstreams/sources/community/influxdb3/influxdb3.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,15 @@ def __init__(
112112
:param start_date: The start datetime for querying InfluxDB. Uses current time by default.
113113
:param end_date: The end datetime for querying InfluxDB. If none provided, runs indefinitely for a single measurement.
114114
:param measurements: The measurements to query. If None, all measurements will be processed.
115-
:param sql_query: Custom SQL query for retrieving data. If provided, it overrides the default logic.
115+
:param sql_query: Custom SQL query for retrieving data.
116+
Query expects a `{start_time}`, `{end_time}`, and `{measurement_name}` for later formatting.
117+
If provided, it overrides the default window-query logic.
116118
:param time_delta: Time interval for batching queries, e.g., "5m" for 5 minutes.
117119
:param delay: An optional delay between producing batches.
118120
:param name: A unique name for the Source, used as part of the topic name.
119121
:param shutdown_timeout: Time in seconds to wait for graceful shutdown.
120122
:param max_retries: Maximum number of retries for querying or producing.
123+
Note that consecutive retries have a multiplicative backoff.
121124
:param on_client_connect_success: An optional callback made after successful
122125
client authentication, primarily for additional logging.
123126
:param on_client_connect_failure: An optional callback made after failed

0 commit comments

Comments
 (0)