Skip to content

Commit 26db741

Browse files
authored
Update the propagation of keys, timestamps and headers in StreamingDataFrame (#375)
1 parent 1ae078b commit 26db741

File tree

44 files changed

+6462
-4781
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+6462
-4781
lines changed

docs/api-reference/application.md

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
class Application()
1111
```
1212

13-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L54)
13+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/app.py#L55)
1414

1515
The main Application class.
1616

@@ -82,7 +82,7 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
8282
topic_create_timeout: float = 60)
8383
```
8484

85-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L92)
85+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/app.py#L93)
8686

8787

8888
<br>
@@ -179,7 +179,7 @@ def Quix(cls,
179179
topic_create_timeout: float = 60) -> Self
180180
```
181181

182-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L305)
182+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/app.py#L310)
183183

184184
>***NOTE:*** DEPRECATED: use Application with `quix_sdk_token` argument instead.
185185

@@ -289,7 +289,7 @@ def topic(name: str,
289289
timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
290290
```
291291

292-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L443)
292+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/app.py#L448)
293293

294294
Create a topic definition.
295295

@@ -370,7 +370,7 @@ topic = app.topic("input-topic", timestamp_extractor=custom_ts_extractor)
370370
def dataframe(topic: Topic) -> StreamingDataFrame
371371
```
372372

373-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L523)
373+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/app.py#L528)
374374

375375
A simple helper method that generates a `StreamingDataFrame`, which is used
376376

@@ -420,7 +420,7 @@ to be used as an input topic.
420420
def stop(fail: bool = False)
421421
```
422422

423-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L562)
423+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/app.py#L567)
424424

425425
Stop the internal poll loop and the message processing.
426426

@@ -447,7 +447,7 @@ to unhandled exception, and it shouldn't commit the current checkpoint.
447447
def get_producer() -> Producer
448448
```
449449

450-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L585)
450+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/app.py#L590)
451451

452452
Create and return a pre-configured Producer instance.
453453
The Producer is initialized with params passed to Application.
@@ -482,7 +482,7 @@ with app.get_producer() as producer:
482482
def get_consumer(auto_commit_enable: bool = True) -> Consumer
483483
```
484484

485-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L615)
485+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/app.py#L620)
486486

487487
Create and return a pre-configured Consumer instance.
488488
The Consumer is initialized with params passed to Application.
@@ -527,7 +527,7 @@ with app.get_consumer() as consumer:
527527
def clear_state()
528528
```
529529

530-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L658)
530+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/app.py#L663)
531531

532532
Clear the state of the application.
533533

@@ -541,7 +541,7 @@ Clear the state of the application.
541541
def run(dataframe: StreamingDataFrame)
542542
```
543543

544-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L664)
544+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/app.py#L669)
545545

546546
Start processing data from Kafka using provided `StreamingDataFrame`
547547

docs/api-reference/context.md

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
def set_message_context(context: Optional[MessageContext])
1313
```
1414

15-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/context.py#L21)
15+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/context.py#L20)
1616

1717
Set a MessageContext for the current message in the given `contextvars.Context`
1818

@@ -55,7 +55,7 @@ sdf = sdf.update(lambda value: alter_context(value))
5555
def message_context() -> MessageContext
5656
```
5757

58-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/context.py#L52)
58+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/4c9a9ea1533d8d925d1d33952dc650e142cb6e62/quixstreams/context.py#L51)
5959

6060
Get a MessageContext for the current message, which houses most of the message
6161

@@ -86,37 +86,3 @@ sdf = sdf.apply(lambda value: 1 if message_context().partition == 2 else 0)
8686

8787
instance of `MessageContext`
8888

89-
<a id="quixstreams.context.message_key"></a>
90-
91-
<br><br>
92-
93-
#### message\_key
94-
95-
```python
96-
def message_key() -> Any
97-
```
98-
99-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/context.py#L83)
100-
101-
Get the current message's key.
102-
103-
104-
<br>
105-
***Example Snippet:***
106-
107-
```python
108-
from quixstreams import Application, message_key
109-
110-
# Changes the current sdf value based on what the message key is.
111-
112-
app = Application()
113-
sdf = app.dataframe()
114-
sdf = sdf.apply(lambda value: 1 if message_key() == b'1' else 0)
115-
```
116-
117-
118-
<br>
119-
***Returns:***
120-
121-
a deserialized message key
122-

0 commit comments

Comments
 (0)