@@ -1344,7 +1344,8 @@ like `sum`, `count`, etc. and applied to the StreamingDataFrame.
1344
1344
#### StreamingDataFrame.drop
1345
1345
1346
1346
```python
1347
- def drop(columns: Union[str, List[str]]) -> Self
1347
+ def drop(columns: Union[str, List[str]],
1348
+ errors: Literal["ignore", "raise"] = "raise") -> Self
1348
1349
```
1349
1350
1350
1351
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L969)
@@ -1368,6 +1369,8 @@ sdf.drop(["x", "y"])
1368
1369
**Arguments**:
1369
1370
1370
1371
- `columns`: a single column name or a list of names, where names are `str`
1372
+ - `errors`: If "ignore", suppress error and only existing labels are dropped.
1373
+ Default - `"raise"`.
1371
1374
1372
1375
**Returns**:
1373
1376
@@ -7770,6 +7773,7 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
7770
7773
consumer_group: Optional[str] = None,
7771
7774
auto_offset_reset: AutoOffsetReset = "latest",
7772
7775
commit_interval: float = 5.0,
7776
+ commit_every: int = 0,
7773
7777
consumer_extra_config: Optional[dict] = None,
7774
7778
producer_extra_config: Optional[dict] = None,
7775
7779
state_dir: str = "state",
@@ -7814,6 +7818,15 @@ Default - "quixstreams-default" (set during init)
7814
7818
>***NOTE:*** Quix Applications will prefix it with the Quix workspace id.
7815
7819
- `commit_interval`: How often to commit the processed messages in seconds.
7816
7820
Default - 5.0.
7821
+ - `commit_every`: Commit the checkpoint after processing N messages.
7822
+ Use this parameter for more granular control of the commit schedule.
7823
+ If the value is > 0, the application will commit the checkpoint after
7824
+ processing the specified number of messages across all the assigned
7825
+ partitions.
7826
+ If the value is <= 0, only the `commit_interval` will be considered.
7827
+ Default - 0.
7828
+ >***NOTE:*** Only input offsets are counted, and the application
7829
+ > may produce more results than the number of incoming messages.
7817
7830
- `auto_offset_reset`: Consumer `auto.offset.reset` setting
7818
7831
- `consumer_extra_config`: A dictionary with additional options that
7819
7832
will be passed to `confluent_kafka.Consumer` as is.
@@ -8388,7 +8401,9 @@ def expired() -> bool
8388
8401
8389
8402
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/checkpointing/checkpoint.py#L57)
8390
8403
8391
- Returns `True` if checkpoint deadline has expired.
8404
+ Returns `True` if checkpoint deadline has expired OR
8405
+ if the total number of processed offsets exceeded the "commit_every" limit
8406
+ when it's defined.
8392
8407
8393
8408
<a id="quixstreams.checkpointing.checkpoint.Checkpoint.empty"></a>
8394
8409
0 commit comments