Skip to content

Commit 6fe4c5e

Browse files
committed
implementing Consumer_Update command
1 parent 028f7e9 commit 6fe4c5e

File tree

4 files changed

+114
-3
lines changed

4 files changed

+114
-3
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use crate::{
7+
codec::{Decoder, Encoder},
8+
error::{DecodeError, EncodeError},
9+
protocol::commands::COMMAND_CONSUMER_UPDATE,
10+
};
11+
12+
use super::Command;
13+
14+
#[cfg_attr(test, derive(fake::Dummy))]
15+
#[derive(PartialEq, Eq, Debug)]
16+
pub struct ConsumerUpdateCommand {
17+
pub(crate) correlation_id: u32,
18+
subscription_id: u8,
19+
active: u8,
20+
}
21+
22+
impl ConsumerUpdateCommand {
23+
pub fn new(
24+
correlation_id: u32,
25+
subscription_id: u8,
26+
active: u8,
27+
) -> Self {
28+
Self {
29+
correlation_id,
30+
subscription_id,
31+
active,
32+
}
33+
}
34+
}
35+
36+
impl Encoder for ConsumerUpdateCommand {
37+
fn encoded_size(&self) -> u32 {
38+
self.correlation_id.encoded_size()
39+
+ self.subscription_id.encoded_size()
40+
+ self.active.encoded_size()
41+
}
42+
43+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
44+
self.correlation_id.encode(writer)?;
45+
self.subscription_id.encode(writer)?;
46+
self.active.encode(writer)?;
47+
Ok(())
48+
}
49+
}
50+
51+
impl Decoder for ConsumerUpdateCommand {
52+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
53+
let (input, correlation_id) = u32::decode(input)?;
54+
let (input, subscription_id) = u8::decode(input)?;
55+
let (input, active) = u8::decode(input)?;
56+
57+
Ok((
58+
input,
59+
ConsumerUpdateCommand {
60+
correlation_id,
61+
subscription_id,
62+
active,
63+
},
64+
))
65+
}
66+
}
67+
68+
impl Command for ConsumerUpdateCommand {
69+
fn key(&self) -> u16 {
70+
COMMAND_CONSUMER_UPDATE
71+
}
72+
}
73+
74+
#[cfg(test)]
75+
mod tests {
76+
use crate::commands::tests::command_encode_decode_test;
77+
78+
use super::{ConsumerUpdateCommand};
79+
80+
#[test]
81+
fn subscribe_request_test() {
82+
command_encode_decode_test::<ConsumerUpdateCommand>();
83+
}
84+
}

protocol/src/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub mod superstream_partitions;
2929
pub mod superstream_route;
3030
pub mod tune;
3131
pub mod unsubscribe;
32+
pub mod consumer_update;
3233

3334
pub trait Command {
3435
fn key(&self) -> u16;

protocol/src/protocol.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ pub mod commands {
3232
pub const COMMAND_STREAMS_STATS: u16 = 28;
3333
pub const COMMAND_CREATE_SUPER_STREAM: u16 = 29;
3434
pub const COMMAND_DELETE_SUPER_STREAM: u16 = 30;
35+
pub const COMMAND_CONSUMER_UPDATE_REQUEST: u16 = 32794;
36+
37+
3538
}
3639

3740
// server responses

protocol/src/response/mod.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414
publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
1515
query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse,
1616
superstream_partitions::SuperStreamPartitionsResponse,
17-
superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
17+
superstream_route::SuperStreamRouteResponse, tune::TunesCommand, consumer_update::ConsumerUpdateCommand
1818
},
1919
error::DecodeError,
2020
protocol::commands::*,
@@ -72,6 +72,7 @@ pub enum ResponseKind {
7272
ExchangeCommandVersions(ExchangeCommandVersionsResponse),
7373
SuperStreamPartitions(SuperStreamPartitionsResponse),
7474
SuperStreamRoute(SuperStreamRouteResponse),
75+
ConsumerUpdate(ConsumerUpdateCommand),
7576
}
7677

7778
impl Response {
@@ -107,6 +108,9 @@ impl Response {
107108
ResponseKind::SuperStreamRoute(super_stream_route_command) => {
108109
Some(super_stream_route_command.correlation_id)
109110
}
111+
ResponseKind::ConsumerUpdate(consumer_update_command) => {
112+
Some(consumer_update_command.correlation_id)
113+
}
110114
}
111115
}
112116

@@ -188,6 +192,8 @@ impl Decoder for Response {
188192
.map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamPartitions(kind)))?,
189193
COMMAND_ROUTE => SuperStreamRouteResponse::decode(input)
190194
.map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamRoute(kind)))?,
195+
COMMAND_CONSUMER_UPDATE => ConsumerUpdateCommand::decode(input)
196+
.map(|(remaining, kind)| (remaining, ResponseKind::ConsumerUpdate(kind)))?,
191197
n => return Err(DecodeError::UnsupportedResponseType(n)),
192198
};
193199
Ok((input, Response { header, kind }))
@@ -228,22 +234,24 @@ mod tests {
228234
query_publisher_sequence::QueryPublisherResponse,
229235
sasl_handshake::SaslHandshakeResponse,
230236
superstream_partitions::SuperStreamPartitionsResponse,
231-
superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
237+
superstream_route::SuperStreamRouteResponse, tune::TunesCommand, consumer_update::ConsumerUpdateCommand
232238
},
233239
protocol::{
234240
commands::{
235241
COMMAND_CLOSE, COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA,
236242
COMMAND_METADATA_UPDATE, COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES,
237243
COMMAND_PUBLISH_CONFIRM, COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET,
238244
COMMAND_QUERY_PUBLISHER_SEQUENCE, COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE,
239-
COMMAND_SASL_HANDSHAKE, COMMAND_TUNE,
245+
COMMAND_SASL_HANDSHAKE, COMMAND_TUNE, COMMAND_CONSUMER_UPDATE,
240246
},
241247
version::PROTOCOL_VERSION,
242248
},
243249
response::COMMAND_EXCHANGE_COMMAND_VERSIONS,
244250
types::Header,
245251
ResponseCode,
246252
};
253+
use crate::protocol::commands::COMMAND_CONSUMER_UPDATE;
254+
247255
impl Encoder for ResponseKind {
248256
fn encoded_size(&self) -> u32 {
249257
match self {
@@ -273,6 +281,9 @@ mod tests {
273281
ResponseKind::SuperStreamRoute(super_stream_response) => {
274282
super_stream_response.encoded_size()
275283
}
284+
ResponseKind::ConsumerUpdate(consumer_update_response) => {
285+
consumer_update_response.encoded_size()
286+
}
276287
}
277288
}
278289

@@ -307,6 +318,9 @@ mod tests {
307318
ResponseKind::SuperStreamRoute(super_stream_command_versions) => {
308319
super_stream_command_versions.encode(writer)
309320
}
321+
ResponseKind::ConsumerUpdate(consumer_update_command_version) => {
322+
consumer_update_command_version.encode(writer)
323+
}
310324
}
311325
}
312326
}
@@ -494,4 +508,13 @@ mod tests {
494508
COMMAND_ROUTE
495509
);
496510
}
511+
512+
#[test]
513+
fn consumer_update_response_test() {
514+
response_test!(
515+
ConsumerUpdateCommand,
516+
ResponseKind::ConsumerUpdate,
517+
COMMAND_CONSUMER_UPDATE
518+
);
519+
}
497520
}

0 commit comments

Comments
 (0)