# InfluxDB v1 Sink

!!! info

    This is a **Community** connector. Test it before using in production.

    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

InfluxDB is an open source time series database for metrics, events, and real-time analytics.

Quix Streams provides a sink to write processed data to InfluxDB v1.

>***NOTE***: This sink only supports InfluxDB v1.

## How To Install

The dependencies for this sink are not included in the default `quixstreams` package.

To install them, run the following command:

```commandline
pip install quixstreams[influxdb1]
```

## How To Use

To sink data to InfluxDB v1, you need to create an instance of `InfluxDB1Sink` and pass
it to the `StreamingDataFrame.sink()` method:

```python
from quixstreams import Application
from quixstreams.sinks.community.influxdb1 import InfluxDB1Sink

app = Application(broker_address="localhost:9092")
topic = app.topic("numbers-topic")

# Initialize InfluxDB1Sink
influx_sink = InfluxDB1Sink(
    host="<influxdb-host>",
    database="<influxdb-database>",
    username="<influxdb-username>",
    password="<influxdb-password>",
    measurement="numbers",
    fields_keys=["number"],
    tags_keys=["tag"]
)

sdf = app.dataframe(topic)
# Do some processing here ...
# Sink data to InfluxDB
sdf.sink(influx_sink)

if __name__ == '__main__':
    app.run()
```

## How It Works

`InfluxDB1Sink` is a batching sink.
It batches processed records in memory per topic partition and writes them to the InfluxDB instance when a checkpoint has been committed.

Under the hood, it transforms data to the InfluxDB line protocol format and writes processed records in batches.
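
The actual conversion is handled by the InfluxDB client library, but as a rough illustration (a sketch, not the sink's internal code), a record dict maps to line protocol like this:

```python
def to_line_protocol(measurement, value, fields_keys, tags_keys, timestamp_ns):
    # Tags are joined to the measurement name with commas
    tags = ",".join(f"{k}={value[k]}" for k in tags_keys)

    def fmt(v):
        # Line protocol marks integers with an "i" suffix and quotes strings
        if isinstance(v, bool):
            return "true" if v else "false"
        if isinstance(v, int):
            return f"{v}i"
        if isinstance(v, float):
            return str(v)
        return f'"{v}"'

    fields = ",".join(f"{k}={fmt(value[k])}" for k in fields_keys)
    prefix = f"{measurement},{tags}" if tags else measurement
    return f"{prefix} {fields} {timestamp_ns}"


line = to_line_protocol(
    "numbers", {"number": 42, "tag": "a"}, ["number"], ["tag"], 1700000000000000000
)
# → 'numbers,tag=a number=42i 1700000000000000000'
```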
| 61 | + |
| 62 | +### What data can be sent to InfluxDB? |
| 63 | + |
| 64 | +`InfluxDB1Sink` can accept only dictionaries values. |
| 65 | + |
| 66 | +If the record values are not dicts, you need to convert them to dicts using `StreamingDataFrame.apply()` before sinking. |
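
For example, if the topic carries plain numeric payloads, a small conversion function can wrap them in the expected dict shape before sinking (the key names and the constant tag value here are illustrative):

```python
def wrap_value(value):
    # Convert a plain numeric payload into the dict shape the sink expects;
    # "sensor-1" is an illustrative constant tag value
    return {"number": value, "tag": "sensor-1"}


# Applied before sinking:
#   sdf = sdf.apply(wrap_value)
#   sdf.sink(influx_sink)
print(wrap_value(42))  # → {'number': 42, 'tag': 'sensor-1'}
```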
| 67 | + |
| 68 | +The structure of the sinked data is defined by the `fields_keys` and `tags_keys` parameters provided to the sink class. |
| 69 | + |
| 70 | +- `fields_keys` - a list of keys to be used as "fields" when writing to InfluxDB. |
| 71 | +If present, its keys cannot overlap with any in `tags_keys`. |
| 72 | +If empty, the whole record value will be used. |
| 73 | +The fields' values can only be strings, floats, integers, or booleans. |
| 74 | + |
| 75 | +- `tags_keys` - a list of keys to be used as "tags" when writing to InfluxDB. |
| 76 | +If present, its keys cannot overlap with any in `fields_keys`. |
| 77 | +These keys will be popped from the value dictionary automatically because InfluxDB doesn't allow the same keys be both in tags and fields. |
| 78 | +If empty, no tags will be sent. |
| 79 | +>***NOTE***: InfluxDB client always converts tag values to strings. |
| 80 | +
|
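The field/tag split described above behaves roughly like this (a sketch for illustration, not the sink's internal code):

```python
def split_record(value, fields_keys, tags_keys):
    # Copy so the caller's dict is not mutated in this sketch
    value = dict(value)
    # Tag keys are popped out of the value so the same key never ends up
    # in both tags and fields; tag values become strings
    tags = {k: str(value.pop(k)) for k in tags_keys if k in value}
    # With no fields_keys, the whole remaining value is used as fields
    fields = {k: value[k] for k in fields_keys} if fields_keys else value
    return fields, tags


fields, tags = split_record({"number": 42, "tag": "a"}, ["number"], ["tag"])
# fields → {'number': 42}, tags → {'tag': 'a'}
```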
To learn more about schema design and data types in InfluxDB, please read the [InfluxDB schema design recommendations](https://docs.influxdata.com/influxdb/cloud-serverless/write-data/best-practices/schema-design/).

## Delivery Guarantees

`InfluxDB1Sink` provides at-least-once guarantees, and the same records may be written multiple times in case of errors during processing.

## Backpressure Handling

`InfluxDB1Sink` automatically handles events when the database cannot accept new data due to write limits.

When this happens, the application drops the accumulated in-memory batch and pauses the corresponding topic partition for the timeout duration returned by the InfluxDB API (it returns an HTTP error with a 429 status code and a `Retry-After` header with a timeout).
When the timeout expires, the app automatically resumes the partition to re-process the data and sink it again.

## Configuration

`InfluxDB1Sink` accepts the following configuration parameters:

- `host` - InfluxDB host in format "https://<host>"

- `port` - InfluxDB port. Default - `8086`.

- `database` - a database name.

- `username` - a username.

- `password` - a password.

- `measurement` - a measurement name, required.

- `fields_keys` - an iterable (list) of strings used as InfluxDB "fields".
  Also accepts a single-argument callable that receives the current message data as a dict and returns an iterable of strings.
    - If present, it must not overlap with `tags_keys`.
    - If empty, the whole record value will be used.

  See [What data can be sent to InfluxDB](#what-data-can-be-sent-to-influxdb) for more info.

- `tags_keys` - an iterable (list) of strings used as InfluxDB "tags".
  Also accepts a single-argument callable that receives the current message data as a dict and returns an iterable of strings.
    - If present, it must not overlap with `fields_keys`.
    - Given keys are popped from the value dictionary since the same key cannot be both a tag and a field.
    - If empty, no tags will be sent.

  See [What data can be sent to InfluxDB](#what-data-can-be-sent-to-influxdb) for more info.

- `time_setter` - an optional column name to use as "time" for InfluxDB.
  Also accepts a callable which receives the current message data and returns either the desired time or `None` (use the default).
    - The time can be an `int`, `string` (RFC3339 format), or `datetime`.
    - The time must match the `time_precision` argument if it is not a `datetime` object, else an error is raised.
    - By default, the record's Kafka timestamp with `"ms"` time precision is used.

- `time_precision` - a time precision to use when writing to InfluxDB.
Default - `ms`.

- `include_metadata_tags` - if True, includes the record's key, topic, and partition as tags.
Default - `False`.

- `convert_ints_to_floats` - if True, converts all integer values to floats.
Default - `False`.

- `batch_size` - the number of records to write to InfluxDB in one request.
Note that it only affects the size of one write request, not the number of records flushed on each checkpoint.
Default - `1000`.

- `request_timeout_ms` - an HTTP request timeout in milliseconds.
Default - `10000`.