Skip to content

Commit 53c404e

Browse files
committed
fix bug in consumer when starting from OffsetSpecification::Offset
1 parent d331ac8 commit 53c404e

File tree

4 files changed

+70
-3
lines changed

4 files changed

+70
-3
lines changed

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
FROM rabbitmq:3.13-rc-management
1+
FROM rabbitmq:3.13-management
22

33
COPY .ci/conf/rabbitmq.conf /etc/rabbitmq/rabbitmq.conf
44
COPY .ci/conf/enabled_plugins /etc/rabbitmq/enabled_plugins
55

6-
COPY .ci/certs /etc/rabbitmq/certs
6+
COPY .ci/certs /etc/rabbitmq/certs

protocol/src/commands/subscribe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl Command for SubscribeCommand {
9494
}
9595

9696
#[cfg_attr(test, derive(fake::Dummy))]
97-
#[derive(PartialEq, Eq, Debug)]
97+
#[derive(PartialEq, Eq, Debug, Clone)]
9898
pub enum OffsetSpecification {
9999
First,
100100
Last,

src/consumer.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub struct Consumer {
4040
struct ConsumerInternal {
4141
client: Client,
4242
stream: String,
43+
offset_specification: OffsetSpecification,
4344
subscription_id: u8,
4445
sender: Sender<Result<Delivery, ConsumerDeliveryError>>,
4546
closed: Arc<AtomicBool>,
@@ -95,6 +96,7 @@ impl ConsumerBuilder {
9596
subscription_id,
9697
stream: stream.to_string(),
9798
client: client.clone(),
99+
offset_specification: self.offset_specification.clone(),
98100
sender: tx,
99101
closed: Arc::new(AtomicBool::new(false)),
100102
waker: AtomicWaker::new(),
@@ -229,6 +231,13 @@ impl MessageHandler for ConsumerMessageHandler {
229231
let len = delivery.messages.len();
230232
trace!("Got delivery with messages {}", len);
231233
for message in delivery.messages {
234+
if let OffsetSpecification::Offset(offset_) = self.0.offset_specification {
235+
if offset_ > offset {
236+
offset += 1;
237+
continue;
238+
}
239+
}
240+
232241
let _ = self
233242
.0
234243
.sender

tests/integration/consumer_test.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,64 @@ async fn consumer_test() {
5353
producer.close().await.unwrap();
5454
}
5555

56+
57+
#[tokio::test(flavor = "multi_thread")]
58+
async fn consumer_test_offset_specification_offset() {
59+
let env = TestEnvironment::create().await;
60+
let reference: String = Faker.fake();
61+
let mut messages_received = 0;
62+
let mut first_offset = 0;
63+
64+
let message_count = 10;
65+
let mut producer = env
66+
.env
67+
.producer()
68+
.name(&reference)
69+
.build(&env.stream)
70+
.await
71+
.unwrap();
72+
73+
let mut consumer = env
74+
.env
75+
.consumer()
76+
.offset(OffsetSpecification::Offset(5))
77+
.build(&env.stream)
78+
.await
79+
.unwrap();
80+
81+
for n in 0..message_count-1 {
82+
let _ = producer
83+
.send_with_confirm(Message::builder().body(format!("message{}", n)).build())
84+
.await
85+
.unwrap();
86+
}
87+
88+
let _ = producer
89+
.send_with_confirm(Message::builder().body(format!("marker{}", message_count-1)).build())
90+
.await
91+
.unwrap();
92+
93+
while let Some(delivery) = consumer.next().await {
94+
let d = delivery.unwrap();
95+
if first_offset == 0 {
96+
97+
first_offset = d.offset();
98+
}
99+
messages_received += 1;
100+
101+
if String::from_utf8_lossy(d.message().data().unwrap()).contains("marker") {
102+
break;
103+
}
104+
}
105+
106+
consumer.handle().close().await.unwrap();
107+
producer.close().await.unwrap();
108+
109+
assert!(first_offset == 5);
110+
assert!(messages_received == 5);
111+
}
112+
113+
56114
#[tokio::test(flavor = "multi_thread")]
57115
async fn consumer_close_test() {
58116
let env = TestEnvironment::create().await;

0 commit comments

Comments
 (0)