Skip to content

Commit 2a83cbf

Browse files
authored
Add TryChunks and TryChunksError types for error handling in chunked streams (#31)
1 parent 6307e0a commit 2a83cbf

File tree

3 files changed

+341
-19
lines changed

3 files changed

+341
-19
lines changed

README.md

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,29 @@ async fn main() {
4141

4242
This creates chunks of up to 5 items with a 10-second timeout.
4343

44+
## TryChunksTimeout
45+
46+
For streams that yield `Result` values, use `try_chunks_timeout` to batch successful values and immediately propagate errors:
47+
48+
```rust
49+
use std::time::Duration;
50+
use futures::{stream, StreamExt};
51+
use futures_batch::TryChunksTimeoutStreamExt;
52+
53+
#[tokio::main]
54+
async fn main() {
55+
let results = stream::iter((0..10).map(|i| if i == 5 { Err("error") } else { Ok(i) }))
56+
.try_chunks_timeout(3, Duration::from_secs(10))
57+
.collect::<Vec<_>>()
58+
.await;
59+
60+
// Results in: [Ok([0, 1, 2]), Ok([3, 4]), Err("error"), Ok([6, 7, 8]), Ok([9])]
61+
println!("{:?}", results);
62+
}
63+
```
64+
65+
This batches `Ok` values until the buffer is full or timeout occurs, while immediately propagating any `Err` values.
66+
4467
## Features
4568

4669
### `sink` (optional)
@@ -52,7 +75,7 @@ Enable `Sink` support for bidirectional streams:
5275
futures-batch = { version = "0.7", features = ["sink"] }
5376
```
5477

55-
When enabled, `ChunksTimeout` implements `Sink` and forwards sink operations to the underlying stream.
78+
When enabled, both `ChunksTimeout` and `TryChunksTimeout` implement `Sink` and forward sink operations to the underlying stream.
5679

5780
## Performance
5881

benches/futures_batch_benchmark.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
1+
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
22
use futures::stream::{self, Stream};
33
use futures_batch::ChunksTimeoutStreamExt;
44
use std::time::Duration;

0 commit comments

Comments
 (0)