File tree Expand file tree Collapse file tree 1 file changed +8
-5
lines changed Expand file tree Collapse file tree 1 file changed +8
-5
lines changed Original file line number Diff line number Diff line change @@ -94,7 +94,7 @@ let environment = Environment::builder().build().await?;
94
94
let producer = environment.producer().name("myproducer").build("mystream").await?;
95
95
for i in 0..10 {
96
96
producer
97
- .send (Message::builder().body(format!("message{}", i)).build())
97
+ .send_with_confirm (Message::builder().body(format!("message{}", i)).build())
98
98
.await?;
99
99
}
100
100
producer.close().await?;
@@ -111,10 +111,13 @@ let environment = Environment::builder().build().await?;
111
111
let mut consumer = environment.consumer().build("mystream").await?;
112
112
let handle = consumer.handle();
113
113
task::spawn(async move {
114
- while let Some(delivery) = consumer.next().await {
115
- println!("Got message {:?}",delivery);
116
- }
117
- });
114
+ while let Some(delivery) = consumer.next().await {
115
+ let d = delivery.unwrap();
116
+ println!("Got message: {:#?} with offset: {}",
117
+ d.message().data().map(|data| String::from_utf8(data.to_vec()).unwrap()),
118
+ d.offset(),);
119
+ }
120
+ });
118
121
// wait 10 second and then close the consumer
119
122
sleep(Duration::from_secs(10)).await;
120
123
handle.close().await?;
You can’t perform that action at this time.
0 commit comments