
Commit 7cb0a1b

Fix/docs fixes (#565)
* Fix links in docs
* Fix Sources docstrings
1 parent 2e2675b commit 7cb0a1b

9 files changed (+87, -68 lines)

docs/branching.md

Lines changed: 1 addition & 1 deletion
@@ -134,7 +134,7 @@ Most of branching's limitations are around:
 
 ## Branching Example
 
-In this example, we have purchase events similar to our [purchase-filtering tutorial](../tutorials/purchase-filtering):
+In this example, we have purchase events similar to our [purchase-filtering tutorial](tutorials/purchase-filtering/tutorial.md):
 
 In short, customers who have either a `Gold`, `Silver`, or `Bronze` membership
 are making purchases at this store.
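
For orientation, here is a rough sketch of what branching on purchase events like these can look like (illustrative only, not taken from the docs being edited; the topic and field names are assumptions):

```python
from quixstreams import Application

app = Application(broker_address="localhost:9092")
purchases_topic = app.topic("purchases", value_deserializer="json")

sdf = app.dataframe(purchases_topic)

# Each filter below derives an independent branch from the same parent SDF.
gold = sdf[sdf["membership"] == "Gold"]
silver = sdf[sdf["membership"] == "Silver"]
bronze = sdf[sdf["membership"] == "Bronze"]

# Each branch is then processed and produced on its own.
gold.to_topic(app.topic("gold-purchases", value_serializer="json"))
silver.to_topic(app.topic("silver-purchases", value_serializer="json"))
bronze.to_topic(app.topic("bronze-purchases", value_serializer="json"))

if __name__ == "__main__":
    app.run()
```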

docs/connectors/sinks/README.md

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ This is to ensure no further mutations can be applied to the outbound data.
 _However_, you can continue other operations with other branches, including using
 the same `Sink` to push another value (with another `SDF.sink()` call).
 
-[Learn more about _branching_ here](../../advanced/branching.md).
+[Learn more about _branching_ here](../../branching.md).
 
 ### Branching after SDF.sink()
 
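
To make the behavior above concrete, here is an illustrative sketch of reusing one `Sink` instance across two branches. It assumes the built-in `CSVSink` and its import path; any configured `Sink` would work the same way, and the topic name is made up:

```python
from quixstreams import Application
from quixstreams.sinks.core.csv import CSVSink  # assumed import path for the built-in CSV sink

app = Application(broker_address="localhost:9092")
events_topic = app.topic("events", value_deserializer="json")
csv_sink = CSVSink(path="events.csv")

sdf = app.dataframe(events_topic)

# Branch 1: sink the raw values. After .sink(), no further mutations are allowed on this branch.
sdf.sink(csv_sink)

# Branch 2: keep transforming on a separate branch, then reuse the same Sink with another .sink() call.
enriched = sdf.apply(lambda value: {**value, "processed": True})
enriched.sink(csv_sink)

if __name__ == "__main__":
    app.run()
```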

docs/connectors/sources/custom-sources.md

Lines changed: 2 additions & 2 deletions
@@ -2,8 +2,8 @@
 
 Quix Streams also provides a set of classes to help users implement custom sources.
 
-* [`quixstreams.sources.base.Source`](../../api-reference/sources.md#sources): A subclass of `BaseSource` that implements some helpful methods for writing sources. We recommend subclassing `Source` instead of `BaseSource`.
-* [`quixstreams.sources.base.BaseSource`](../../api-reference/sources.md#BaseSource): This is the base class for all other sources. It defines the must have methods.
+* [`quixstreams.sources.base.Source`](../../api-reference/sources.md#source): A subclass of `BaseSource` that implements some helpful methods for writing sources. We recommend subclassing `Source` instead of `BaseSource`.
+* [`quixstreams.sources.base.BaseSource`](../../api-reference/sources.md#basesource): This is the base class for all other sources. It defines the must-have methods.
 
 ## Source
 

docs/connectors/sources/quix-source.md

Lines changed: 1 addition & 1 deletion
@@ -31,4 +31,4 @@ if __name__ == "__main__":
 
 ## SDK Token
 
-The Quix Environment Source requires the SDK token of the source environment. [Click here](../../../develop/authentication/streaming-token.md) for more information on SDK tokens.
+The Quix Environment Source requires the SDK token of the source environment. [Click here](https://quix.io/docs/develop/authentication/streaming-token.html) for more information on SDK tokens.

docs/processing.md

Lines changed: 3 additions & 3 deletions
@@ -439,7 +439,7 @@ Since version 2.6, the `StreamingDataFrame.to_topic()` method always forwards th
 
 This way, **the outgoing messages will be produced with the identical timestamps and headers as they were received with** by default.
 
-To change the timestamp of the message, use the `StreamingDataFrame.set_timestamp()` API, described in [this section](#updating-timestamps).
+To change the timestamp of the message, use the `StreamingDataFrame.set_timestamp()` API, described in [this section](#updating-kafka-timestamps).
 
 ## Using Columns and DataFrame API
 
@@ -673,7 +673,7 @@ To update the current timestamp, use the `StreamingDataFrame.set_timestamp()` AP
 
 There are several things to note:
 
-- The new timestamp will be used by the `StreamingDataFrame.to_topic()` method when the message is produced (see [here](#timestamps-of-produced-messages)).
+- The new timestamp will be used by the `StreamingDataFrame.to_topic()` method when the message is produced (see [here](#timestamps-and-headers-of-produced-messages)).
 - The timestamp will also determine the applicable window in the windowed aggregations.
 
 **Example:**
@@ -757,7 +757,7 @@ The `quixstreams.message_context()` function should be called
 only from the custom functions during processing.
 
 > **_NOTE:_** Before quixstreams==2.6.0, `MessageContext` also provided access to message keys and timestamps.
-> To access keys and timestamps in `quixstreams>=2.6`, use the approach described in this [section](#accessing-kafka-keys-anad-timestamps)
+> To access keys and timestamps in `quixstreams>=2.6`, use the approach described in this [section](#accessing-kafka-keys-timestamps-and-headers)
 
 
 **Example:**
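
The hunks above cross-reference `StreamingDataFrame.set_timestamp()`, `to_topic()`, and the metadata-based access to keys, timestamps, and headers. A minimal sketch of how those pieces fit together (illustrative, not the examples elided from processing.md; the `metadata=True` and `set_timestamp()` callback signatures are assumed from quixstreams >= 2.6 behavior):

```python
from quixstreams import Application

app = Application(broker_address="localhost:9092")
input_topic = app.topic("input", value_deserializer="json")
output_topic = app.topic("output", value_serializer="json")

sdf = app.dataframe(input_topic)

# With metadata=True, the callback also receives the Kafka key, timestamp, and headers.
sdf = sdf.apply(
    lambda value, key, timestamp, headers: {**value, "kafka_ts": timestamp},
    metadata=True,
)

# Shift the message timestamp forward by 1 second (timestamps are in milliseconds);
# to_topic() will produce the message with this new timestamp.
sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: timestamp + 1000)

sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run()
```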

docs/tutorials/anomaly-detection/tutorial.md

Lines changed: 1 addition & 1 deletion
@@ -206,7 +206,7 @@ NOTE: because we use "Current" windowing, we may produce a lot of "duplicate" al
 ### 1. Run Kafka
 First, have a running Kafka cluster.
 
-To conveniently follow along with this tutorial, just [run this simple one-liner](../tutorials-overview.md#running-kafka-locally).
+To conveniently follow along with this tutorial, just [run this simple one-liner](../README.md#running-kafka-locally).
 
 ### 2. Install Quix Streams
 In your python environment, run `pip install quixstreams`

docs/tutorials/purchase-filtering/tutorial.md

Lines changed: 1 addition & 1 deletion
@@ -285,7 +285,7 @@ NOTE: by default, our outgoing Kafka key is persisted from the input message.
 ### 1. Run Kafka
 First, have a running Kafka cluster.
 
-To conveniently follow along with this tutorial, just [run this simple one-liner](../tutorials-overview.md#running-kafka-locally).
+To conveniently follow along with this tutorial, just [run this simple one-liner](../README.md#running-kafka-locally).
 
 ### 2. Install Quix Streams
 In your python environment, run `pip install quixstreams`

docs/tutorials/word-count/tutorial.md

Lines changed: 1 addition & 1 deletion
@@ -214,7 +214,7 @@ NOTE: This is how you would see the values in the Kafka topic `product_review_wo
 ### 1. Run Kafka
 First, have a running Kafka cluster.
 
-To conveniently follow along with this tutorial, just [run this simple one-liner](../tutorials-overview.md#running-kafka-locally).
+To conveniently follow along with this tutorial, just [run this simple one-liner](../README.md#running-kafka-locally).
 
 ### 2. Install Quix Streams
 In your python environment, run `pip install quixstreams`

quixstreams/sources/base/source.py

Lines changed: 76 additions & 57 deletions
@@ -26,38 +26,58 @@ class BaseSource(ABC):
     Sources are executed in a sub-process of the main application.
 
     To create your own source you need to implement:
-    * `run`
-    * `stop`
-    * `default_topic`
+
+    * `start`
+    * `stop`
+    * `default_topic`
 
     `BaseSource` is the most basic interface, and the framework expects every
-    sources to implement it. Use `Source` to benefit from a base implementation.
+    source to implement it.
+    Use `Source` to benefit from a base implementation.
 
     You can connect a source to a StreamingDataframe using the Application.
 
     Example snippet:
 
     ```python
-    from quixstreams import Application
-    from quixstreams.sources import Source
+    class RandomNumbersSource(BaseSource):
+        def __init__(self):
+            super().__init__()
+            self._running = False
+
+        def start(self):
+            self._running = True
+
+            while self._running:
+                number = random.randint(0, 100)
+                serialized = self._producer_topic.serialize(value=number)
+                self._producer.produce(
+                    topic=self._producer_topic.name,
+                    key=serialized.key,
+                    value=serialized.value,
+                )
+
+        def stop(self):
+            self._running = False
+
+        def default_topic(self) -> Topic:
+            return Topic(
+                name="topic-name",
+                value_deserializer="json",
+                value_serializer="json",
+            )
 
-    class MySource(Source):
-        def run(self):
-            for _ in range(1000):
-                self.produce(
-                    key="foo",
-                    value=b"foo"
-                )
 
     def main():
-        app = Application()
-        source = MySource(name="my_source")
+        app = Application(broker_address="localhost:9092")
+        source = RandomNumbersSource()
 
         sdf = app.dataframe(source=source)
         sdf.print(metadata=True)
 
         app.run()
 
+
     if __name__ == "__main__":
         main()
     ```
@@ -104,23 +124,6 @@ def stop(self) -> None:
         This method is triggered when the application is shutting down.
 
         The source must ensure that the `run` method is completed soon.
-
-        Example Snippet:
-
-        ```python
-        class MySource(BaseSource):
-            def start(self):
-                self._running = True
-                while self._running:
-                    self._producer.produce(
-                        topic=self._producer_topic,
-                        value="foo",
-                    )
-                    time.sleep(1)
-
-            def stop(self):
-                self._running = False
-        ```
         """
 
     @abstractmethod
@@ -134,22 +137,50 @@ def default_topic(self) -> Topic:
 
 class Source(BaseSource):
     """
-    BaseSource class implementation providing
+    A base class for custom Sources that provides a basic implementation of `BaseSource`
+    interface.
+    It is recommended to interface to create custom sources.
 
-    Implementation for the abstract method:
-    * `default_topic`
-    * `start`
-    * `stop`
+    Subclass it and implement the `run` method to fetch data and produce it to Kafka.
 
-    Helper methods
-    * serialize
-    * produce
-    * flush
+    Example:
 
-    Helper property
-    * running
+    ```python
+    from quixstreams import Application
+    import random
 
-    Subclass it and implement the `run` method to fetch data and produce it to Kafka.
+    from quixstreams.sources import Source
+
+
+    class RandomNumbersSource(Source):
+        def run(self):
+            while self.running:
+                number = random.randint(0, 100)
+                serialized = self._producer_topic.serialize(value=number)
+                self.produce(key=str(number), value=serialized.value)
+
+
+    def main():
+        app = Application(broker_address="localhost:9092")
+        source = RandomNumbersSource(name="random-source")
+
+        sdf = app.dataframe(source=source)
+        sdf.print(metadata=True)
+
+        app.run()
+
+
+    if __name__ == "__main__":
+        main()
+    ```
+
+
+    Helper methods and properties:
+
+    * `serialize()`
+    * `produce()`
+    * `flush()`
+    * `running`
     """
 
     def __init__(self, name: str, shutdown_timeout: float = 10) -> None:
@@ -171,18 +202,6 @@ def running(self) -> bool:
         Property indicating if the source is running.
 
         The `stop` method will set it to `False`. Use it to stop the source gracefully.
-
-        Example snippet:
-
-        ```python
-        class MySource(Source):
-            def run(self):
-                while self.running:
-                    self.produce(
-                        value="foo",
-                    )
-                    time.sleep(1)
-        ```
         """
         return self._running
 
@@ -192,7 +211,7 @@ def cleanup(self, failed: bool) -> None:
 
         Use it to clean up the resources and shut down the source gracefully.
 
-        It flush the producer when `_run` completes successfully.
+        It flushes the producer when `_run` completes successfully.
         """
         if not failed:
             self.flush(self.shutdown_timeout / 2)
