
Commit a016b2a

local file sink (#560)
* File sink

  Only JSON tested.

* Fix linting
* Rename file_sink.py to file.py
* Correct imports and add jsonlines dependency
* Use default logging config
* Logging error before raising is redundant
* Simplify typing
* Use defaultdict
* Cast key to str later
* Use regex substitution to construct filename-safe key
* Use pathlib
* Refactor _resolve_format
* isort fix
* Rearrange files
* Remove deserialization
* Various fixes
* Change default JSON file extension from .json to .jsonl

  More: https://jsonlines.org/

* Handle non-bytes key in JSONFormat.serialize
* Use jsonlines.Writer.write_all
* Remove redundant "inner" serialization
* Refactor JSONFormat

  Default JSONEncoder used by jsonlines is fine and it will utilize `compact=True`. Otherwise `compact` argument would be discarded. Other various small corrections.

* More fixes
* Add SinkItem to quixstreams.sinks.base.__init__
* Correct typing
* BatchFormat docstring
* JSONFormat docstring
* Rename BatchFormat to just Format
* FileSink docstring
* Build path using output_dir/topic/partition/key/offset.extension[.gz] pattern
* Rename row to message for consistency
* Handle non-bytes keys in ParquetFormat
* Use pyarrow.Table.from_pylist to reduce iterations over messages
* Simplify Parquet compression handling

  Use only native pyarrow.parquet.write_table compression mechanism

* Remove supports_append property
* Add SinkBatch `start_offset` and `key_type` helper properties
* Don't group by keys, only by topic and partition
* Remove BytesFormat
* Support appending for JSON
* Missing newline
* Remove SinkBatch.key_type
* Remove unnecessary changes to reduce PR length
* Move FileSink outside of the __init__.py module
* Create _get_file_path method
* Remove confusing comment
* Add documentation

---------

Co-authored-by: Remy Gwaramadze <remy@quix.io>
1 parent 16d8637 commit a016b2a

14 files changed: +528 -1 lines


LICENSES/LICENSE.jsonlines

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
*(This is the OSI approved 3-clause "New BSD License".)*

Copyright © 2016, wouter bolsterlee

All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice, this
  list of conditions and the following disclaimer in the documentation and/or
  other materials provided with the distribution.

* Neither the name of the author nor the names of the contributors may be used
  to endorse or promote products derived from this software without specific
  prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

conda/meta.yaml

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ requirements:
     - pydantic-settings >=2.3,<2.7
     - jsonschema >=4.3.0
     - fastavro >=1.8,<2.0
+    - jsonlines >=4,<5
 
   test:
     imports:

docs/build/build.py

Lines changed: 4 additions & 0 deletions
@@ -115,6 +115,10 @@
         k: None
         for k in [
             "quixstreams.sinks.community.iceberg",
+            "quixstreams.sinks.community.file.sink",
+            "quixstreams.sinks.community.file.formats.base",
+            "quixstreams.sinks.community.file.formats.json",
+            "quixstreams.sinks.community.file.formats.parquet",
             "quixstreams.sinks.core.influxdb3",
             "quixstreams.sinks.core.csv",
             "quixstreams.sinks.base.sink",

docs/connectors/sinks/file-sink.md

Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
# File Sink

!!! info

    This is a **Community** connector. Test it before using in production.

    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

This sink writes batches of data to files on disk in various formats.
By default, the data will include the Kafka message key, value, and timestamp.

Currently supports the following formats:

- JSON
- Parquet

## How the File Sink Works

`FileSink` is a batching sink.

It batches processed records in memory per topic partition and writes them to files in a specified directory structure. Files are organized by topic and partition, with each batch being written to a separate file named by its starting offset.

The sink can either create new files for each batch or append to existing files (when using formats that support appending).

## How To Use File Sink

Create an instance of `FileSink` and pass it to the `StreamingDataFrame.sink()` method.

For the full description of expected parameters, see the [File Sink API](../../api-reference/sinks.md#filesink) page.

```python
from quixstreams import Application
from quixstreams.sinks.community.file import FileSink

# Configure the sink to write JSON files
file_sink = FileSink(
    output_dir="./output",
    format="json",
    append=False  # Set to True to append to existing files when possible
)

app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
topic = app.topic('sink_topic')

# Do some processing here
sdf = app.dataframe(topic=topic).print(metadata=True)

# Sink results to the FileSink
sdf.sink(file_sink)

if __name__ == "__main__":
    # Start the application
    app.run()
```

## File Organization

Files are organized in the following directory structure:

```
output_dir/
├── sink_topic/
│   ├── 0/
│   │   ├── 0000000000000000000.json
│   │   ├── 0000000000000000123.json
│   │   └── 0000000000000001456.json
│   └── 1/
│       ├── 0000000000000000000.json
│       ├── 0000000000000000789.json
│       └── 0000000000000001012.json
```

Each file is named using the batch's starting offset (padded to 19 digits) and the appropriate file extension for the chosen format.

## Supported Formats

- **JSON**: Supports appending to existing files
- **Parquet**: Does not support appending (a new file is created for each batch)

## Delivery Guarantees

`FileSink` provides at-least-once guarantees, and the results may contain duplicated data if there were errors during processing.
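As a quick sanity check after running the documented example, the JSON Lines output can be read back with the same `jsonlines` library this sink depends on. This is an illustrative sketch, not part of the commit; the concrete file path is assumed and follows the layout shown in the tree above (the actual extension depends on the configured format, ".jsonl" by default for `JSONFormat`).

```python
import jsonlines

# Assumed path: output_dir/<topic>/<partition>/<start offset>.<extension>
path = "./output/sink_topic/0/0000000000000000000.json"

with jsonlines.open(path) as reader:
    for record in reader:
        # Each record carries the fields written by the sink's JSON format
        print(record["_timestamp"], record["_key"], record["_value"])
```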

mkdocs.yml

Lines changed: 1 addition & 0 deletions
@@ -50,6 +50,7 @@ nav:
   - Sinks:
     - 'connectors/sinks/README.md'
     - Apache Iceberg Sink: connectors/sinks/apache-iceberg-sink.md
+    - File Sink: connectors/sinks/file-sink.md
     - CSV Sink: connectors/sinks/csv-sink.md
     - InfluxDB v3 Sink: connectors/sinks/influxdb3-sink.md
     - Creating a Custom Sink: connectors/sinks/custom-sinks.md

quixstreams/sinks/base/batch.py

Lines changed: 4 additions & 0 deletions
@@ -38,6 +38,10 @@ def partition(self) -> int:
     def size(self) -> int:
         return len(self._buffer)
 
+    @property
+    def start_offset(self) -> int:
+        return self._buffer[0].offset
+
     def append(
         self,
         value: Any,
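For context, a minimal sketch of how `start_offset` feeds into file naming as described in the File Sink documentation above. The helper name `_get_file_path` comes from the commit message, but its exact signature is an assumption, as is `SinkBatch.topic` (only `partition`, `size`, and `start_offset` are visible in this diff); the zero padding to 19 digits is taken from the docs.

```python
from pathlib import Path

from quixstreams.sinks.base import SinkBatch


def _get_file_path(output_dir: Path, batch: SinkBatch, file_extension: str) -> Path:
    # output_dir/<topic>/<partition>/<start offset padded to 19 digits><extension>
    padded_offset = str(batch.start_offset).zfill(19)
    return output_dir / batch.topic / str(batch.partition) / f"{padded_offset}{file_extension}"
```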
quixstreams/sinks/community/file/__init__.py

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
from .formats import JSONFormat, ParquetFormat
from .sink import FileSink, InvalidFormatError

__all__ = [
    "FileSink",
    "InvalidFormatError",
    "JSONFormat",
    "ParquetFormat",
]
quixstreams/sinks/community/file/formats/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
from .base import Format
from .json import JSONFormat
from .parquet import ParquetFormat

__all__ = ["Format", "JSONFormat", "ParquetFormat"]
quixstreams/sinks/community/file/formats/base.py

Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
from abc import ABC, abstractmethod

from quixstreams.sinks.base import SinkBatch

__all__ = ["Format"]


class Format(ABC):
    """
    Base class for formatting batches in file sinks.

    This abstract base class defines the interface for batch formatting
    in file sinks. Subclasses should implement the `file_extension` and
    `supports_append` properties and the `serialize` method to define
    how batches are formatted and saved.
    """

    @property
    @abstractmethod
    def file_extension(self) -> str:
        """
        Returns the file extension used for output files.

        :return: The file extension as a string.
        """
        ...

    @property
    @abstractmethod
    def supports_append(self) -> bool:
        """
        Indicates if the format supports appending data to an existing file.

        :return: True if appending is supported, otherwise False.
        """
        ...

    @abstractmethod
    def serialize(self, batch: SinkBatch) -> bytes:
        """
        Serializes a batch of messages into bytes.

        :param batch: The batch of messages to serialize.
        :return: The serialized batch as bytes.
        """
        ...
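To make the abstract interface concrete, here is a minimal, hypothetical subclass that writes each batch as CSV. It is not part of this commit; the timestamp/key/value row layout simply mirrors what the JSON format below produces.

```python
import csv
from io import StringIO

from quixstreams.sinks.base import SinkBatch

from .base import Format


class CSVFormat(Format):
    """Hypothetical example format: one CSV row per message."""

    supports_append = True  # plain text, so new rows can be appended

    @property
    def file_extension(self) -> str:
        return ".csv"

    def serialize(self, batch: SinkBatch) -> bytes:
        buffer = StringIO()
        writer = csv.writer(buffer)
        for item in batch:
            key = item.key.decode() if isinstance(item.key, bytes) else item.key
            # item.value is written via its string representation here
            writer.writerow([item.timestamp, key, item.value])
        return buffer.getvalue().encode("utf-8")
```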
quixstreams/sinks/community/file/formats/json.py

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
from gzip import compress as gzip_compress
from io import BytesIO
from typing import Any, Callable, Optional

from jsonlines import Writer

from quixstreams.sinks.base import SinkBatch

from .base import Format

__all__ = ["JSONFormat"]


class JSONFormat(Format):
    """
    Serializes batches of messages into JSON Lines format with optional gzip
    compression.

    This class provides functionality to serialize a `SinkBatch` into bytes
    in JSON Lines format. It supports optional gzip compression and allows
    for custom JSON serialization through the `dumps` parameter.

    This format supports appending to existing files.
    """

    supports_append = True

    def __init__(
        self,
        file_extension: str = ".jsonl",
        compress: bool = False,
        dumps: Optional[Callable[[Any], str]] = None,
    ) -> None:
        """
        Initializes the JSONFormat.

        :param file_extension: The file extension to use for output files.
            Defaults to ".jsonl".
        :param compress: If `True`, compresses the output using gzip and
            appends ".gz" to the file extension. Defaults to `False`.
        :param dumps: A custom function to serialize objects to JSON-formatted
            strings. If provided, the `compact` option is ignored.
        """
        self._file_extension = file_extension

        self._compress = compress
        if self._compress:
            self._file_extension += ".gz"

        self._writer_arguments = {"compact": True}

        # If `dumps` is provided, `compact` will be ignored
        if dumps is not None:
            self._writer_arguments["dumps"] = dumps

    @property
    def file_extension(self) -> str:
        """
        Returns the file extension used for output files.

        :return: The file extension as a string.
        """
        return self._file_extension

    def serialize(self, batch: SinkBatch) -> bytes:
        """
        Serializes a `SinkBatch` into bytes in JSON Lines format.

        Each item in the batch is converted into a JSON object with
        "_timestamp", "_key", and "_value" fields. If the message key is
        in bytes, it is decoded to a string.

        :param batch: The `SinkBatch` to serialize.
        :return: The serialized batch in JSON Lines format, optionally
            compressed with gzip.
        """

        with BytesIO() as fp:
            with Writer(fp, **self._writer_arguments) as writer:
                writer.write_all(
                    {
                        "_timestamp": item.timestamp,
                        "_key": item.key.decode()
                        if isinstance(item.key, bytes)
                        else str(item.key),
                        "_value": item.value,
                    }
                    for item in batch
                )

            value = fp.getvalue()

        return gzip_compress(value) if self._compress else value
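A small, illustrative usage sketch of the constructor options above; serializing an actual payload would require a populated `SinkBatch`, which the sink assembles internally.

```python
from quixstreams.sinks.community.file.formats import JSONFormat

plain = JSONFormat()
gzipped = JSONFormat(compress=True)

print(plain.file_extension)    # ".jsonl"
print(gzipped.file_extension)  # ".jsonl.gz"
print(plain.supports_append)   # True
```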
quixstreams/sinks/community/file/formats/parquet.py

Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
from io import BytesIO
from typing import Literal

import pyarrow as pa
import pyarrow.parquet as pq

from quixstreams.sinks.base import SinkBatch

from .base import Format

__all__ = ["ParquetFormat"]

Compression = Literal["none", "snappy", "gzip", "brotli", "lz4", "zstd"]


class ParquetFormat(Format):
    """
    Serializes batches of messages into Parquet format.

    This class provides functionality to serialize a `SinkBatch` into bytes
    in Parquet format using PyArrow. It allows setting the file extension
    and compression algorithm used for the Parquet files.

    This format does not support appending to existing files.
    """

    supports_append = False

    def __init__(
        self,
        file_extension: str = ".parquet",
        compression: Compression = "snappy",
    ) -> None:
        """
        Initializes the ParquetFormat.

        :param file_extension: The file extension to use for output files.
            Defaults to ".parquet".
        :param compression: The compression algorithm to use for Parquet files.
            Allowed values are "none", "snappy", "gzip", "brotli", "lz4",
            or "zstd". Defaults to "snappy".
        """
        self._file_extension = file_extension
        self._compression = compression

    @property
    def file_extension(self) -> str:
        """
        Returns the file extension used for output files.

        :return: The file extension as a string.
        """
        return self._file_extension

    def serialize(self, batch: SinkBatch) -> bytes:
        """
        Serializes a `SinkBatch` into bytes in Parquet format.

        Each item in the batch is converted into a dictionary with "_timestamp",
        "_key", and the keys from the message value. If the message key is in
        bytes, it is decoded to a string.

        Missing fields in messages are filled with `None` to ensure all rows
        have the same columns.

        :param batch: The `SinkBatch` to serialize.
        :return: The serialized batch as bytes in Parquet format.
        """

        # Get all unique keys (columns) across all messages
        columns = set()
        for item in batch:
            columns.update(item.value.keys())

        # Normalize messages: Ensure all messages have the same keys,
        # filling missing ones with None.
        normalized_messages = [
            {
                "_timestamp": item.timestamp,
                "_key": item.key.decode() if isinstance(item.key, bytes) else str(item.key),
                **{column: item.value.get(column, None) for column in columns},
            }
            for item in batch
        ]

        # Convert normalized messages to a PyArrow Table
        table = pa.Table.from_pylist(normalized_messages)

        with BytesIO() as fp:
            pq.write_table(table, fp, compression=self._compression)
            return fp.getvalue()
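For illustration, a small round-trip sketch outside the commit: it mirrors the normalize-then-`from_pylist`-then-`write_table` pattern used by `serialize` with assumed sample rows, and reads the bytes back with PyArrow to show that the pre-filled `None` values come through as nulls.

```python
from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq

# Assumed sample rows, already normalized to share the same columns,
# just as serialize() does before building the table
rows = [
    {"_timestamp": 1700000000000, "_key": "sensor-1", "temperature": 21.5, "humidity": None},
    {"_timestamp": 1700000001000, "_key": "sensor-2", "temperature": None, "humidity": 40},
]

table = pa.Table.from_pylist(rows)

with BytesIO() as fp:
    pq.write_table(table, fp, compression="snappy")
    data = fp.getvalue()

# Read the bytes back to verify the schema and the null filling
restored = pq.read_table(BytesIO(data))
print(restored.schema)
print(restored.to_pylist())
```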
