Commit b4180e1

Source API documentation (#494)
1 parent 4d970e4 commit b4180e1

6 files changed

+551
-2
lines changed

docs/api-reference/sources.md

Lines changed: 377 additions & 0 deletions
@@ -0,0 +1,377 @@
1+
<a id="quixstreams.sources.base"></a>
2+
3+
## quixstreams.sources.base
4+
5+
<a id="quixstreams.sources.base.BaseSource"></a>
6+
7+
### BaseSource
8+
9+
```python
10+
class BaseSource(ABC)
11+
```
12+
13+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L22)
14+
15+
This is the base class for all sources.
16+
17+
Sources are executed in a sub-process of the main application.
18+
19+
To create your own source you need to implement:
20+
* `start`
21+
* `stop`
22+
* `default_topic`
23+
24+
`BaseSource` is the most basic interface, and the framework expects every
25+
source to implement it. Use `Source` to benefit from a base implementation.
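
The contract can be illustrated with plain `abc` from the standard library. This is a minimal sketch, not the library's code: `BaseSourceSketch` and `Incomplete` are hypothetical stand-ins that mirror the three methods marked `@abstractmethod` in this reference. It shows that a subclass leaving any of them unimplemented cannot be instantiated.

```python
from abc import ABC, abstractmethod


class BaseSourceSketch(ABC):
    """Hypothetical stand-in that mirrors the abstract interface only."""

    @abstractmethod
    def start(self) -> None: ...

    @abstractmethod
    def stop(self) -> None: ...

    @abstractmethod
    def default_topic(self): ...


class Incomplete(BaseSourceSketch):
    # Only one of the three abstract methods is implemented.
    def start(self) -> None:
        pass


try:
    Incomplete()
    instantiated = True
except TypeError:
    # Python refuses to instantiate a class with unimplemented abstract methods.
    instantiated = False
```

Subclassing `Source` instead avoids this boilerplate, since it already supplies `start`, `stop`, and `default_topic`.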
26+
27+
You can connect a source to a `StreamingDataFrame` using the `Application`.
28+
29+
Example snippet:
30+
31+
```python
from quixstreams import Application
from quixstreams.sources import Source


class MySource(Source):
    def run(self):
        # Produce 1000 messages with a fixed key and value, then return.
        for _ in range(1000):
            self.produce(
                key="foo",
                value=b"foo",
            )


def main():
    app = Application()
    source = MySource(name="my_source")

    # Connect the source to a StreamingDataFrame and print each message.
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)

    app.run(sdf)


if __name__ == "__main__":
    main()
```
55+
56+
<a id="quixstreams.sources.base.BaseSource.configure"></a>
57+
58+
<br><br>
59+
60+
#### BaseSource.configure
61+
62+
```python
63+
def configure(topic: Topic, producer: RowProducer) -> None
64+
```
65+
66+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L74)
67+
68+
This method is triggered when the source is registered to the Application.
69+
70+
It configures the source's Kafka producer and the topic it will produce to.
71+
72+
<a id="quixstreams.sources.base.BaseSource.start"></a>
73+
74+
<br><br>
75+
76+
#### BaseSource.start
77+
78+
```python
79+
@abstractmethod
80+
def start() -> None
81+
```
82+
83+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L93)
84+
85+
This method is triggered in the subprocess when the source is started.
86+
87+
The subprocess will run as long as the start method executes.
88+
Use it to fetch data and produce it to Kafka.
89+
90+
<a id="quixstreams.sources.base.BaseSource.stop"></a>
91+
92+
<br><br>
93+
94+
#### BaseSource.stop
95+
96+
```python
97+
@abstractmethod
98+
def stop() -> None
99+
```
100+
101+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L102)
102+
103+
This method is triggered when the application is shutting down.
104+
105+
The source must ensure that the `start` method returns promptly.
106+
107+
108+
<br>
109+
***Example Snippet:***
110+
111+
```python
112+
class MySource(BaseSource):
113+
def start(self):
114+
self._running = True
115+
while self._running:
116+
self._producer.produce(
117+
topic=self._producer_topic,
118+
value="foo",
119+
)
120+
time.sleep(1)
121+
122+
def stop(self):
123+
self._running = False
124+
```
125+
126+
<a id="quixstreams.sources.base.BaseSource.default_topic"></a>
127+
128+
<br><br>
129+
130+
#### BaseSource.default\_topic
131+
132+
```python
133+
@abstractmethod
134+
def default_topic() -> Topic
135+
```
136+
137+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L127)
138+
139+
This method is triggered when the topic is not provided to the source.
140+
141+
The source must return a default topic configuration.
142+
143+
<a id="quixstreams.sources.base.Source"></a>
144+
145+
### Source
146+
147+
```python
148+
class Source(BaseSource)
149+
```
150+
151+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L135)
152+
153+
A `BaseSource` implementation that provides:

Implementations of the abstract methods:
* `default_topic`
* `start`
* `stop`

Helper methods:
* `serialize`
* `produce`
* `flush`

Helper property:
* `running`
167+
168+
Subclass it and implement the `run` method to fetch data and produce it to Kafka.
169+
170+
<a id="quixstreams.sources.base.Source.__init__"></a>
171+
172+
<br><br>
173+
174+
#### Source.\_\_init\_\_
175+
176+
```python
177+
def __init__(name: str, shutdown_timeout: float = 10) -> None
178+
```
179+
180+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L155)
181+
182+
183+
<br>
184+
***Arguments:***
185+
186+
- `name`: The source's unique name. Used to generate the topic configuration.
- `shutdown_timeout`: Time in seconds the application waits for the source to shut down gracefully.
188+
189+
<a id="quixstreams.sources.base.Source.running"></a>
190+
191+
<br><br>
192+
193+
#### Source.running
194+
195+
```python
196+
@property
197+
def running() -> bool
198+
```
199+
200+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L169)
201+
202+
Property indicating if the source is running.
203+
204+
The `stop` method will set it to `False`. Use it to stop the source gracefully.
205+
206+
Example snippet:
207+
208+
```python
import time

from quixstreams.sources import Source


class MySource(Source):
    def run(self):
        # Loop until `stop` sets `running` to False.
        while self.running:
            self.produce(
                value="foo",
            )
            time.sleep(1)
```
217+
218+
<a id="quixstreams.sources.base.Source.cleanup"></a>
219+
220+
<br><br>
221+
222+
#### Source.cleanup
223+
224+
```python
225+
def cleanup(failed: bool) -> None
226+
```
227+
228+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L189)
229+
230+
This method is triggered once the `run` method completes.
231+
232+
Use it to clean up the resources and shut down the source gracefully.
233+
234+
It flushes the producer when `run` completes successfully.
235+
236+
<a id="quixstreams.sources.base.Source.stop"></a>
237+
238+
<br><br>
239+
240+
#### Source.stop
241+
242+
```python
243+
def stop() -> None
244+
```
245+
246+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L200)
247+
248+
This method is triggered when the application is shutting down.
249+
250+
It sets the `running` property to `False`.
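
The cooperative shutdown described here can be sketched without Kafka. The example below is an illustration under stated assumptions, not the library's implementation: `StoppableSketch` is a hypothetical stand-in, and a thread replaces the subprocess that the real source runs in. It shows a loop that checks `running`, a `stop` call that flips the flag, and a bounded wait that plays a role similar to `shutdown_timeout`.

```python
import threading
import time


class StoppableSketch:
    """Hypothetical stand-in for a source; not the quixstreams `Source` class."""

    def __init__(self) -> None:
        self._running = False
        self.started = threading.Event()

    @property
    def running(self) -> bool:
        return self._running

    def start(self) -> None:
        self._running = True
        self.started.set()
        # Check the flag on every iteration so `stop` takes effect quickly.
        while self.running:
            time.sleep(0.01)

    def stop(self) -> None:
        self._running = False


source = StoppableSketch()
worker = threading.Thread(target=source.start)
worker.start()

source.started.wait(timeout=1.0)  # wait until the loop has begun
source.stop()                     # request a graceful stop
worker.join(timeout=1.0)          # bounded wait for the loop to exit
stopped_in_time = not worker.is_alive()
```

A loop that blocks for a long time between checks of `running` would delay shutdown, which is why short sleeps or timeouts inside the loop are preferable.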
251+
252+
<a id="quixstreams.sources.base.Source.start"></a>
253+
254+
<br><br>
255+
256+
#### Source.start
257+
258+
```python
259+
def start()
260+
```
261+
262+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L209)
263+
264+
This method is triggered in the subprocess when the source is started.
265+
266+
It marks the source as running, executes its `run` method, and ensures cleanup happens.
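
One way to picture this lifecycle is the sketch below. It is an assumption about the control flow, not a copy of the library's code: `LifecycleSketch` and `FailingSketch` are hypothetical stand-ins, and the `events` list exists only to record the order of calls. It shows `cleanup` receiving `failed=False` after a normal return and `failed=True` when `run` raises.

```python
class LifecycleSketch:
    """Hypothetical stand-in; not the quixstreams `Source` class."""

    def __init__(self) -> None:
        self.running = False
        self.events = []  # records the order of lifecycle calls

    def run(self) -> None:
        self.events.append("run")

    def cleanup(self, failed: bool) -> None:
        self.events.append(f"cleanup(failed={failed})")

    def start(self) -> None:
        self.running = True
        try:
            self.run()
        except BaseException:
            # Report the failure to cleanup, then re-raise the original error.
            self.cleanup(failed=True)
            raise
        else:
            self.cleanup(failed=False)


class FailingSketch(LifecycleSketch):
    def run(self) -> None:
        raise RuntimeError("boom")


ok = LifecycleSketch()
ok.start()

bad = FailingSketch()
try:
    bad.start()
except RuntimeError:
    pass  # the error still propagates after cleanup has run
```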
267+
268+
<a id="quixstreams.sources.base.Source.run"></a>
269+
270+
<br><br>
271+
272+
#### Source.run
273+
274+
```python
275+
@abstractmethod
276+
def run()
277+
```
278+
279+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L225)
280+
281+
This method is triggered in the subprocess when the source is started.
282+
283+
The subprocess will run as long as the run method executes.
284+
Use it to fetch data and produce it to Kafka.
285+
286+
<a id="quixstreams.sources.base.Source.serialize"></a>
287+
288+
<br><br>
289+
290+
#### Source.serialize
291+
292+
```python
def serialize(key: Optional[object] = None,
              value: Optional[object] = None,
              headers: Optional[Headers] = None,
              timestamp_ms: Optional[int] = None) -> KafkaMessage
```
298+
299+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L233)
300+
301+
Serialize data to bytes using the producer topic serializers and return a `quixstreams.models.messages.KafkaMessage`.
302+
303+
304+
<br>
305+
***Returns:***
306+
307+
`quixstreams.models.messages.KafkaMessage`
308+
309+
<a id="quixstreams.sources.base.Source.produce"></a>
310+
311+
<br><br>
312+
313+
#### Source.produce
314+
315+
```python
def produce(value: Optional[Union[str, bytes]] = None,
            key: Optional[Union[str, bytes]] = None,
            headers: Optional[Headers] = None,
            partition: Optional[int] = None,
            timestamp: Optional[int] = None,
            poll_timeout: float = 5.0,
            buffer_error_max_tries: int = 3) -> None
```
324+
325+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L249)
326+
327+
Produce a message to the configured source topic in Kafka.
328+
329+
<a id="quixstreams.sources.base.Source.flush"></a>
330+
331+
<br><br>
332+
333+
#### Source.flush
334+
335+
```python
336+
def flush(timeout: Optional[float] = None) -> None
337+
```
338+
339+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L274)
340+
341+
This method flushes the producer.
342+
343+
It ensures all messages are successfully delivered to Kafka.
344+
345+
346+
<br>
347+
***Arguments:***
348+
349+
- `timeout` (`float`): time to attempt flushing (seconds). `None` uses the producer default; `-1` waits indefinitely. Default: `None`
351+
352+
**Raises**:
353+
354+
- `CheckpointProducerTimeout`: if any message fails to produce before the timeout
355+
356+
<a id="quixstreams.sources.base.Source.default_topic"></a>
357+
358+
<br><br>
359+
360+
#### Source.default\_topic
361+
362+
```python
363+
def default_topic() -> Topic
364+
```
365+
366+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/docs/sources/quixstreams/sources/base.py#L292)
367+
368+
Return a default topic matching the source name.
369+
370+
The default topic will not be used if the topic has already been provided to the source.
371+
372+
373+
<br>
374+
***Returns:***
375+
376+
`quixstreams.models.topics.Topic`
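
The fallback rule can be sketched as follows. This is an illustration only: `TopicSketch`, `SourceSketch`, and `resolve_topic` are hypothetical names, and the assumption that the default topic's name equals the source name is taken loosely from the description above rather than from the library's code.

```python
from typing import Optional


class TopicSketch:
    """Hypothetical stand-in for a topic object."""

    def __init__(self, name: str) -> None:
        self.name = name


class SourceSketch:
    """Hypothetical stand-in for a source with a default topic."""

    def __init__(self, name: str) -> None:
        self.name = name

    def default_topic(self) -> TopicSketch:
        # Assumption: the default topic is named after the source.
        return TopicSketch(self.name)


def resolve_topic(source: SourceSketch, topic: Optional[TopicSketch] = None) -> TopicSketch:
    # An explicitly provided topic wins; otherwise fall back to the default.
    return topic if topic is not None else source.default_topic()


source = SourceSketch("my_source")
fallback = resolve_topic(source)
explicit = resolve_topic(source, TopicSketch("custom_topic"))
```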
377+

docs/build/build.py

Lines changed: 6 additions & 0 deletions
@@ -119,6 +119,12 @@
             "quixstreams.sinks.base.sink",
         ]
     },
+    "sources.md": {
+        k: None
+        for k in [
+            "quixstreams.sources.base",
+        ]
+    },
 }

 # Go over all modules and assign them to doc files
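
For readers unfamiliar with the idiom, the added entry is a dict comprehension that maps each listed module name to `None`. A minimal sketch of what it evaluates to, where `doc_modules` is a hypothetical variable name (the enclosing variable is not visible in this hunk):

```python
# Hypothetical container name; only the "sources.md" entry comes from the diff.
doc_modules = {
    "sources.md": {
        k: None
        for k in [
            "quixstreams.sources.base",
        ]
    },
}

# Each doc file maps to a dict whose keys are the module names it documents.
modules_for_sources = list(doc_modules["sources.md"])
```

Adding another module to `sources.md` would only require appending its dotted path to the inner list.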
