What's Changed
Interval joins: StreamingDataFrame.join_interval()
Use StreamingDataFrame.join_interval() to join two topics into a new stream where each record is merged with records from the other topic that fall within a specified time interval.
This join is useful for cases where you need to match records that occur within a specific time window of each other, rather than just the latest record (as in as-of join).
from datetime import timedelta
from quixstreams import Application
app = Application(...)
sdf_measurements = app.dataframe(app.topic("measurements"))
sdf_events = app.dataframe(app.topic("events"))
# Join records from the topic "measurements"
# with records from "events" that occur within a 5-minute window
# before and after each measurement
sdf_joined = sdf_measurements.join_interval(
    right=sdf_events,
    how="inner",  # Emit updates only if matches are found
    on_merge="keep-left",  # Prefer the columns from the left dataframe if they overlap
    grace_ms=timedelta(days=7),  # Keep the state for 7 days
    backward_ms=timedelta(minutes=5),  # Look for events up to 5 minutes before
    forward_ms=timedelta(minutes=5),  # Look for events up to 5 minutes after
)
if __name__ == '__main__':
    app.run()
Please take a look at the Interval Join docs for more examples.
By @gwaramadze in #924
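To make the matching rule concrete, here is a minimal plain-Python sketch of an inner interval join with "keep-left" merging. It is not the Quix Streams implementation: the function `interval_join` and the sample records are hypothetical, message keys and state retention are ignored, and treating both window bounds as inclusive is an assumption.

```python
from datetime import timedelta


def interval_join(left, right, backward, forward):
    """Inner interval join over (timestamp_ms, value) pairs (illustrative only)."""
    back_ms = int(backward.total_seconds() * 1000)
    fwd_ms = int(forward.total_seconds() * 1000)
    joined = []
    for left_ts, left_value in left:
        for right_ts, right_value in right:
            # A right record matches when its timestamp lies inside
            # [left_ts - backward, left_ts + forward] (inclusive bounds assumed).
            if left_ts - back_ms <= right_ts <= left_ts + fwd_ms:
                # "keep-left": on overlapping columns the left value wins.
                joined.append({**right_value, **left_value})
    return joined


# Hypothetical sample data: one measurement at t = 600 s and three events.
measurements = [(600_000, {"sensor": "a", "temperature": 30})]
events = [
    (200_000, {"event": "too-early"}),    # 400 s before: outside the window
    (400_000, {"event": "door-open"}),    # 200 s before: inside the window
    (850_000, {"event": "door-closed"}),  # 250 s after: inside the window
]

result = interval_join(
    measurements, events, timedelta(minutes=5), timedelta(minutes=5)
)
for row in result:
    print(row)
```

With a 5-minute window on each side, the measurement is merged with the two events inside the window and the early event is skipped.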
[breaking] Updated Application.run() behavior with count and timeout parameters
1. Changed the meaning of the count parameter.
Previously, when calling Application.run(count=...) or Application.run(count=..., timeout=...), the count parameter meant "Number of messages to process before stopping the app".
In this update, we're changing the meaning of the count parameter to "number of outputs".
A simple way to think about outputs is "how many messages my application would send to an output topic."
This behavior is more intuitive than counting input messages.
Note that operations like filtering or aggregations reduce the number of outputs, and StreamingDataFrame.apply(..., expand=True) may output more data than it receives.
2. Application.run() can now collect and return outputs when collect=True is passed.
You can now test and debug applications more easily using the count and/or timeout parameters:
from quixstreams import Application
app = Application(broker_address="localhost:9092")
topic = app.topic("some-topic")
# Assume the topic has one partition and three JSON messages:
# {"temperature": 30}
# {"temperature": 40}
# {"temperature": 50}
sdf = app.dataframe(topic=topic)
# Process one output and collect the values (stops if no messages for 10s)
result_values_only = app.run(count=1, timeout=10, collect=True)
# >>> result_values_only = [
# {"temperature": 30}
# ]
# Process one output and collect the values with metadata (stops if no messages for 10s)
result_values_and_metadata = app.run(count=1, timeout=10, collect=True, metadata=True)
# >>> result_values_and_metadata = [
# {"temperature": 40, "_key": "<message_key>", "_timestamp": 123, "_offset": 1, "_topic": "some-topic", "_partition": 0, "_headers": None},
# ]
# Process one output without collecting (stops if no messages for 10s)
result_empty = app.run(count=1, timeout=10, collect=False)
# >>> result_empty = []
See more details in the Inspecting Data and Debugging section.
By @daniil-quix in #932
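The difference between counting inputs and counting outputs (item 1 above) can be illustrated with a small plain-Python simulation. This is only a sketch: `run_pipeline` is a hypothetical stand-in and not part of the Quix Streams API, and how the real application behaves when the count is reached partway through an expanded batch is not covered here.

```python
def run_pipeline(messages, count):
    """Consume messages until `count` outputs exist (hypothetical stand-in)."""
    outputs = []
    consumed = 0
    for message in messages:
        consumed += 1
        # Filter step: dropped inputs never become outputs.
        if message["temperature"] < 40:
            continue
        celsius = message["temperature"]
        fahrenheit = celsius * 9 / 5 + 32
        # Expand step: one input produces two outputs.
        for unit, value in (("C", celsius), ("F", fahrenheit)):
            outputs.append({"unit": unit, "value": value})
            if len(outputs) == count:
                return outputs, consumed
    return outputs, consumed


messages = [{"temperature": 30}, {"temperature": 40}, {"temperature": 50}]
outputs, consumed = run_pipeline(messages, count=3)
print(f"consumed {consumed} inputs, produced {len(outputs)} outputs")
```

Here the first message is filtered out and contributes nothing to the count, while each remaining message can contribute two outputs, so the number of inputs read and the number of outputs produced are unrelated in general.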
Log recovery progress
The application now logs recovery progress every 10 seconds to simplify monitoring:
...
[INFO] [quixstreams] : Recovery progress for <RecoveryPartition "changelog__state-store[0]">: 100 / 1000
[INFO] [quixstreams] : Recovery progress for <RecoveryPartition "changelog__state-store[0]">: 199 / 1000
...
By @gwaramadze in #941
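If you want to turn these log lines into a percentage for a dashboard or an alert, a small parser is enough. The sketch below assumes the message format is exactly as in the sample above and that the two numbers are the recovered and total amounts; the helper `parse_recovery_progress` is hypothetical and not part of Quix Streams.

```python
import re

# Pattern derived from the sample log lines above (format is an assumption).
PROGRESS_RE = re.compile(
    r'Recovery progress for <RecoveryPartition "(?P<partition>[^"]+)">: '
    r"(?P<done>\d+) / (?P<total>\d+)"
)


def parse_recovery_progress(line):
    """Return (partition_name, percent_done), or None if the line does not match."""
    match = PROGRESS_RE.search(line)
    if match is None:
        return None
    done, total = int(match["done"]), int(match["total"])
    # Treat an empty changelog as fully recovered to avoid dividing by zero.
    percent = 100.0 * done / total if total else 100.0
    return match["partition"], percent


line = (
    '[INFO] [quixstreams] : Recovery progress for '
    '<RecoveryPartition "changelog__state-store[0]">: 199 / 1000'
)
progress = parse_recovery_progress(line)
print(progress)
```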
Docs
- Add docs for InfluxDB1Sink by @daniil-quix in #947 #949
Internal
- Use generic WindowType by @gwaramadze in #942
- Remove the window expiration logs by @daniil-quix in #945
Connectors
- PostgresqlSink: correctly handle jsonb values by @tim-quix in #935
- BigQuerySink: use quix-streams user agent in requests by @gwaramadze in #943
- InfluxDB1Sink: new by @tim-quix in #936
Dependencies
- Bump mypy from 1.16.0 to 1.16.1 by @dependabot in #934
- Bump types-requests from 2.32.0.20250602 to 2.32.4.20250611 by @dependabot in #933
Full Changelog: v3.16.1...v3.17.0