@@ -9098,38 +9098,58 @@ This is the base class for all sources.
9098
9098
Sources are executed in a sub-process of the main application.
9099
9099
9100
9100
To create your own source you need to implement:
9101
- * `run`
9102
- * `stop`
9103
- * `default_topic`
9101
+
9102
+ * `start`
9103
+ * `stop`
9104
+ * `default_topic`
9104
9105
9105
9106
`BaseSource` is the most basic interface, and the framework expects every
9106
- sources to implement it. Use `Source` to benefit from a base implementation.
9107
+ source to implement it.
9108
+ Use `Source` to benefit from a base implementation.
9107
9109
9108
9110
You can connect a source to a StreamingDataframe using the Application.
9109
9111
9110
9112
Example snippet:
9111
9113
9112
9114
```python
9113
- from quixstreams import Application
9114
- from quixstreams.sources import Source
9115
+ class RandomNumbersSource(BaseSource):
9116
+ def __init__(self):
9117
+ super().__init__()
9118
+ self._running = False
9119
+
9120
+ def start(self):
9121
+ self._running = True
9122
+
9123
+ while self._running:
9124
+ number = random.randint(0, 100)
9125
+ serialized = self._producer_topic.serialize(value=number)
9126
+ self._producer.produce(
9127
+ topic=self._producer_topic.name,
9128
+ key=serialized.key,
9129
+ value=serialized.value,
9130
+ )
9131
+
9132
+ def stop(self):
9133
+ self._running = False
9134
+
9135
+ def default_topic(self) -> Topic:
9136
+ return Topic(
9137
+ name="topic-name",
9138
+ value_deserializer="json",
9139
+ value_serializer="json",
9140
+ )
9115
9141
9116
- class MySource(Source):
9117
- def run(self):
9118
- for _ in range(1000):
9119
- self.produce(
9120
- key="foo",
9121
- value=b"foo"
9122
- )
9123
9142
9124
9143
def main():
9125
- app = Application()
9126
- source = MySource(name="my_source" )
9144
+ app = Application(broker_address="localhost:9092" )
9145
+ source = RandomNumbersSource( )
9127
9146
9128
9147
sdf = app.dataframe(source=source)
9129
9148
sdf.print(metadata=True)
9130
9149
9131
9150
app.run()
9132
9151
9152
+
9133
9153
if __name__ == "__main__":
9134
9154
main()
9135
9155
```
@@ -9142,7 +9162,7 @@ if __name__ == "__main__":
9142
9162
def configure(topic: Topic, producer: RowProducer) -> None
9143
9163
```
9144
9164
9145
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L74 )
9165
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L94 )
9146
9166
9147
9167
This method is triggered when the source is registered to the Application.
9148
9168
@@ -9157,7 +9177,7 @@ It configures the source's Kafka producer and the topic it will produce to.
9157
9177
def start() -> None
9158
9178
```
9159
9179
9160
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L93 )
9180
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L113 )
9161
9181
9162
9182
This method is triggered in the subprocess when the source is started.
9163
9183
@@ -9173,29 +9193,12 @@ Use it to fetch data and produce it to Kafka.
9173
9193
def stop() -> None
9174
9194
```
9175
9195
9176
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L102 )
9196
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L122 )
9177
9197
9178
9198
This method is triggered when the application is shutting down.
9179
9199
9180
9200
The source must ensure that the `run` method is completed soon.
9181
9201
9182
- Example Snippet:
9183
-
9184
- ```python
9185
- class MySource(BaseSource):
9186
- def start(self):
9187
- self._running = True
9188
- while self._running:
9189
- self._producer.produce(
9190
- topic=self._producer_topic,
9191
- value="foo",
9192
- )
9193
- time.sleep(1)
9194
-
9195
- def stop(self):
9196
- self._running = False
9197
- ```
9198
-
9199
9202
<a id="quixstreams.sources.base.source.BaseSource.default_topic"></a>
9200
9203
9201
9204
#### BaseSource.default\_topic
@@ -9205,7 +9208,7 @@ class MySource(BaseSource):
9205
9208
def default_topic() -> Topic
9206
9209
```
9207
9210
9208
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L127 )
9211
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L130 )
9209
9212
9210
9213
This method is triggered when the topic is not provided to the source.
9211
9214
@@ -9219,24 +9222,53 @@ The source must return a default topic configuration.
9219
9222
class Source(BaseSource)
9220
9223
```
9221
9224
9222
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L135)
9225
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L138)
9226
+
9227
+ A base class for custom Sources that provides a basic implementation of `BaseSource`
9228
+ interface.
9229
+ It is recommended to interface to create custom sources.
9223
9230
9224
- BaseSource class implementation providing
9231
+ Subclass it and implement the `run` method to fetch data and produce it to Kafka.
9225
9232
9226
- Implementation for the abstract method:
9227
- * `default_topic`
9228
- * `start`
9229
- * `stop`
9233
+ **Example**:
9230
9234
9231
- Helper methods
9232
- * serialize
9233
- * produce
9234
- * flush
9235
+
9236
+ ```python
9237
+ from quixstreams import Application
9238
+ import random
9235
9239
9236
- Helper property
9237
- * running
9240
+ from quixstreams.sources import Source
9238
9241
9239
- Subclass it and implement the `run` method to fetch data and produce it to Kafka.
9242
+
9243
+ class RandomNumbersSource(Source):
9244
+ def run(self):
9245
+ while self.running:
9246
+ number = random.randint(0, 100)
9247
+ serialized = self._producer_topic.serialize(value=number)
9248
+ self.produce(key=str(number), value=serialized.value)
9249
+
9250
+
9251
+ def main():
9252
+ app = Application(broker_address="localhost:9092")
9253
+ source = RandomNumbersSource(name="random-source")
9254
+
9255
+ sdf = app.dataframe(source=source)
9256
+ sdf.print(metadata=True)
9257
+
9258
+ app.run()
9259
+
9260
+
9261
+ if __name__ == "__main__":
9262
+ main()
9263
+ ```
9264
+
9265
+
9266
+ Helper methods and properties:
9267
+
9268
+ * `serialize()`
9269
+ * `produce()`
9270
+ * `flush()`
9271
+ * `running`
9240
9272
9241
9273
<a id="quixstreams.sources.base.source.Source.__init__"></a>
9242
9274
@@ -9246,7 +9278,7 @@ Subclass it and implement the `run` method to fetch data and produce it to Kafka
9246
9278
def __init__(name: str, shutdown_timeout: float = 10) -> None
9247
9279
```
9248
9280
9249
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L155 )
9281
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L186 )
9250
9282
9251
9283
**Arguments**:
9252
9284
@@ -9262,24 +9294,12 @@ def __init__(name: str, shutdown_timeout: float = 10) -> None
9262
9294
def running() -> bool
9263
9295
```
9264
9296
9265
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L169 )
9297
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L200 )
9266
9298
9267
9299
Property indicating if the source is running.
9268
9300
9269
9301
The `stop` method will set it to `False`. Use it to stop the source gracefully.
9270
9302
9271
- Example snippet:
9272
-
9273
- ```python
9274
- class MySource(Source):
9275
- def run(self):
9276
- while self.running:
9277
- self.produce(
9278
- value="foo",
9279
- )
9280
- time.sleep(1)
9281
- ```
9282
-
9283
9303
<a id="quixstreams.sources.base.source.Source.cleanup"></a>
9284
9304
9285
9305
#### Source.cleanup
@@ -9288,13 +9308,13 @@ class MySource(Source):
9288
9308
def cleanup(failed: bool) -> None
9289
9309
```
9290
9310
9291
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L189 )
9311
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L208 )
9292
9312
9293
9313
This method is triggered once the `run` method completes.
9294
9314
9295
9315
Use it to clean up the resources and shut down the source gracefully.
9296
9316
9297
- It flush the producer when `_run` completes successfully.
9317
+ It flushes the producer when `_run` completes successfully.
9298
9318
9299
9319
<a id="quixstreams.sources.base.source.Source.stop"></a>
9300
9320
@@ -9304,7 +9324,7 @@ It flush the producer when `_run` completes successfully.
9304
9324
def stop() -> None
9305
9325
```
9306
9326
9307
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L200 )
9327
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L219 )
9308
9328
9309
9329
This method is triggered when the application is shutting down.
9310
9330
@@ -9318,7 +9338,7 @@ It sets the `running` property to `False`.
9318
9338
def start()
9319
9339
```
9320
9340
9321
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L209 )
9341
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L228 )
9322
9342
9323
9343
This method is triggered in the subprocess when the source is started.
9324
9344
@@ -9333,7 +9353,7 @@ It marks the source as running, execute it's run method and ensure cleanup happe
9333
9353
def run()
9334
9354
```
9335
9355
9336
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L225 )
9356
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L244 )
9337
9357
9338
9358
This method is triggered in the subprocess when the source is started.
9339
9359
@@ -9351,7 +9371,7 @@ def serialize(key: Optional[object] = None,
9351
9371
timestamp_ms: Optional[int] = None) -> KafkaMessage
9352
9372
```
9353
9373
9354
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L233 )
9374
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L252 )
9355
9375
9356
9376
Serialize data to bytes using the producer topic serializers and return a `quixstreams.models.messages.KafkaMessage`.
9357
9377
@@ -9373,7 +9393,7 @@ def produce(value: Optional[Union[str, bytes]] = None,
9373
9393
buffer_error_max_tries: int = 3) -> None
9374
9394
```
9375
9395
9376
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L249 )
9396
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L268 )
9377
9397
9378
9398
Produce a message to the configured source topic in Kafka.
9379
9399
@@ -9385,7 +9405,7 @@ Produce a message to the configured source topic in Kafka.
9385
9405
def flush(timeout: Optional[float] = None) -> None
9386
9406
```
9387
9407
9388
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L274 )
9408
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L293 )
9389
9409
9390
9410
This method flush the producer.
9391
9411
@@ -9408,7 +9428,7 @@ None use producer default or -1 is infinite. Default: None
9408
9428
def default_topic() -> Topic
9409
9429
```
9410
9430
9411
- [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L292 )
9431
+ [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L311 )
9412
9432
9413
9433
Return a default topic matching the source name.
9414
9434
0 commit comments