Commit 3b1d160

ovv and daniil-quix authored
docs: Add standalone sources section (#557)
* docs: Add standalone sources section
* indentation
* Sources docs update

---------

Co-authored-by: Daniil Gusev <daniil@quix.io>
1 parent 63dcd6b commit 3b1d160

4 files changed

+96
-32
lines changed


docs/connectors/sources/README.md

Lines changed: 79 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -9,41 +9,42 @@ from quixstreams import Application
99
from quixstreams.sources import CSVSource
1010

1111
def main():
12-
app = Application()
13-
source = CSVSource(path="input.csv")
14-
15-
sdf = app.dataframe(source=source)
16-
sdf.print(metadata=True)
17-
18-
app.run()
19-
12+
app = Application()
13+
source = CSVSource(path="input.csv")
14+
15+
sdf = app.dataframe(source=source)
16+
sdf.print(metadata=True)
17+
18+
app.run()
19+
2020
if __name__ == "__main__":
21-
main()
21+
main()
2222
```
2323

2424
## Supported sources
2525

26-
Quix streams provide a source out of the box.
26+
Quix Streams provides the following sources out of the box:
2727

2828
* [CSVSource](./csv-source.md): A source that reads data from a single CSV file.
2929
* [KafkaReplicatorSource](./kafka-source.md): A source that replicates a topic from a Kafka broker to your application broker.
3030
* [QuixEnvironmentSource](./quix-source.md): A source that replicates a topic from a Quix Cloud environment to your application broker.
3131

32-
You can also implement your own, have a look at [Creating a Custom Source](custom-sources.md) for documentation on how to do that.
32+
To create a custom source, read [Creating a Custom Source](custom-sources.md).
3333

3434
## Multiprocessing
3535

3636
For good performance, each source runs in a subprocess. Quix Streams automatically sets up, monitors, and tears down the subprocess.
3737

3838
For multiplatform support, Quix Streams starts the source process using the [spawn](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods) approach. As a side effect, each Source instance must be pickleable. If a source needs to handle unpickleable objects, it's best to initialize those in the source subprocess (in the `BaseSource.start` or `Source.run` methods).
3939

40-
## Topics
40+
## Customize Topic Configuration
4141

42-
Sources work by sending data to Kafka topics. Then StreamingDataFrames consume these topics.
42+
Sources work by sending data to intermediate Kafka topics, which StreamingDataFrames then consume and process.
4343

44-
Each source provides a default topic based on its configuration. You can override the default topic by specifying a topic using the `app.dataframe()` method.
44+
Each Source provides a default topic based on its configuration.
45+
To customize the topic config, pass a new `Topic` object to the `app.dataframe()` method together with the Source instance.
4546

46-
**Example**
47+
**Example:**
4748

4849
Provide a custom topic with four partitions to the source.
4950

@@ -54,9 +55,14 @@ from quixstreams.models.topics import TopicConfig
5455

5556
def main():
5657
app = Application()
58+
# Create a CSVSource
5759
source = CSVSource(path="input.csv")
60+
61+
# Define a topic for the CSVSource with a custom config
5862
topic = app.topic("my_csv_source", config=TopicConfig(num_partitions=4, replication_factor=1))
5963

64+
# Pass the topic together with the CSVSource to a dataframe
65+
# When the CSVSource starts, it will produce data to this topic
6066
sdf = app.dataframe(topic=topic, source=source)
6167
sdf.print(metadata=True)
6268

@@ -65,3 +71,61 @@ def main():
6571
if __name__ == "__main__":
6672
main()
6773
```
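The effect of a four-partition topic on processing can be illustrated with a rough, library-free sketch. It does not use the Quix Streams or Kafka APIs: `partition_for`, `assign_partitions`, and `simulate` are hypothetical helpers, the CRC32 hash is only a stand-in for a real Kafka partitioner, and the round-robin assignment is a simplification of what a consumer group does. The assumption being modelled is that each partition is read by at most one processing instance at a time.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # mirrors num_partitions=4 in the example above


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in partitioner (an assumption): stable hash of the key modulo partition count.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_partitions(num_instances: int, num_partitions: int = NUM_PARTITIONS) -> dict:
    # Simplified round-robin assignment of partitions to processing instances.
    assignment = defaultdict(list)
    for partition in range(num_partitions):
        assignment[partition % num_instances].append(partition)
    return dict(assignment)


def simulate(messages, num_instances: int) -> dict:
    # The source writes every message exactly once to the intermediate topic.
    topic = defaultdict(list)
    for key, value in messages:
        topic[partition_for(key)].append((key, value))
    # Each processing instance reads only the partitions assigned to it.
    processed = {}
    for instance, partitions in assign_partitions(num_instances).items():
        processed[instance] = [msg for p in partitions for msg in topic[p]]
    return processed


if __name__ == "__main__":
    messages = [(f"sensor-{i}", i) for i in range(20)]
    for instance, batch in simulate(messages, num_instances=2).items():
        print(f"instance {instance} processed {len(batch)} messages")
```

Under these assumptions, adding instances beyond the partition count gives no extra parallelism, because the extra instances receive no partitions.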
74+
75+
## Standalone sources
76+
77+
So far we have covered how to run Sources and process data within the same application.
78+
79+
When you scale your processing applications to more instances, you may need to run only a single instance of the Source.
80+
For example, the source may read data from a WebSocket API while you want to process that data with multiple apps.
81+
82+
To achieve that, run the Source in standalone mode.
83+
84+
**Example:**
85+
86+
Run an imaginary WebSocket source in standalone mode so that the data is read only once.
87+
88+
```python
89+
from quixstreams import Application
90+
91+
def main():
92+
app = Application()
93+
94+
# Create an instance of SomeWebsocketSource
95+
source = SomeWebsocketSource(url="wss://example.com")
96+
97+
# Register the source in the app
98+
app.add_source(source)
99+
100+
# Start the application
101+
# The app will start SomeWebsocketSource, and it will produce data to the default intermediate topic
102+
app.run()
103+
104+
if __name__ == "__main__":
105+
main()
106+
```
107+
108+
To customize the topic the Source will use, create a new `Topic` and pass it to the `app.add_source()` method:
109+
110+
```python
111+
from quixstreams import Application
112+
# SomeWebsocketSource is an imaginary source; import it from your own module
113+
from quixstreams.models.topics import TopicConfig
114+
115+
def main():
116+
app = Application()
117+
# Create an instance of SomeWebsocketSource
118+
source = SomeWebsocketSource(url="wss://example.com")
119+
120+
# Define a topic for SomeWebsocketSource with a custom config
121+
topic = app.topic("some-websocket-source", config=TopicConfig(num_partitions=4, replication_factor=1))
122+
123+
# Register the source and topic in the application
124+
app.add_source(source=source, topic=topic)
125+
126+
# Start the application
127+
app.run()
128+
129+
if __name__ == "__main__":
130+
main()
131+
```
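The Multiprocessing section of this README says that each Source instance must be pickleable and that unpickleable objects are best initialized inside the source subprocess. The sketch below illustrates that idea with the standard library only. It does not subclass the real `Source` or `BaseSource` classes: `EagerSource`, `LazySource`, and `is_pickleable` are hypothetical names, and a `threading.Lock` stands in for any unpickleable resource such as an open connection.

```python
import pickle
import threading


class EagerSource:
    """Hypothetical source that creates an unpickleable object in __init__."""

    def __init__(self, url: str):
        self.url = url
        self.lock = threading.Lock()  # lock objects cannot be pickled


class LazySource:
    """Hypothetical source that defers the unpickleable object to run()."""

    def __init__(self, url: str):
        self.url = url
        self.lock = None  # created later, after the instance has been pickled

    def run(self) -> str:
        # In a real source, this is the point that executes in the subprocess.
        self.lock = threading.Lock()
        with self.lock:
            return f"reading from {self.url}"


def is_pickleable(obj) -> bool:
    # Report whether the object survives pickle.dumps, as the spawn method requires.
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


if __name__ == "__main__":
    print("eager:", is_pickleable(EagerSource("wss://example.com")))
    print("lazy:", is_pickleable(LazySource("wss://example.com")))
```

The lazy variant keeps only plain configuration values on the instance until `run()` is called, which is what lets it cross the process boundary.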

docs/connectors/sources/csv-source.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@ from quixstreams import Application
1313
from quixstreams.sources import CSVSource
1414

1515
def main():
16-
app = Application()
17-
source = CSVSource(path="input.csv")
16+
app = Application()
17+
source = CSVSource(path="input.csv")
1818

19-
sdf = app.dataframe(source=source)
20-
sdf.print(metadata=True)
19+
sdf = app.dataframe(source=source)
20+
sdf.print(metadata=True)
2121

22-
app.run()
22+
app.run()
2323

2424
if __name__ == "__main__":
25-
main()
25+
main()
2626
```
2727

2828
## File format

docs/connectors/sources/kafka-source.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@ from quixstreams.sources import KafkaReplicatorSource
1515
def main():
1616
app = Application()
1717
source = KafkaReplicatorSource(
18-
name="my-source",
19-
app_config=app.config,
20-
topic="source-topic",
21-
broker_address="source-broker-address"
18+
name="my-source",
19+
app_config=app.config,
20+
topic="source-topic",
21+
broker_address="source-broker-address"
2222
)
23-
23+
2424
sdf = app.dataframe(source=source)
2525
sdf.print(metadata=True)
26-
26+
2727
app.run()
2828

2929
if __name__ == "__main__":

docs/connectors/sources/quix-source.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ from quixstreams.sources import QuixEnvironmentSource
1313
def main():
1414
app = Application()
1515
source = QuixEnvironmentSource(
16-
name="my-source",
17-
app_config=app.config,
18-
topic="source-topic",
19-
quix_sdk_token="quix-sdk-token",
20-
quix_workspace_id="quix-workspace-id",
16+
name="my-source",
17+
app_config=app.config,
18+
topic="source-topic",
19+
quix_sdk_token="quix-sdk-token",
20+
quix_workspace_id="quix-workspace-id",
2121
)
2222

2323
sdf = app.dataframe(source=source)
