
Commit aaa0486

daniil-quix, gwaramadze, and tim-quix authored
Add Redis Sink (#655)

Co-authored-by: Remy Gwaramadze <gwaramadze@users.noreply.github.com>
Co-authored-by: Tim Sawicki <136370015+tim-quix@users.noreply.github.com>
1 parent 537fa61 commit aaa0486

File tree

7 files changed: +239 -1 lines changed


LICENSES/LICENSE.redis-py

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2022-2023, Redis, inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

conda/post-link.sh

Lines changed: 2 additions & 1 deletion
@@ -4,4 +4,5 @@ $PREFIX/bin/pip install \
 'rocksdict>=0.3,<0.4' \
 'protobuf>=5.27.2,<6.0' \
 'influxdb3-python>=0.7,<1.0' \
-'pyiceberg[pyarrow,glue]>=0.7,<0.8'
+'pyiceberg[pyarrow,glue]>=0.7,<0.8' \
+'redis[hiredis]>=5.2.0,<6'

docs/build/build.py

Lines changed: 1 addition & 0 deletions
@@ -129,6 +129,7 @@
 "quixstreams.sinks.community.pubsub",
 "quixstreams.sinks.community.postgresql",
 "quixstreams.sinks.community.kinesis",
+"quixstreams.sinks.community.redis",
 ]
 },
 # Order: base, core, community

docs/connectors/sinks/redis-sink.md

Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
# Redis Sink

!!! info

    This is a **Community** connector. Test it before using in production.

    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

Redis is an in-memory database that persists on disk.

Quix Streams provides a sink to write processed data to Redis.

## How To Install

The dependencies for this sink are not included in the default `quixstreams` package.

To install them, run the following command:

```commandline
pip install quixstreams[redis]
```
## How To Use

To sink data to Redis, you need to create an instance of `RedisSink` and pass
it to the `StreamingDataFrame.sink()` method:

```python
import json

from quixstreams import Application
from quixstreams.sinks.community.redis import RedisSink

app = Application(
    broker_address="localhost:9092",
    auto_offset_reset="earliest",
    consumer_group="consumer-group",
)

topic = app.topic("topic-name")

# Initialize a sink
redis_sink = RedisSink(
    host="<Redis host>",
    port="<Redis port>",
    db="<Redis db>",
    value_serializer=json.dumps,
    key_serializer=None,
    password=None,
    socket_timeout=30.0,
)

sdf = app.dataframe(topic)
sdf.sink(redis_sink)

if __name__ == '__main__':
    app.run()
```
## How It Works

`RedisSink` is a batching sink.
It batches processed records in memory per topic partition, and writes them to Redis
when a checkpoint has been committed.
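For illustration, here is a minimal sketch of the `BatchingSink` contract that `RedisSink` builds on (the `PrintSink` name is hypothetical): subclasses implement `write()`, which receives the records accumulated for one topic partition when a checkpoint is committed:

```python
from quixstreams.sinks import BatchingSink, SinkBatch


class PrintSink(BatchingSink):
    """A hypothetical batching sink that just prints each record."""

    def write(self, batch: SinkBatch) -> None:
        # Called per topic partition at checkpoint time, with all
        # records buffered since the previous checkpoint.
        for item in batch:
            print(item.key, item.value)
```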
### Data serialization

By default, `RedisSink` serializes record values to JSON and uses Kafka message keys as
Redis keys.

If you want to use a different encoding or change which keys are inserted into Redis,
use the `key_serializer` and `value_serializer` callbacks.

**Example**:

Use a combination of the record's key and value to create a new Redis key,
and convert values using the MessagePack format instead of JSON.
```python
from quixstreams import Application
from quixstreams.sinks.community.redis import RedisSink

# Assuming "msgpack-python" is installed
import msgpack

app = Application(
    broker_address="localhost:9092",
    auto_offset_reset="earliest",
    consumer_group="consumer-group",
)

topic = app.topic("topic-name")

redis_sink = RedisSink(
    host="<Redis host>",
    port="<Redis port>",
    db="<Redis db>",
    # Serialize records' values using msgpack format before writing to Redis
    value_serializer=msgpack.dumps,
    # Combine a new Redis key from the record's key and value.
    key_serializer=lambda key, value: f'{key}-{value}',
)

sdf = app.dataframe(topic)
sdf.sink(redis_sink)

if __name__ == '__main__':
    app.run()
```
### Atomic Writes

`RedisSink` uses the [Redis Transactions](https://redis.io/docs/latest/develop/interact/transactions/)
feature to ensure that all updates are executed atomically.
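Under the hood, this maps to a redis-py transaction pipeline. A minimal sketch of the mechanism (the host, port, and keys below are placeholders): commands are buffered client-side and sent as a single MULTI/EXEC block, so either every write in the batch is applied or none are:

```python
import redis

client = redis.Redis(host="localhost", port=6379, db=0)

# Buffer the writes client-side, then send them as one MULTI/EXEC block
with client.pipeline(transaction=True) as pipe:
    pipe.set("key-1", "value-1")
    pipe.set("key-2", "value-2")
    pipe.execute(raise_on_error=True)
```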
## Delivery Guarantees

`RedisSink` provides at-least-once guarantees: the same records may be written
multiple times if errors occur during processing.
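Because each record is written with a plain Redis `SET`, a replayed batch overwrites the same keys with the same data, so duplicates are harmless as long as the Redis keys are deterministic. A hypothetical `key_serializer` illustrating this (the `event_id` field is an assumption about your payload):

```python
# Deterministic keys: replaying the same record always targets the
# same Redis key, so at-least-once redelivery overwrites rather than
# duplicates data.
def make_key(key, value):
    return f"{key}:{value['event_id']}"  # assumes values carry an "event_id"
```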
## Configuration

Main configuration parameters:

- `host`: a Redis db host.
- `port`: a Redis db port.
- `db`: a Redis db number.
- `value_serializer`: a callable to serialize records' values.
- `key_serializer`: a callable to serialize records' keys.

For the full list of expected parameters, see
the [RedisSink API](../../api-reference/sinks.md#redissink) page.

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -37,6 +37,7 @@ all = [
 "psycopg2-binary>=2.9.9,<3",
 "boto3>=1.35.65,<2.0",
 "boto3-stubs>=1.35.65,<2.0",
+"redis[hiredis]>=5.2.0,<6"
 ]

 avro = ["fastavro>=1.8,<2.0"]
@@ -48,6 +49,7 @@ bigquery = ["google-cloud-bigquery>=3.26.0,<3.27"]
 pubsub = ["google-cloud-pubsub>=2.23.1,<3"]
 postgresql = ["psycopg2-binary>=2.9.9,<3"]
 kinesis = ["boto3>=1.35.65,<2.0", "boto3-stubs[kinesis]>=1.35.65,<2.0"]
+redis = ["redis[hiredis]>=5.2.0,<6"]

 [tool.setuptools.packages.find]
 include = ["quixstreams*"]

quixstreams/sinks/community/redis.py

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
import json
import logging
import time
from typing import Any, Callable, Optional, Union

try:
    import redis
except ImportError as exc:
    raise ImportError(
        f"Package {exc.name} is missing: "
        'run "pip install quixstreams[redis]" to use RedisSink'
    ) from exc

from quixstreams.sinks import BatchingSink, SinkBatch

__all__ = ("RedisSink",)

logger = logging.getLogger(__name__)


class RedisSink(BatchingSink):
    def __init__(
        self,
        host: str,
        port: int,
        db: int,
        value_serializer: Callable[[Any], Union[bytes, str]] = json.dumps,
        key_serializer: Optional[Callable[[Any, Any], Union[bytes, str]]] = None,
        password: Optional[str] = None,
        socket_timeout: float = 30.0,
        **kwargs,
    ) -> None:
        """
        A connector to sink processed data to Redis.
        It batches the processed records in memory per topic partition, and flushes them to Redis at the checkpoint.

        :param host: Redis host.
        :param port: Redis port.
        :param db: Redis DB number.
        :param value_serializer: a callable to serialize the value to string or bytes
            (defaults to json.dumps).
        :param key_serializer: an optional callable to serialize the key to string or bytes.
            If not provided, the Kafka message key will be used as is.
        :param password: Redis password, optional.
        :param socket_timeout: Redis socket timeout.
            Default - 30s.
        :param kwargs: Additional keyword arguments passed to the `redis.Redis` instance.
        """

        super().__init__()
        self._redis_uri = f"{host}:{port}/{db}"
        self._client = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            socket_timeout=socket_timeout,
            **kwargs,
        )
        self._key_serializer = key_serializer
        self._value_serializer = value_serializer

    def write(self, batch: SinkBatch) -> None:
        # Execute Redis updates atomically using a transaction pipeline
        start = time.monotonic()
        with self._client.pipeline(transaction=True) as pipe:
            for item in batch:
                key = item.key
                if self._key_serializer is not None:
                    key = self._key_serializer(key, item.value)
                value = self._value_serializer(item.value)
                pipe.set(key, value)
            keys_updated = len(pipe)
            pipe.execute(raise_on_error=True)
        time_elapsed = round(time.monotonic() - start, 4)
        logger.debug(
            f'Updated {keys_updated} keys in a Redis database "{self._redis_uri}" '
            f"time_elapsed={time_elapsed}s"
        )

tests/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -6,3 +6,4 @@ fastavro>=1.8,<2.0
 protobuf>=5.27.2
 influxdb3-python>=0.7.0,<1.0
 pyiceberg[pyarrow,glue]>=0.7,<0.8
+redis[hiredis]>=5.2.0,<6
