Skip to content

Commit 5aa44ce

Browse files
committed
fixing bug when consuming from OffsetSpecification:Offset
1 parent b960884 commit 5aa44ce

File tree

3 files changed

+65
-1
lines changed

3 files changed

+65
-1
lines changed

protocol/src/commands/subscribe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl Command for SubscribeCommand {
9494
}
9595

9696
#[cfg_attr(test, derive(fake::Dummy))]
97-
#[derive(PartialEq, Eq, Debug)]
97+
#[derive(PartialEq, Eq, Debug, Clone)]
9898
pub enum OffsetSpecification {
9999
First,
100100
Last,

src/consumer.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub struct Consumer {
4040
struct ConsumerInternal {
4141
client: Client,
4242
stream: String,
43+
offset_specification: OffsetSpecification,
4344
subscription_id: u8,
4445
sender: Sender<Result<Delivery, ConsumerDeliveryError>>,
4546
closed: Arc<AtomicBool>,
@@ -110,6 +111,7 @@ impl ConsumerBuilder {
110111
subscription_id,
111112
stream: stream.to_string(),
112113
client: client.clone(),
114+
offset_specification: self.offset_specification.clone(),
113115
sender: tx,
114116
closed: Arc::new(AtomicBool::new(false)),
115117
waker: AtomicWaker::new(),
@@ -243,6 +245,12 @@ impl MessageHandler for ConsumerMessageHandler {
243245
let len = delivery.messages.len();
244246
trace!("Got delivery with messages {}", len);
245247
for message in delivery.messages {
248+
if let OffsetSpecification::Offset(offset_) = self.0.offset_specification {
249+
if offset_ > offset {
250+
offset += 1;
251+
continue;
252+
}
253+
}
246254
let _ = self
247255
.0
248256
.sender

tests/integration/consumer_test.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,62 @@ async fn consumer_test() {
5353
producer.close().await.unwrap();
5454
}
5555

56+
#[tokio::test(flavor = "multi_thread")]
57+
async fn consumer_test_offset_specification_offset() {
58+
let env = TestEnvironment::create().await;
59+
let reference: String = Faker.fake();
60+
let mut messages_received = 0;
61+
let mut first_offset = 0;
62+
63+
let message_count = 10;
64+
let mut producer = env
65+
.env
66+
.producer()
67+
.name(&reference)
68+
.build(&env.stream)
69+
.await
70+
.unwrap();
71+
72+
let mut consumer = env
73+
.env
74+
.consumer()
75+
.offset(OffsetSpecification::Offset(5))
76+
.build(&env.stream)
77+
.await
78+
.unwrap();
79+
80+
for n in 0..message_count-1 {
81+
let _ = producer
82+
.send_with_confirm(Message::builder().body(format!("message{}", n)).build())
83+
.await
84+
.unwrap();
85+
}
86+
87+
let _ = producer
88+
.send_with_confirm(Message::builder().body(format!("marker{}", message_count-1)).build())
89+
.await
90+
.unwrap();
91+
92+
while let Some(delivery) = consumer.next().await {
93+
let d = delivery.unwrap();
94+
if first_offset == 0 {
95+
96+
first_offset = d.offset();
97+
}
98+
messages_received += 1;
99+
100+
if String::from_utf8_lossy(d.message().data().unwrap()).contains("marker") {
101+
break;
102+
}
103+
}
104+
105+
consumer.handle().close().await.unwrap();
106+
producer.close().await.unwrap();
107+
108+
assert!(first_offset == 5);
109+
assert!(messages_received == 5);
110+
}
111+
56112
#[tokio::test(flavor = "multi_thread")]
57113
async fn consumer_close_test() {
58114
let env = TestEnvironment::create().await;

0 commit comments

Comments
 (0)