Skip to content

Commit 6a085e4

Browse files
committed
implementing ConsumerUpdateRequest command
1 parent 6fe4c5e commit 6a085e4

File tree

9 files changed

+166
-30
lines changed

9 files changed

+166
-30
lines changed

protocol/src/commands/consumer_update.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@ pub struct ConsumerUpdateCommand {
2020
}
2121

2222
impl ConsumerUpdateCommand {
23-
pub fn new(
24-
correlation_id: u32,
25-
subscription_id: u8,
26-
active: u8,
27-
) -> Self {
23+
pub fn new(correlation_id: u32, subscription_id: u8, active: u8) -> Self {
2824
Self {
2925
correlation_id,
3026
subscription_id,
@@ -75,7 +71,7 @@ impl Command for ConsumerUpdateCommand {
7571
mod tests {
7672
use crate::commands::tests::command_encode_decode_test;
7773

78-
use super::{ConsumerUpdateCommand};
74+
use super::ConsumerUpdateCommand;
7975

8076
#[test]
8177
fn subscribe_request_test() {
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use crate::{
7+
codec::{Decoder, Encoder},
8+
error::{DecodeError, EncodeError},
9+
protocol::commands::COMMAND_CONSUMER_UPDATE_REQUEST,
10+
};
11+
12+
use crate::commands::subscribe::OffsetSpecification;
13+
14+
use super::Command;
15+
16+
#[cfg_attr(test, derive(fake::Dummy))]
17+
#[derive(PartialEq, Eq, Debug)]
18+
pub struct ConsumerUpdateRequestCommand {
19+
pub(crate) correlation_id: u32,
20+
response_code: u16,
21+
offset_specification: OffsetSpecification,
22+
}
23+
24+
impl ConsumerUpdateRequestCommand {
25+
pub fn new(
26+
correlation_id: u32,
27+
response_code: u16,
28+
offset_specification: OffsetSpecification,
29+
) -> Self {
30+
Self {
31+
correlation_id,
32+
response_code,
33+
offset_specification,
34+
}
35+
}
36+
}
37+
38+
impl Encoder for ConsumerUpdateRequestCommand {
39+
fn encoded_size(&self) -> u32 {
40+
self.correlation_id.encoded_size()
41+
+ self.response_code.encoded_size()
42+
+ self.offset_specification.encoded_size()
43+
}
44+
45+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
46+
self.correlation_id.encode(writer)?;
47+
self.response_code.encode(writer)?;
48+
self.offset_specification.encode(writer)?;
49+
Ok(())
50+
}
51+
}
52+
53+
impl Decoder for ConsumerUpdateRequestCommand {
54+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
55+
let (input, correlation_id) = u32::decode(input)?;
56+
let (input, response_code) = u16::decode(input)?;
57+
let (input, offset_specification) = OffsetSpecification::decode(input)?;
58+
59+
Ok((
60+
input,
61+
ConsumerUpdateRequestCommand {
62+
correlation_id,
63+
response_code,
64+
offset_specification,
65+
},
66+
))
67+
}
68+
}
69+
70+
impl Command for ConsumerUpdateRequestCommand {
71+
fn key(&self) -> u16 {
72+
COMMAND_CONSUMER_UPDATE_REQUEST
73+
}
74+
}
75+
76+
#[cfg(test)]
77+
mod tests {
78+
use crate::commands::tests::command_encode_decode_test;
79+
80+
use super::ConsumerUpdateRequestCommand;
81+
82+
#[test]
83+
fn subscribe_request_test() {
84+
command_encode_decode_test::<ConsumerUpdateRequestCommand>();
85+
}
86+
}

protocol/src/commands/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::protocol::version::PROTOCOL_VERSION;
22

33
pub mod close;
4+
pub mod consumer_update;
5+
pub mod consumer_update_request;
46
pub mod create_stream;
57
pub mod create_super_stream;
68
pub mod credit;
@@ -29,7 +31,6 @@ pub mod superstream_partitions;
2931
pub mod superstream_route;
3032
pub mod tune;
3133
pub mod unsubscribe;
32-
pub mod consumer_update;
3334

3435
pub trait Command {
3536
fn key(&self) -> u16;

protocol/src/protocol.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ pub mod commands {
3333
pub const COMMAND_CREATE_SUPER_STREAM: u16 = 29;
3434
pub const COMMAND_DELETE_SUPER_STREAM: u16 = 30;
3535
pub const COMMAND_CONSUMER_UPDATE_REQUEST: u16 = 32794;
36-
37-
3836
}
3937

4038
// server responses

protocol/src/request/mod.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use std::io::Write;
33
use crate::{
44
codec::{decoder::read_u32, Decoder, Encoder},
55
commands::{
6-
close::CloseRequest, create_stream::CreateStreamCommand,
7-
create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
8-
declare_publisher::DeclarePublisherCommand, delete::Delete,
6+
close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
7+
create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand,
8+
credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete,
99
delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand,
1010
exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand,
1111
metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand,
@@ -68,6 +68,7 @@ pub enum RequestKind {
6868
DeleteSuperStream(DeleteSuperStreamCommand),
6969
SuperStreamPartitions(SuperStreamPartitionsRequest),
7070
SuperStreamRoute(SuperStreamRouteRequest),
71+
ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
7172
}
7273

7374
impl Encoder for RequestKind {
@@ -105,6 +106,9 @@ impl Encoder for RequestKind {
105106
super_stream_partitions.encoded_size()
106107
}
107108
RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encoded_size(),
109+
RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
110+
consumer_update_request.encoded_size()
111+
}
108112
}
109113
}
110114

@@ -142,6 +146,9 @@ impl Encoder for RequestKind {
142146
super_stream_partition.encode(writer)
143147
}
144148
RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encode(writer),
149+
RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
150+
consumer_update_request.encode(writer)
151+
}
145152
}
146153
}
147154
}
@@ -222,6 +229,9 @@ impl Decoder for Request {
222229
COMMAND_ROUTE => {
223230
SuperStreamRouteRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
224231
}
232+
COMMAND_CONSUMER_UPDATE_REQUEST => {
233+
ConsumerUpdateRequestCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
234+
}
225235
n => return Err(DecodeError::UnsupportedResponseType(n)),
226236
};
227237
Ok((input, Request { header, kind: cmd }))
@@ -234,9 +244,9 @@ mod tests {
234244
use crate::{
235245
codec::{Decoder, Encoder},
236246
commands::{
237-
close::CloseRequest, create_stream::CreateStreamCommand,
238-
create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
239-
declare_publisher::DeclarePublisherCommand, delete::Delete,
247+
close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
248+
create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand,
249+
credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete,
240250
delete_publisher::DeletePublisherCommand,
241251
delete_super_stream::DeleteSuperStreamCommand,
242252
exchange_command_versions::ExchangeCommandVersionsRequest,
@@ -387,4 +397,9 @@ mod tests {
387397
fn request_route_command() {
388398
request_encode_decode_test::<SuperStreamRouteRequest>()
389399
}
400+
401+
#[test]
402+
fn request_consumer_update_request_command() {
403+
request_encode_decode_test::<ConsumerUpdateRequestCommand>()
404+
}
390405
}

protocol/src/request/shims.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use crate::commands::create_super_stream::CreateSuperStreamCommand;
22
use crate::commands::delete_super_stream::DeleteSuperStreamCommand;
33
use crate::{
44
commands::{
5-
close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand,
5+
close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
6+
create_stream::CreateStreamCommand, credit::CreditCommand,
67
declare_publisher::DeclarePublisherCommand, delete::Delete,
78
delete_publisher::DeletePublisherCommand,
89
exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand,
@@ -164,3 +165,9 @@ impl From<SuperStreamRouteRequest> for RequestKind {
164165
RequestKind::SuperStreamRoute(cmd)
165166
}
166167
}
168+
169+
impl From<ConsumerUpdateRequestCommand> for RequestKind {
170+
fn from(cmd: ConsumerUpdateRequestCommand) -> Self {
171+
RequestKind::ConsumerUpdateRequest(cmd)
172+
}
173+
}

protocol/src/response/mod.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@ use crate::{
66
Decoder,
77
},
88
commands::{
9-
close::CloseResponse, credit::CreditResponse, deliver::DeliverCommand,
10-
exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse,
11-
heart_beat::HeartbeatResponse, metadata::MetadataResponse,
9+
close::CloseResponse, consumer_update::ConsumerUpdateCommand,
10+
consumer_update_request::ConsumerUpdateRequestCommand, credit::CreditResponse,
11+
deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse,
12+
generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse,
1213
metadata_update::MetadataUpdateCommand, open::OpenResponse,
1314
peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
1415
publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
1516
query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse,
1617
superstream_partitions::SuperStreamPartitionsResponse,
17-
superstream_route::SuperStreamRouteResponse, tune::TunesCommand, consumer_update::ConsumerUpdateCommand
18+
superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
1819
},
1920
error::DecodeError,
2021
protocol::commands::*,
@@ -73,6 +74,7 @@ pub enum ResponseKind {
7374
SuperStreamPartitions(SuperStreamPartitionsResponse),
7475
SuperStreamRoute(SuperStreamRouteResponse),
7576
ConsumerUpdate(ConsumerUpdateCommand),
77+
ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
7678
}
7779

7880
impl Response {
@@ -111,6 +113,9 @@ impl Response {
111113
ResponseKind::ConsumerUpdate(consumer_update_command) => {
112114
Some(consumer_update_command.correlation_id)
113115
}
116+
ResponseKind::ConsumerUpdateRequest(consumer_update_request_command) => {
117+
Some(consumer_update_request_command.correlation_id)
118+
}
114119
}
115120
}
116121

@@ -155,6 +160,7 @@ impl Decoder for Response {
155160
| COMMAND_CREATE_STREAM
156161
| COMMAND_CREATE_SUPER_STREAM
157162
| COMMAND_DELETE_SUPER_STREAM
163+
| COMMAND_CONSUMER_UPDATE_REQUEST
158164
| COMMAND_DELETE_STREAM => {
159165
GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))?
160166
}
@@ -225,7 +231,8 @@ mod tests {
225231
use crate::{
226232
codec::{Decoder, Encoder},
227233
commands::{
228-
close::CloseResponse, deliver::DeliverCommand,
234+
close::CloseResponse, consumer_update::ConsumerUpdateCommand,
235+
consumer_update_request::ConsumerUpdateRequestCommand, deliver::DeliverCommand,
229236
exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse,
230237
heart_beat::HeartbeatResponse, metadata::MetadataResponse,
231238
metadata_update::MetadataUpdateCommand, open::OpenResponse,
@@ -234,23 +241,22 @@ mod tests {
234241
query_publisher_sequence::QueryPublisherResponse,
235242
sasl_handshake::SaslHandshakeResponse,
236243
superstream_partitions::SuperStreamPartitionsResponse,
237-
superstream_route::SuperStreamRouteResponse, tune::TunesCommand, consumer_update::ConsumerUpdateCommand
244+
superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
238245
},
239246
protocol::{
240247
commands::{
241-
COMMAND_CLOSE, COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA,
242-
COMMAND_METADATA_UPDATE, COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES,
243-
COMMAND_PUBLISH_CONFIRM, COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET,
244-
COMMAND_QUERY_PUBLISHER_SEQUENCE, COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE,
245-
COMMAND_SASL_HANDSHAKE, COMMAND_TUNE, COMMAND_CONSUMER_UPDATE,
248+
COMMAND_CLOSE, COMMAND_CONSUMER_UPDATE, COMMAND_CONSUMER_UPDATE_REQUEST,
249+
COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, COMMAND_METADATA_UPDATE,
250+
COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM,
251+
COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, COMMAND_QUERY_PUBLISHER_SEQUENCE,
252+
COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, COMMAND_SASL_HANDSHAKE, COMMAND_TUNE,
246253
},
247254
version::PROTOCOL_VERSION,
248255
},
249256
response::COMMAND_EXCHANGE_COMMAND_VERSIONS,
250257
types::Header,
251258
ResponseCode,
252259
};
253-
use crate::protocol::commands::COMMAND_CONSUMER_UPDATE;
254260

255261
impl Encoder for ResponseKind {
256262
fn encoded_size(&self) -> u32 {
@@ -284,6 +290,9 @@ mod tests {
284290
ResponseKind::ConsumerUpdate(consumer_update_response) => {
285291
consumer_update_response.encoded_size()
286292
}
293+
ResponseKind::ConsumerUpdateRequest(consumer_update_request_response) => {
294+
consumer_update_request_response.encoded_size()
295+
}
287296
}
288297
}
289298

@@ -321,6 +330,9 @@ mod tests {
321330
ResponseKind::ConsumerUpdate(consumer_update_command_version) => {
322331
consumer_update_command_version.encode(writer)
323332
}
333+
ResponseKind::ConsumerUpdateRequest(consumer_update_request_command_version) => {
334+
consumer_update_request_command_version.encode(writer)
335+
}
324336
}
325337
}
326338
}
@@ -517,4 +529,13 @@ mod tests {
517529
COMMAND_CONSUMER_UPDATE
518530
);
519531
}
532+
533+
#[test]
534+
fn consumer_update_request_response_test() {
535+
response_test!(
536+
ConsumerUpdateRequestCommand,
537+
ResponseKind::ConsumerUpdateRequest,
538+
COMMAND_CONSUMER_UPDATE_REQUEST
539+
);
540+
}
520541
}

src/client/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use tokio_rustls::{rustls, TlsConnector};
3434
use tokio_util::codec::Framed;
3535
use tracing::trace;
3636

37+
use crate::{error::ClientError, RabbitMQStreamResult};
3738
pub use message::ClientMessage;
3839
pub use metadata::{Broker, StreamMetadata};
3940
pub use metrics::MetricsCollector;
@@ -71,8 +72,6 @@ use rabbitmq_stream_protocol::{
7172
FromResponse, Request, Response, ResponseCode, ResponseKind,
7273
};
7374

74-
use crate::{error::ClientError, RabbitMQStreamResult};
75-
7675
pub use self::handler::{MessageHandler, MessageResult};
7776
use self::{
7877
channel::{channel, ChannelReceiver, ChannelSender},

tests/integration/client_test.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,3 +457,16 @@ async fn client_test_route_test() {
457457
test.partitions.get(0).unwrap()
458458
);
459459
}
460+
461+
#[tokio::test(flavor = "multi_thread")]
462+
async fn client_consumer_update_request_test() {
463+
let test = TestClient::create().await;
464+
465+
let response = test
466+
.client
467+
.consumer_update(OffsetSpecification::Next)
468+
.await
469+
.unwrap();
470+
471+
assert_eq!(&ResponseCode::Ok, response.code());
472+
}

0 commit comments

Comments
 (0)