
Commit b60d4ad

local file source (#601)
Added community local file source. Supports Parquet, Jsonlines with gzip compression, and replaying messages at original produce speed.
1 parent a016b2a commit b60d4ad

File tree

12 files changed: +451 / -0 lines changed


docs/build/build.py

Lines changed: 4 additions & 0 deletions
@@ -133,6 +133,10 @@
     "quixstreams.sources.core.csv",
     "quixstreams.sources.core.kafka.kafka",
     "quixstreams.sources.core.kafka.quix",
+    "quixstreams.sources.community.file.file",
+    "quixstreams.sources.community.file.compressions.gzip",
+    "quixstreams.sources.community.file.formats.json",
+    "quixstreams.sources.community.file.formats.parquet",
 ]
 },
 }
Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
# Quix File Source Connector

This source enables reading from a local file source, such as JSONlines or Parquet
files. It also supports file (de)compression.

The resulting messages can be produced in "replay" mode, where the time between
producing records matches the original production timing as closely as possible
(per topic partition only).

The Quix File Source Connector is generally intended to be used alongside the related
Quix File Sink Connector (in terms of expected file and data formatting).

## How to use the File Source

To use the File Source, you need to create an instance of `FileSource`
and pass it to the `app.dataframe()` method.

Note that you should generally point to a single topic folder (rather than a root
folder with many topics); otherwise, topic partitions may not line up correctly.

```python
from quixstreams import Application
from quixstreams.sources.community.file import FileSource

app = Application(broker_address="localhost:9092")
source = FileSource(
    filepath="/path/to/my/topic_folder",
    file_format="json",
    file_compression="gzip",
    as_replay=True,
)
sdf = app.dataframe(source=source).print(metadata=True)

if __name__ == "__main__":
    app.run()
```

## File hierarchy/structure

The Quix File Source Connector expects a folder structure like so:

```
my_sinked_topics/
├── topic_a/              # topic name (use this path to File Source!)
│   ├── 0/                # topic partition number
│   │   ├── 0000.ext      # formatted offset files (ex: JSON)
│   │   └── 0011.ext
│   └── 1/
│       ├── 0003.ext
│       └── 0016.ext
└── topic_b/
    └── etc...
```

This is the default structure generated by the Quix File Sink Connector.

## File data format/schema

The expected data schema largely depends on the file format chosen.

For the easiest use with the Quix File Sink Connector, you can follow these patterns
(an illustrative sketch follows this list):

- For row-based formats (like JSON), the expected data should have records with the
  following fields, where `_value` is the entirety of the message value, ideally as a
  JSON-deserializable item:
    - `_key`
    - `_value`
    - `_timestamp`

- Columnar formats (like Parquet) do not expect an explicit `value` field; instead, all
  value columns should be included individually alongside `_key` and `_timestamp`:
    - `_key`
    - `_timestamp`
    - `field_a`
    - `field_b`
    - etc...

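As a rough, hypothetical sketch of the two layouts described above (the `_key`,
`_value`, and `_timestamp` field names come from this doc, but the file names, sample
values, and the use of `pyarrow` are illustrative assumptions rather than the sink's
actual implementation):

```python
import gzip
import json

import pyarrow as pa
import pyarrow.parquet as pq

# Row-based (JSONlines, gzip-compressed): one record per line, with the whole
# message value nested under "_value".
row_record = {
    "_key": "sensor-1",
    "_value": {"temperature": 21.5},
    "_timestamp": 1_700_000_000_000,
}
with gzip.open("0000.jsonl.gz", "wt") as f:
    f.write(json.dumps(row_record) + "\n")

# Columnar (Parquet): value fields become individual columns alongside
# "_key" and "_timestamp".
table = pa.table(
    {
        "_key": ["sensor-1"],
        "_timestamp": [1_700_000_000_000],
        "temperature": [21.5],
    }
)
pq.write_table(table, "0000.parquet")
```
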
## Topic

The default topic will have a partition count that reflects the partition count found
within the provided topic's folder structure. For example, the `topic_a` folder above,
with partition folders `0/` and `1/`, would result in a default topic with 2 partitions.

mkdocs.yml

Lines changed: 1 addition & 0 deletions
@@ -60,6 +60,7 @@ nav:
 - Kafka Replicator Source: connectors/sources/kafka-source.md
 - Quix Source: connectors/sources/quix-source.md
 - Creating a Custom Source: connectors/sources/custom-sources.md
+- Local File Source: connectors/source/file-source.md
 - Contribution Guide: 'connectors/contribution-guide.md'
 - Community and Core Connectors: 'connectors/community-and-core.md'
 - Upgrading Guide:
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
# ruff: noqa: F403
from .file import *
from .formats import *
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
# ruff: noqa: F403
# ruff: noqa: F405
from .base import *
from .gzip import *

COMPRESSION_MAPPER = {"gz": GZipDecompressor, "gzip": GZipDecompressor}
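
For orientation, a hypothetical usage sketch (not code from this commit; it assumes
this mapper lives in the `quixstreams.sources.community.file.compressions` package, as
the relative imports suggest) showing how a compression name resolves to a decompressor:

```python
from pathlib import Path

from quixstreams.sources.community.file.compressions import COMPRESSION_MAPPER

# Look up the decompressor class by its compression name and instantiate it.
decompressor = COMPRESSION_MAPPER["gzip"]()  # -> GZipDecompressor

# Hypothetical file path; returns the decompressed bytes of the file.
raw_bytes = decompressor.decompress(Path("/path/to/topic_a/0/0000.gz"))
```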
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Literal

__all__ = (
    "Decompressor",
    "CompressionName",
)


CompressionName = Literal["gz", "gzip"]


class Decompressor(ABC):
    @abstractmethod
    def decompress(self, filepath: Path) -> bytes: ...
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
from pathlib import Path

from .base import Decompressor

__all__ = ("GZipDecompressor",)


class GZipDecompressor(Decompressor):
    def __init__(self):
        from gzip import decompress

        self._decompressor = decompress

    def decompress(self, filepath: Path) -> bytes:
        with open(filepath, "rb") as f:
            return self._decompressor(f.read())
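
To illustrate the extension point the `Decompressor` base class provides, here is a
hypothetical sketch (not part of this commit) of a bzip2 variant following the same
pattern; a real addition would also need entries in `COMPRESSION_MAPPER` and the
`CompressionName` literal above:

```python
from bz2 import decompress
from pathlib import Path

from quixstreams.sources.community.file.compressions.base import Decompressor


class BZ2Decompressor(Decompressor):
    """Hypothetical extra decompressor, mirroring GZipDecompressor above."""

    def decompress(self, filepath: Path) -> bytes:
        # Read the whole compressed file and return its decompressed bytes.
        with open(filepath, "rb") as f:
            return decompress(f.read())
```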
Lines changed: 174 additions & 0 deletions
@@ -0,0 +1,174 @@
import logging
from pathlib import Path
from time import sleep
from typing import Generator, Optional, Union

from quixstreams.models import Topic, TopicConfig
from quixstreams.sources import Source

from .compressions import CompressionName
from .formats import FORMATS, Format, FormatName

__all__ = ("FileSource",)

logger = logging.getLogger(__name__)


class FileSource(Source):
    """
    Ingest a set of local files into kafka by iterating through the provided folder and
    processing all nested files within it.

    Expects folder and file structures as generated by the related Quix Streams File
    Sink Connector:

    my_topics/
    ├── topic_a/
    │   ├── 0/
    │   │   ├── 0000.ext
    │   │   └── 0011.ext
    │   └── 1/
    │       ├── 0003.ext
    │       └── 0016.ext
    └── topic_b/
        └── etc...

    Intended to be used with a single topic (ex: topic_a), but will recursively read
    from whatever entrypoint is passed to it.

    File format structure depends on the file format.

    See the `.formats` and `.compressions` modules to see what is supported.

    Example:

    from quixstreams import Application
    from quixstreams.sources.community.file import FileSource

    app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")
    source = FileSource(
        filepath="/path/to/my/topic_folder",
        file_format="json",
        file_compression="gzip",
    )
    sdf = app.dataframe(source=source).print(metadata=True)

    if __name__ == "__main__":
        app.run()
    """

    def __init__(
        self,
        filepath: Union[str, Path],
        file_format: Union[Format, FormatName],
        file_compression: Optional[CompressionName] = None,
        as_replay: bool = True,
        name: Optional[str] = None,
        shutdown_timeout: float = 10,
    ):
        """
        :param filepath: a filepath to recursively read through; it is recommended to
            provide the path to a given topic folder (ex: `/path/to/topic_a`).
        :param file_format: what format the message files are in (ex: json, parquet).
            Optionally, can provide a `Format` instance if more than file_compression
            is necessary to define (file_compression will then be ignored).
        :param file_compression: what compression is used on the given files, if any.
        :param as_replay: Produce the messages with the original time delay between them.
            Otherwise, produce the messages as fast as possible.
            NOTE: Time delay will only be accurate per partition, NOT overall.
        :param name: The name of the Source application (Default: last folder name).
        :param shutdown_timeout: Time in seconds the application waits for the source
            to gracefully shutdown
        """
        self._filepath = Path(filepath)
        self._formatter = _get_formatter(file_format, file_compression)
        self._as_replay = as_replay
        self._previous_timestamp = None
        self._previous_partition = None
        super().__init__(
            name=name or self._filepath.name, shutdown_timeout=shutdown_timeout
        )

    def _replay_delay(self, current_timestamp: int):
        """
        Apply the replay speed by calculating the delay between messages
        based on their timestamps.
        """
        if self._previous_timestamp is not None:
            time_diff = (current_timestamp - self._previous_timestamp) / 1000
            if time_diff > 0:
                logger.debug(f"Sleeping for {time_diff} seconds...")
                sleep(time_diff)
        self._previous_timestamp = current_timestamp

    def _get_partition_count(self) -> int:
        return len([f for f in self._filepath.iterdir()])

    def default_topic(self) -> Topic:
        """
        Uses the file structure to generate the desired partition count for the
        internal topic.
        :return: the original default topic, with updated partition count
        """
        topic = super().default_topic()
        topic.config = TopicConfig(
            num_partitions=self._get_partition_count(), replication_factor=1
        )
        return topic

    def _check_file_partition_number(self, file: Path):
        """
        Checks whether the next file is the start of a new partition so the timestamp
        tracker can be reset.
        """
        partition = int(file.parent.name)
        if self._previous_partition != partition:
            self._previous_timestamp = None
            self._previous_partition = partition
            logger.debug(f"Beginning reading partition {partition}")

    def _produce(self, record: dict):
        kafka_msg = self._producer_topic.serialize(
            key=record["_key"],
            value=record["_value"],
            timestamp_ms=record["_timestamp"],
        )
        self.produce(
            key=kafka_msg.key, value=kafka_msg.value, timestamp=kafka_msg.timestamp
        )

    def run(self):
        while self._running:
            for file in _file_finder(self._filepath):
                logger.info(f"Reading files from topic {self._filepath.name}")
                self._check_file_partition_number(file)
                for record in self._formatter.file_read(file):
                    if self._as_replay:
                        self._replay_delay(record["_timestamp"])
                    self._produce(record)
            self.flush()
            return


def _get_formatter(
    formatter: Union[Format, FormatName], compression: Optional[CompressionName]
) -> Format:
    if isinstance(formatter, Format):
        return formatter
    elif format_obj := FORMATS.get(formatter):
        return format_obj(compression=compression)

    allowed_formats = ", ".join(FormatName.__args__)
    raise ValueError(
        f'Invalid format name "{formatter}". '
        f"Allowed values: {allowed_formats}, "
        f"or an instance of a subclass of `Format`."
    )


def _file_finder(filepath: Path) -> Generator[Path, None, None]:
    if filepath.is_dir():
        for i in sorted(filepath.iterdir(), key=lambda x: x.name):
            yield from _file_finder(i)
    else:
        yield filepath
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
# ruff: noqa: F403
# ruff: noqa: F405
from .base import *
from .json import *
from .parquet import *

FORMATS = {
    "json": JSONFormat,
    "parquet": ParquetFormat,
}
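
As a hedged sketch of the alternative mentioned in the `FileSource` docstring above
(assuming `JSONFormat` is exported by these star imports and accepts the same
`compression` keyword that `_get_formatter` passes), a preconfigured `Format` instance
can be supplied instead of a format name:

```python
from quixstreams.sources.community.file import FileSource
from quixstreams.sources.community.file.formats import JSONFormat

# Hypothetical: configure the format object directly; file_compression is then ignored.
source = FileSource(
    filepath="/path/to/my/topic_folder",
    file_format=JSONFormat(compression="gzip"),
)
```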
