Skip to content

Commit 5e2d3c5

Browse files
committed
implementing ConsumerUpdateRequest command
1 parent 6fe4c5e commit 5e2d3c5

File tree

7 files changed

+160
-28
lines changed

7 files changed

+160
-28
lines changed

protocol/src/commands/consumer_update.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@ pub struct ConsumerUpdateCommand {
2020
}
2121

2222
impl ConsumerUpdateCommand {
23-
pub fn new(
24-
correlation_id: u32,
25-
subscription_id: u8,
26-
active: u8,
27-
) -> Self {
23+
pub fn new(correlation_id: u32, subscription_id: u8, active: u8) -> Self {
2824
Self {
2925
correlation_id,
3026
subscription_id,
@@ -75,7 +71,7 @@ impl Command for ConsumerUpdateCommand {
7571
mod tests {
7672
use crate::commands::tests::command_encode_decode_test;
7773

78-
use super::{ConsumerUpdateCommand};
74+
use super::ConsumerUpdateCommand;
7975

8076
#[test]
8177
fn subscribe_request_test() {
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use crate::{
7+
codec::{Decoder, Encoder},
8+
error::{DecodeError, EncodeError},
9+
protocol::commands::COMMAND_CONSUMER_UPDATE_REQUEST,
10+
};
11+
12+
use crate::commands::subscribe::OffsetSpecification;
13+
14+
use super::Command;
15+
16+
#[cfg_attr(test, derive(fake::Dummy))]
17+
#[derive(PartialEq, Eq, Debug)]
18+
pub struct ConsumerUpdateRequestCommand {
19+
pub(crate) correlation_id: u32,
20+
subscription_id: u8,
21+
response_code: u16,
22+
offset_specification: OffsetSpecification,
23+
}
24+
25+
impl ConsumerUpdateRequestCommand {
26+
pub fn new(
27+
correlation_id: u32,
28+
subscription_id: u8,
29+
response_code: u16,
30+
offset_specification: OffsetSpecification,
31+
) -> Self {
32+
Self {
33+
correlation_id,
34+
subscription_id,
35+
response_code,
36+
offset_specification,
37+
}
38+
}
39+
}
40+
41+
impl Encoder for ConsumerUpdateRequestCommand {
42+
fn encoded_size(&self) -> u32 {
43+
self.correlation_id.encoded_size()
44+
+ self.subscription_id.encoded_size()
45+
+ self.response_code.encoded_size()
46+
+ self.offset_specification.encoded_size()
47+
}
48+
49+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
50+
self.correlation_id.encode(writer)?;
51+
self.subscription_id.encode(writer)?;
52+
self.response_code.encode(writer)?;
53+
self.offset_specification.encode(writer)?;
54+
Ok(())
55+
}
56+
}
57+
58+
impl Decoder for ConsumerUpdateRequestCommand {
59+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
60+
let (input, correlation_id) = u32::decode(input)?;
61+
let (input, subscription_id) = u8::decode(input)?;
62+
let (input, response_code) = u16::decode(input)?;
63+
let (input, offset_specification) = OffsetSpecification::decode(input)?;
64+
65+
Ok((
66+
input,
67+
ConsumerUpdateRequestCommand {
68+
correlation_id,
69+
subscription_id,
70+
response_code,
71+
offset_specification,
72+
},
73+
))
74+
}
75+
}
76+
77+
impl Command for ConsumerUpdateRequestCommand {
78+
fn key(&self) -> u16 {
79+
COMMAND_CONSUMER_UPDATE_REQUEST
80+
}
81+
}
82+
83+
#[cfg(test)]
84+
mod tests {
85+
use crate::commands::tests::command_encode_decode_test;
86+
87+
use super::ConsumerUpdateRequestCommand;
88+
89+
#[test]
90+
fn subscribe_request_test() {
91+
command_encode_decode_test::<ConsumerUpdateRequestCommand>();
92+
}
93+
}

protocol/src/commands/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::protocol::version::PROTOCOL_VERSION;
22

33
pub mod close;
4+
pub mod consumer_update;
5+
pub mod consumer_update_request;
46
pub mod create_stream;
57
pub mod create_super_stream;
68
pub mod credit;
@@ -29,7 +31,6 @@ pub mod superstream_partitions;
2931
pub mod superstream_route;
3032
pub mod tune;
3133
pub mod unsubscribe;
32-
pub mod consumer_update;
3334

3435
pub trait Command {
3536
fn key(&self) -> u16;

protocol/src/protocol.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ pub mod commands {
3333
pub const COMMAND_CREATE_SUPER_STREAM: u16 = 29;
3434
pub const COMMAND_DELETE_SUPER_STREAM: u16 = 30;
3535
pub const COMMAND_CONSUMER_UPDATE_REQUEST: u16 = 32794;
36-
37-
3836
}
3937

4038
// server responses

protocol/src/request/mod.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use std::io::Write;
33
use crate::{
44
codec::{decoder::read_u32, Decoder, Encoder},
55
commands::{
6-
close::CloseRequest, create_stream::CreateStreamCommand,
7-
create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
8-
declare_publisher::DeclarePublisherCommand, delete::Delete,
6+
close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
7+
create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand,
8+
credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete,
99
delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand,
1010
exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand,
1111
metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand,
@@ -68,6 +68,7 @@ pub enum RequestKind {
6868
DeleteSuperStream(DeleteSuperStreamCommand),
6969
SuperStreamPartitions(SuperStreamPartitionsRequest),
7070
SuperStreamRoute(SuperStreamRouteRequest),
71+
ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
7172
}
7273

7374
impl Encoder for RequestKind {
@@ -105,6 +106,9 @@ impl Encoder for RequestKind {
105106
super_stream_partitions.encoded_size()
106107
}
107108
RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encoded_size(),
109+
RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
110+
consumer_update_request.encoded_size()
111+
}
108112
}
109113
}
110114

@@ -142,6 +146,9 @@ impl Encoder for RequestKind {
142146
super_stream_partition.encode(writer)
143147
}
144148
RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encode(writer),
149+
RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
150+
consumer_update_request.encode(writer)
151+
}
145152
}
146153
}
147154
}
@@ -222,6 +229,9 @@ impl Decoder for Request {
222229
COMMAND_ROUTE => {
223230
SuperStreamRouteRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
224231
}
232+
COMMAND_CONSUMER_UPDATE_REQUEST => {
233+
ConsumerUpdateRequestCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
234+
}
225235
n => return Err(DecodeError::UnsupportedResponseType(n)),
226236
};
227237
Ok((input, Request { header, kind: cmd }))
@@ -234,9 +244,9 @@ mod tests {
234244
use crate::{
235245
codec::{Decoder, Encoder},
236246
commands::{
237-
close::CloseRequest, create_stream::CreateStreamCommand,
238-
create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
239-
declare_publisher::DeclarePublisherCommand, delete::Delete,
247+
close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
248+
create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand,
249+
credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete,
240250
delete_publisher::DeletePublisherCommand,
241251
delete_super_stream::DeleteSuperStreamCommand,
242252
exchange_command_versions::ExchangeCommandVersionsRequest,
@@ -387,4 +397,9 @@ mod tests {
387397
fn request_route_command() {
388398
request_encode_decode_test::<SuperStreamRouteRequest>()
389399
}
400+
401+
#[test]
402+
fn request_consumer_update_request_command() {
403+
request_encode_decode_test::<ConsumerUpdateRequestCommand>()
404+
}
390405
}

protocol/src/request/shims.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use crate::commands::create_super_stream::CreateSuperStreamCommand;
22
use crate::commands::delete_super_stream::DeleteSuperStreamCommand;
33
use crate::{
44
commands::{
5-
close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand,
5+
close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
6+
create_stream::CreateStreamCommand, credit::CreditCommand,
67
declare_publisher::DeclarePublisherCommand, delete::Delete,
78
delete_publisher::DeletePublisherCommand,
89
exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand,
@@ -164,3 +165,9 @@ impl From<SuperStreamRouteRequest> for RequestKind {
164165
RequestKind::SuperStreamRoute(cmd)
165166
}
166167
}
168+
169+
impl From<ConsumerUpdateRequestCommand> for RequestKind {
170+
fn from(cmd: ConsumerUpdateRequestCommand) -> Self {
171+
RequestKind::ConsumerUpdateRequest(cmd)
172+
}
173+
}

protocol/src/response/mod.rs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@ use crate::{
66
Decoder,
77
},
88
commands::{
9-
close::CloseResponse, credit::CreditResponse, deliver::DeliverCommand,
10-
exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse,
11-
heart_beat::HeartbeatResponse, metadata::MetadataResponse,
9+
close::CloseResponse, consumer_update::ConsumerUpdateCommand,
10+
consumer_update_request::ConsumerUpdateRequestCommand, credit::CreditResponse,
11+
deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse,
12+
generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse,
1213
metadata_update::MetadataUpdateCommand, open::OpenResponse,
1314
peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
1415
publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
1516
query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse,
1617
superstream_partitions::SuperStreamPartitionsResponse,
17-
superstream_route::SuperStreamRouteResponse, tune::TunesCommand, consumer_update::ConsumerUpdateCommand
18+
superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
1819
},
1920
error::DecodeError,
2021
protocol::commands::*,
@@ -73,6 +74,7 @@ pub enum ResponseKind {
7374
SuperStreamPartitions(SuperStreamPartitionsResponse),
7475
SuperStreamRoute(SuperStreamRouteResponse),
7576
ConsumerUpdate(ConsumerUpdateCommand),
77+
ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
7678
}
7779

7880
impl Response {
@@ -111,6 +113,9 @@ impl Response {
111113
ResponseKind::ConsumerUpdate(consumer_update_command) => {
112114
Some(consumer_update_command.correlation_id)
113115
}
116+
ResponseKind::ConsumerUpdateRequest(consumer_update_request_command) => {
117+
Some(consumer_update_request_command.correlation_id)
118+
}
114119
}
115120
}
116121

@@ -155,6 +160,7 @@ impl Decoder for Response {
155160
| COMMAND_CREATE_STREAM
156161
| COMMAND_CREATE_SUPER_STREAM
157162
| COMMAND_DELETE_SUPER_STREAM
163+
| COMMAND_CONSUMER_UPDATE_REQUEST
158164
| COMMAND_DELETE_STREAM => {
159165
GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))?
160166
}
@@ -222,10 +228,12 @@ mod tests {
222228
use byteorder::{BigEndian, WriteBytesExt};
223229

224230
use super::{Response, ResponseKind};
231+
use crate::protocol::commands::COMMAND_CONSUMER_UPDATE;
225232
use crate::{
226233
codec::{Decoder, Encoder},
227234
commands::{
228-
close::CloseResponse, deliver::DeliverCommand,
235+
close::CloseResponse, consumer_update::ConsumerUpdateCommand,
236+
consumer_update_request::ConsumerUpdateRequestCommand, deliver::DeliverCommand,
229237
exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse,
230238
heart_beat::HeartbeatResponse, metadata::MetadataResponse,
231239
metadata_update::MetadataUpdateCommand, open::OpenResponse,
@@ -234,23 +242,22 @@ mod tests {
234242
query_publisher_sequence::QueryPublisherResponse,
235243
sasl_handshake::SaslHandshakeResponse,
236244
superstream_partitions::SuperStreamPartitionsResponse,
237-
superstream_route::SuperStreamRouteResponse, tune::TunesCommand, consumer_update::ConsumerUpdateCommand
245+
superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
238246
},
239247
protocol::{
240248
commands::{
241-
COMMAND_CLOSE, COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA,
242-
COMMAND_METADATA_UPDATE, COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES,
243-
COMMAND_PUBLISH_CONFIRM, COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET,
244-
COMMAND_QUERY_PUBLISHER_SEQUENCE, COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE,
245-
COMMAND_SASL_HANDSHAKE, COMMAND_TUNE, COMMAND_CONSUMER_UPDATE,
249+
COMMAND_CLOSE, COMMAND_CONSUMER_UPDATE, COMMAND_CONSUMER_UPDATE_REQUEST,
250+
COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, COMMAND_METADATA_UPDATE,
251+
COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM,
252+
COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, COMMAND_QUERY_PUBLISHER_SEQUENCE,
253+
COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, COMMAND_SASL_HANDSHAKE, COMMAND_TUNE,
246254
},
247255
version::PROTOCOL_VERSION,
248256
},
249257
response::COMMAND_EXCHANGE_COMMAND_VERSIONS,
250258
types::Header,
251259
ResponseCode,
252260
};
253-
use crate::protocol::commands::COMMAND_CONSUMER_UPDATE;
254261

255262
impl Encoder for ResponseKind {
256263
fn encoded_size(&self) -> u32 {
@@ -284,6 +291,9 @@ mod tests {
284291
ResponseKind::ConsumerUpdate(consumer_update_response) => {
285292
consumer_update_response.encoded_size()
286293
}
294+
ResponseKind::ConsumerUpdateRequest(consumer_update_request_response) => {
295+
consumer_update_request_response.encoded_size()
296+
}
287297
}
288298
}
289299

@@ -321,6 +331,9 @@ mod tests {
321331
ResponseKind::ConsumerUpdate(consumer_update_command_version) => {
322332
consumer_update_command_version.encode(writer)
323333
}
334+
ResponseKind::ConsumerUpdateRequest(consumer_update_request_command_version) => {
335+
consumer_update_request_command_version.encode(writer)
336+
}
324337
}
325338
}
326339
}
@@ -517,4 +530,13 @@ mod tests {
517530
COMMAND_CONSUMER_UPDATE
518531
);
519532
}
533+
534+
#[test]
535+
fn consumer_update_request_response_test() {
536+
response_test!(
537+
ConsumerUpdateRequestCommand,
538+
ResponseKind::ConsumerUpdateRequest,
539+
COMMAND_CONSUMER_UPDATE_REQUEST
540+
);
541+
}
520542
}

0 commit comments

Comments
 (0)