Skip to content

Commit 7d03b52

Browse files
fixing bug when consuming from OffsetSpecification:Offset (#223)
1 parent b960884 commit 7d03b52

File tree

3 files changed

+68
-1
lines changed

3 files changed

+68
-1
lines changed

protocol/src/commands/subscribe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl Command for SubscribeCommand {
9494
}
9595

9696
#[cfg_attr(test, derive(fake::Dummy))]
97-
#[derive(PartialEq, Eq, Debug)]
97+
#[derive(PartialEq, Eq, Debug, Clone)]
9898
pub enum OffsetSpecification {
9999
First,
100100
Last,

src/consumer.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub struct Consumer {
4040
struct ConsumerInternal {
4141
client: Client,
4242
stream: String,
43+
offset_specification: OffsetSpecification,
4344
subscription_id: u8,
4445
sender: Sender<Result<Delivery, ConsumerDeliveryError>>,
4546
closed: Arc<AtomicBool>,
@@ -110,6 +111,7 @@ impl ConsumerBuilder {
110111
subscription_id,
111112
stream: stream.to_string(),
112113
client: client.clone(),
114+
offset_specification: self.offset_specification.clone(),
113115
sender: tx,
114116
closed: Arc::new(AtomicBool::new(false)),
115117
waker: AtomicWaker::new(),
@@ -243,6 +245,12 @@ impl MessageHandler for ConsumerMessageHandler {
243245
let len = delivery.messages.len();
244246
trace!("Got delivery with messages {}", len);
245247
for message in delivery.messages {
248+
if let OffsetSpecification::Offset(offset_) = self.0.offset_specification {
249+
if offset_ > offset {
250+
offset += 1;
251+
continue;
252+
}
253+
}
246254
let _ = self
247255
.0
248256
.sender

tests/integration/consumer_test.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,65 @@ async fn consumer_test() {
5353
producer.close().await.unwrap();
5454
}
5555

56+
#[tokio::test(flavor = "multi_thread")]
57+
async fn consumer_test_offset_specification_offset() {
58+
let env = TestEnvironment::create().await;
59+
let reference: String = Faker.fake();
60+
let mut messages_received = 0;
61+
let mut first_offset = 0;
62+
63+
let message_count = 10;
64+
let mut producer = env
65+
.env
66+
.producer()
67+
.name(&reference)
68+
.build(&env.stream)
69+
.await
70+
.unwrap();
71+
72+
let mut consumer = env
73+
.env
74+
.consumer()
75+
.offset(OffsetSpecification::Offset(5))
76+
.build(&env.stream)
77+
.await
78+
.unwrap();
79+
80+
for n in 0..message_count - 1 {
81+
let _ = producer
82+
.send_with_confirm(Message::builder().body(format!("message{}", n)).build())
83+
.await
84+
.unwrap();
85+
}
86+
87+
let _ = producer
88+
.send_with_confirm(
89+
Message::builder()
90+
.body(format!("marker{}", message_count - 1))
91+
.build(),
92+
)
93+
.await
94+
.unwrap();
95+
96+
while let Some(delivery) = consumer.next().await {
97+
let d = delivery.unwrap();
98+
if first_offset == 0 {
99+
first_offset = d.offset();
100+
}
101+
messages_received += 1;
102+
103+
if String::from_utf8_lossy(d.message().data().unwrap()).contains("marker") {
104+
break;
105+
}
106+
}
107+
108+
consumer.handle().close().await.unwrap();
109+
producer.close().await.unwrap();
110+
111+
assert!(first_offset == 5);
112+
assert!(messages_received == 5);
113+
}
114+
56115
#[tokio::test(flavor = "multi_thread")]
57116
async fn consumer_close_test() {
58117
let env = TestEnvironment::create().await;

0 commit comments

Comments
 (0)