
Commit a0fb56b

FileSource: fix requiring all optional file source deps to use a file source (#859)
* fix requiring all optional deps to use any file source
* update documentation for file source
1 parent d43ace3 commit a0fb56b
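
In practice, the fix means each file source now needs only its own optional dependencies rather than every file-source extra at once. A minimal sketch of the now-working local-only setup (assuming a plain `pip install quixstreams` with no cloud extras installed):

```python
# Sketch of the behavior this commit fixes: the local file source should be
# usable without the S3/Azure optional dependencies being installed.
from quixstreams import Application
from quixstreams.sources.community.file.local import LocalFileSource

app = Application(broker_address="localhost:9092")
# `filepath` is the only required argument per the updated docs below.
source = LocalFileSource(filepath="folder_a/folder_b")
app.add_source(source)

if __name__ == "__main__":
    app.run()
```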

File tree

4 files changed, +309 −82 lines changed

docs/connectors/sources/amazon-s3-source.md

Lines changed: 104 additions & 31 deletions
````diff
@@ -37,35 +37,50 @@ You can learn more details about the [expected kafka message format](#message-da
 
 ## How To Use
 
-S3 File Source is just a special configuration of the `FileSource` connector.
-
-Simply provide it an `S3Origin` (`FileSource(origin=<ORIGIN>)`).
-
-Then, hand the configured `FileSource` to your `SDF` (`app.dataframe(source=<SOURCE>)`).
+Import and instantiate an `S3FileSource` instance and hand it to an Application using
+`app.add_source(<S3FileSource>)` or instead to a StreamingDataFrame with
+`app.dataframe(source=<S3FileSource>)` if further data manipulation is required.
 
 For more details around various settings, see [configuration](#configuration).
 
 ```python
 from quixstreams import Application
-from quixstreams.sources.community.file import FileSource
-from quixstreams.sources.community.file.origins import S3Origin
+from quixstreams.sources.community.file.s3 import S3FileSource
+
+
+def key_setter(record: dict) -> str:
+    return record["host_id"]
+
+
+def value_setter(record: dict) -> dict:
+    return {k: record[k] for k in ["field_x", "field_y"]}
 
-app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")
 
-origin = S3Origin(
+def timestamp_setter(record: dict) -> int:
+    return record['timestamp']
+
+
+source = S3FileSource(
+    filepath='folder_a/folder_b',
     bucket="<YOUR BUCKET NAME>",
+    region_name="<YOUR REGION>",
     aws_access_key_id="<YOUR KEY ID>",
     aws_secret_access_key="<YOUR SECRET KEY>",
-    region_name="<YOUR REGION>",
-)
-source = FileSource(
-    directory="path/to/your/topic_folder/",
-    origin=origin,
-    format="json",
+    key_setter=key_setter,
+    value_setter=value_setter,
+    timestamp_setter=timestamp_setter,
+    file_format="json",
     compression="gzip",
+    has_partition_folders=False,
+    replay_speed=0.5,
+)
+app = Application(
+    broker_address="localhost:9092",
+    consumer_group='file-source',
+    auto_offset_reset='latest',
 )
-sdf = app.dataframe(source=source).print(metadata=True)
-# YOUR LOGIC HERE!
+app.add_source(source)
+
 
 if __name__ == "__main__":
     app.run()
````
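
For reference, the three setters in the example above each receive a decoded file record (a `dict`) and derive one Kafka message field from it. A runnable illustration with a hypothetical record (field names from the example, values invented, and the timestamp assumed to be epoch milliseconds):

```python
# Setters copied from the example above, applied to a hypothetical record.
def key_setter(record: dict) -> str:
    return record["host_id"]

def value_setter(record: dict) -> dict:
    return {k: record[k] for k in ["field_x", "field_y"]}

def timestamp_setter(record: dict) -> int:
    return record["timestamp"]

record = {
    "host_id": "host-42",
    "field_x": 1,
    "field_y": 2,
    "timestamp": 1700000000000,  # assumed epoch milliseconds
}

assert key_setter(record) == "host-42"                       # Kafka message key
assert value_setter(record) == {"field_x": 1, "field_y": 2}  # Kafka message value
assert timestamp_setter(record) == 1700000000000             # Kafka message timestamp
```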
````diff
@@ -77,8 +92,9 @@ Here are some important configurations to be aware of (see [File Source API](../
 
 ### Required:
 
-`S3Origin`:
-
+- `filepath`: folder to recursively iterate from (a file will be used directly).
+  For S3, exclude bucket name or starting "/".
+  **Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`).
 - `bucket`: The S3 bucket name only (ex: `"your-bucket"`).
 - `region_name`: AWS region (ex: us-east-1).
   **Note**: can alternatively set the `AWS_REGION` environment variable.
````
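
To make the new `filepath` note concrete, a quick sketch of valid vs. invalid values (bucket and paths are placeholders):

```python
from quixstreams.sources.community.file.s3 import S3FileSource

# OK: a path relative to the bucket root -- no bucket name, no leading "/".
# (Credentials omitted here; assumed to come from the environment.)
source = S3FileSource(bucket="your-bucket", filepath="path/to/topic_a/")

# Not what the docs describe (shown for contrast):
#   filepath="your-bucket/path/to/topic_a/"  -> bucket name must be excluded
#   filepath="/path/to/topic_a/"             -> leading "/" must be excluded
```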
````diff
@@ -87,18 +103,8 @@ Here are some important configurations to be aware of (see [File Source API](../
 - `aws_secret_access_key`: AWS secret key.
   **Note**: can alternatively set the `AWS_SECRET_ACCESS_KEY` environment variable.
 
-
-`FileSource`:
-
-- `directory`: a directory to recursively read through (exclude bucket name or starting "/").
-  **Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`).
-- `origin`: An `S3Origin` instance.
-
-
 ### Optional:
 
-`FileSource`:
-
 - `format`: what format the message files are in (ex: `"json"`, `"parquet"`).
   **Advanced**: can optionally provide a `Format` instance (`compression` will then be ignored).
   **Default**: `"json"`
````
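
As the notes above mention, each AWS setting can come from the environment instead of the constructor; a sketch of that style (values are placeholders, and exporting from the shell works equally well):

```python
import os

from quixstreams.sources.community.file.s3 import S3FileSource

# Placeholder values -- normally these would be exported in the shell or
# injected by the deployment environment rather than set in code.
os.environ["AWS_REGION"] = "us-east-1"
os.environ["AWS_ACCESS_KEY_ID"] = "<YOUR KEY ID>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<YOUR SECRET KEY>"

# With credentials in the environment, they can be omitted from the constructor.
source = S3FileSource(bucket="your-bucket", filepath="folder_a/folder_b")
```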
````diff
@@ -110,9 +116,21 @@ Here are some important configurations to be aware of (see [File Source API](../
   **Note**: Time delay will only be accurate _per partition_, NOT overall.
   **Default**: 1.0
 
-## File hierarchy/structure
 
-The File Source expects a folder structure like so:
+## Supported File Hierarchies
+
+All `*FileSource` types support both single file referencing and recursive folder traversal.
+
+In addition, they also support a topic-partition file structure as produced by a Quix
+Streams `*FileSink` instance.
+
+
+### Using with a Topic-Partition hierarchy (from `*FileSink`)
+
+A Topic-Partition structure allows reproducing messages to the exact partition
+they originated from.
+
+A Quix Streams `*FileSink` produces files using this structure:
 
 ```
 my_sinked_topics/
````
````diff
@@ -127,6 +145,61 @@ The File Source expects a folder structure like so:
 └── etc...
 ```
 
+To have `*FileSource` reflect this partition mapping for messages (instead of just producing
+messages to whatever partition is applicable), it must know how many partition folders
+there are so it can create a topic with that many partitions.
+
+To enable this:
+1. Subclass your `*FileSource` type and define the `file_partition_counter` method.
+    - this will be run before processing any files.
+2. Enable the use of `file_partition_counter` by setting the flag `has_partition_folders=True`.
+3. Extract the original Kafka key with `key_setter` (by default, it uses the same field name that `*FileSink` writes to).
+    - see [message data schema](#message-data-formatschema) for more info around expected defaults.
+
+#### Example
+As a simple example, using the topic-partition file structure:
+
+```
+├── my_topic/
+│   ├── 0/
+│   │   ├── 0000.ext
+│   │   └── 0011.ext
+│   └── 1/
+│       ├── 0003.ext
+│       └── 0016.ext
+```
+
+you could define `file_partition_counter` on `LocalFileSource` like this:
+
+```python
+from quixstreams.sources.community.file.local import LocalFileSource
+
+class MyLocalFileSource(LocalFileSource):
+
+    def file_partition_counter(self) -> int:
+        return len([f for f in self._filepath.iterdir()])  # `len(['0', '1'])`
+```
+
+Also, for our `key_setter`:
+```python
+def my_key_setter(record: dict) -> str:
+    return record["original_key_field"]
+```
+
+Then, when initializing with your new class:
+
+```python
+source = MyLocalFileSource(
+    ...,  # required args,
+    has_partition_folders=True,
+    key_setter=my_key_setter,
+)
+```
+
+This will produce these messages across the 2 partitions in their original partitioning
+and ordering.
+
+
 ## Message Data Format/Schema
 
 The expected file schema largely depends on the chosen
````
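
The partition-counting example above subclasses `LocalFileSource`; for the S3 source, an equivalent count of partition "folders" could plausibly come from a delimiter listing. A sketch only, assuming boto3 is available and that `S3FileSource` supports the same `file_partition_counter` hook (bucket and prefix are hard-coded placeholders, since the instance attributes holding them are not documented here):

```python
import boto3

from quixstreams.sources.community.file.s3 import S3FileSource


class MyS3FileSource(S3FileSource):
    def file_partition_counter(self) -> int:
        # Count the "folders" (common prefixes) directly under the topic path.
        client = boto3.client("s3")
        response = client.list_objects_v2(
            Bucket="your-bucket", Prefix="folder_a/folder_b/", Delimiter="/"
        )
        return len(response.get("CommonPrefixes", []))
```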

docs/connectors/sources/local-file-source.md

Lines changed: 103 additions & 17 deletions
````diff
@@ -1,3 +1,5 @@
+from quixstreams.sources.community.file.local import LocalFileSource
+
 # Local File Source
 
 !!! info
````
````diff
@@ -23,7 +25,7 @@ pip install quixstreams
 
 ## How It Works
 
-`FileSource` steps through each folder within the provided path and dumps each record
+`LocalFileSource` steps through each folder within the provided path and dumps each record
 contained in each file as a message to a Kafka topic. Folders are navigated in
 lexicographical order.
 
````
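
Since processing order follows lexicographical (string) sorting as described above, numeric file and folder names are safest when zero-padded; a quick illustration:

```python
# String sorting puts "10" before "2"; zero-padding preserves numeric order.
print(sorted(["2", "10", "1"]))          # ['1', '10', '2']
print(sorted(["0001", "0002", "0010"]))  # ['0001', '0002', '0010']
```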

````diff
@@ -37,26 +39,46 @@ You can learn more details about the [expected kafka message format](#message-da
 
 ## How To Use
 
-Local File Source is the default configuration of the `FileSource` connector.
-
-Simply hand the configured `FileSource` (without a `origin`) to your `SDF`
-(`app.dataframe(source=<SOURCE>)`).
+Import and instantiate a `LocalFileSource` instance and hand it to an Application using
+`app.add_source(<LocalFileSource>)` or instead to a StreamingDataFrame with
+`app.dataframe(source=<LocalFileSource>)` if further data manipulation is required.
 
 For more details around various settings, see [configuration](#configuration).
 
 ```python
 from quixstreams import Application
-from quixstreams.sources.community.file import FileSource
+from quixstreams.sources.community.file.local import LocalFileSource
+
+
+def key_setter(record: dict) -> str:
+    return record["host_id"]
+
+
+def value_setter(record: dict) -> dict:
+    return {k: record[k] for k in ["field_x", "field_y"]}
+
+
+def timestamp_setter(record: dict) -> int:
+    return record['timestamp']
+
 
-app = Application(broker_address="localhost:9092")
-source = FileSource(
-    directory="/path/to/my/topic_folder",
-    format="json",
+source = LocalFileSource(
+    filepath='folder_a/folder_b',
+    key_setter=key_setter,
+    value_setter=value_setter,
+    timestamp_setter=timestamp_setter,
+    file_format="json",
     compression="gzip",
-    replay_speed=1.0,
+    has_partition_folders=False,
+    replay_speed=0.5,
 )
-sdf = app.dataframe(source=source).print(metadata=True)
-# YOUR LOGIC HERE!
+app = Application(
+    broker_address="localhost:9092",
+    consumer_group='file-source',
+    auto_offset_reset='latest',
+)
+app.add_source(source)
+
 
 if __name__ == "__main__":
     app.run()
````
````diff
@@ -68,7 +90,7 @@ Here are some important configurations to be aware of (see [File Source API](../
 
 ### Required:
 
-- `directory`: a directory to recursively read through (exclude bucket name).
+- `filepath`: folder to recursively iterate from (a file will be used directly).
   **Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`).
 
 ### Optional:
````
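
Per the `filepath` description above, a single file can be given directly instead of a folder; a small sketch (the specific file name is hypothetical):

```python
from quixstreams.sources.community.file.local import LocalFileSource

# A folder is iterated recursively...
folder_source = LocalFileSource(filepath="path/to/topic_a/")
# ...while a single file is used directly (hypothetical file name).
single_file_source = LocalFileSource(filepath="path/to/topic_a/0/0000.ext")
```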
````diff
@@ -85,9 +107,20 @@ Here are some important configurations to be aware of (see [File Source API](../
   **Default**: 1.0
 
 
-## File hierarchy/structure
+## Supported File Hierarchies
+
+All `*FileSource` types support both single file referencing and recursive folder traversal.
+
+In addition, they also support a topic-partition file structure as produced by a Quix
+Streams `*FileSink` instance.
+
 
-The File Source expects a folder structure like so:
+### Using with a Topic-Partition hierarchy (from `*FileSink`)
+
+A Topic-Partition structure allows reproducing messages to the exact partition
+they originated from.
+
+A Quix Streams `*FileSink` produces files using this structure:
 
 ```
 my_sinked_topics/
````
````diff
@@ -102,7 +135,60 @@ The File Source expects a folder structure like so:
 └── etc...
 ```
 
-This is the default structure generated by the File Sink.
+To have `*FileSource` reflect this partition mapping for messages (instead of just producing
+messages to whatever partition is applicable), it must know how many partition folders
+there are so it can create a topic with that many partitions.
+
+To enable this:
+1. Subclass your `*FileSource` type and define the `file_partition_counter` method.
+    - this will be run before processing any files.
+2. Enable the use of `file_partition_counter` by setting the flag `has_partition_folders=True`.
+3. Extract the original Kafka key with `key_setter` (by default, it uses the same field name that `*FileSink` writes to).
+    - see [message data schema](#message-data-formatschema) for more info around expected defaults.
+
+#### Example
+As a simple example, using the topic-partition file structure:
+
+```
+├── my_topic/
+│   ├── 0/
+│   │   ├── 0000.ext
+│   │   └── 0011.ext
+│   └── 1/
+│       ├── 0003.ext
+│       └── 0016.ext
+```
+
+you could define `file_partition_counter` on `LocalFileSource` like this:
+
+```python
+from quixstreams.sources.community.file.local import LocalFileSource
+
+class MyLocalFileSource(LocalFileSource):
+
+    def file_partition_counter(self) -> int:
+        return len([f for f in self._filepath.iterdir()])  # `len(['0', '1'])`
+```
+
+Also, for our `key_setter`:
+```python
+def my_key_setter(record: dict) -> str:
+    return record["original_key_field"]
+```
+
+Then, when initializing with your new class:
+
+```python
+source = MyLocalFileSource(
+    ...,  # required args,
+    has_partition_folders=True,
+    key_setter=my_key_setter,
+)
+```
+
+This will produce these messages across the 2 partitions in their original partitioning
+and ordering.
+
 
 ## Message Data Format/Schema
 
````