Skip to content

Commit b3e62b9

Browse files
committed
implementing route and partition commands
1 parent 551df26 commit b3e62b9

File tree

7 files changed

+392
-10
lines changed

7 files changed

+392
-10
lines changed

protocol/src/commands/create_super_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl Command for CreateSuperStreamCommand {
8888

8989
#[cfg(test)]
9090
mod tests {
91-
use crate::commands::create_stream::CreateStreamCommand;
91+
9292
use crate::commands::tests::command_encode_decode_test;
9393

9494
use super::CreateSuperStreamCommand;

protocol/src/commands/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ pub mod sasl_authenticate;
2525
pub mod sasl_handshake;
2626
pub mod store_offset;
2727
pub mod subscribe;
28+
pub mod superstream_partitions;
29+
pub mod superstream_route;
2830
pub mod tune;
2931
pub mod unsubscribe;
3032

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use crate::{
7+
codec::{Decoder, Encoder},
8+
error::{DecodeError, EncodeError},
9+
protocol::commands::COMMAND_PARTITIONS,
10+
FromResponse, ResponseCode,
11+
};
12+
13+
use super::Command;
14+
15+
#[cfg_attr(test, derive(fake::Dummy))]
16+
#[derive(PartialEq, Eq, Debug)]
17+
pub struct SuperStreamPartitionsRequest {
18+
correlation_id: u32,
19+
super_stream: String,
20+
}
21+
22+
impl SuperStreamPartitionsRequest {
23+
pub fn new(correlation_id: u32, super_stream: String) -> Self {
24+
Self {
25+
correlation_id,
26+
super_stream,
27+
}
28+
}
29+
}
30+
31+
impl Encoder for SuperStreamPartitionsRequest {
32+
fn encoded_size(&self) -> u32 {
33+
self.correlation_id.encoded_size() + self.super_stream.as_str().encoded_size()
34+
}
35+
36+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
37+
self.correlation_id.encode(writer)?;
38+
self.super_stream.as_str().encode(writer)?;
39+
Ok(())
40+
}
41+
}
42+
43+
impl Decoder for SuperStreamPartitionsRequest {
44+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
45+
let (input, correlation_id) = u32::decode(input)?;
46+
let (input, super_stream) = Option::decode(input)?;
47+
48+
Ok((
49+
input,
50+
SuperStreamPartitionsRequest {
51+
correlation_id,
52+
super_stream: super_stream.unwrap(),
53+
},
54+
))
55+
}
56+
}
57+
58+
impl Command for SuperStreamPartitionsRequest {
59+
fn key(&self) -> u16 {
60+
COMMAND_PARTITIONS
61+
}
62+
}
63+
64+
#[cfg_attr(test, derive(fake::Dummy))]
65+
#[derive(PartialEq, Eq, Debug)]
66+
pub struct SuperStreamPartitionsResponse {
67+
pub(crate) correlation_id: u32,
68+
response_code: ResponseCode,
69+
streams: Vec<String>,
70+
}
71+
72+
impl SuperStreamPartitionsResponse {
73+
pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
74+
Self {
75+
correlation_id,
76+
response_code,
77+
streams,
78+
}
79+
}
80+
pub fn is_ok(&self) -> bool {
81+
self.response_code == ResponseCode::Ok
82+
}
83+
}
84+
85+
impl Encoder for SuperStreamPartitionsResponse {
86+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
87+
self.correlation_id.encode(writer)?;
88+
self.streams.encode(writer)?;
89+
self.response_code.encode(writer)?;
90+
Ok(())
91+
}
92+
93+
fn encoded_size(&self) -> u32 {
94+
self.correlation_id.encoded_size()
95+
+ self.streams.encoded_size()
96+
+ self.response_code.encoded_size()
97+
}
98+
}
99+
100+
impl Decoder for SuperStreamPartitionsResponse {
101+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
102+
let (input, correlation_id) = u32::decode(input)?;
103+
let (input, streams) = Vec::decode(input)?;
104+
let (input, response_code) = ResponseCode::decode(input)?;
105+
106+
Ok((
107+
input,
108+
SuperStreamPartitionsResponse {
109+
correlation_id,
110+
response_code,
111+
streams,
112+
},
113+
))
114+
}
115+
}
116+
117+
impl FromResponse for SuperStreamPartitionsResponse {
118+
fn from_response(response: crate::Response) -> Option<Self> {
119+
match response.kind {
120+
crate::ResponseKind::SuperStreamPartitions(partitions_response) => {
121+
Some(partitions_response)
122+
}
123+
_ => None,
124+
}
125+
}
126+
}
127+
128+
#[cfg(test)]
129+
mod tests {
130+
131+
use crate::commands::tests::command_encode_decode_test;
132+
133+
use super::SuperStreamPartitionsRequest;
134+
use super::SuperStreamPartitionsResponse;
135+
136+
#[test]
137+
fn super_stream_partition_request_test() {
138+
command_encode_decode_test::<SuperStreamPartitionsRequest>();
139+
}
140+
141+
#[test]
142+
fn super_stream_partition_response_test() {
143+
command_encode_decode_test::<SuperStreamPartitionsResponse>();
144+
}
145+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use crate::{
7+
codec::{Decoder, Encoder},
8+
error::{DecodeError, EncodeError},
9+
protocol::commands::COMMAND_ROUTE,
10+
FromResponse, ResponseCode,
11+
};
12+
13+
use super::Command;
14+
15+
#[cfg_attr(test, derive(fake::Dummy))]
16+
#[derive(PartialEq, Eq, Debug)]
17+
pub struct SuperStreamRouteRequest {
18+
correlation_id: u32,
19+
routing_key: String,
20+
super_stream: String,
21+
}
22+
23+
impl SuperStreamRouteRequest {
24+
pub fn new(correlation_id: u32, routing_key: String, super_stream: String) -> Self {
25+
Self {
26+
correlation_id,
27+
routing_key,
28+
super_stream,
29+
}
30+
}
31+
}
32+
33+
impl Encoder for SuperStreamRouteRequest {
34+
fn encoded_size(&self) -> u32 {
35+
self.correlation_id.encoded_size()
36+
+ self.routing_key.as_str().encoded_size()
37+
+ self.super_stream.as_str().encoded_size()
38+
}
39+
40+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
41+
self.correlation_id.encode(writer)?;
42+
self.routing_key.as_str().encode(writer)?;
43+
self.super_stream.as_str().encode(writer)?;
44+
Ok(())
45+
}
46+
}
47+
48+
impl Decoder for SuperStreamRouteRequest {
49+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
50+
let (input, correlation_id) = u32::decode(input)?;
51+
let (input, routing_key) = Option::decode(input)?;
52+
let (input, super_stream) = Option::decode(input)?;
53+
54+
Ok((
55+
input,
56+
SuperStreamRouteRequest {
57+
correlation_id,
58+
routing_key: routing_key.unwrap(),
59+
super_stream: super_stream.unwrap(),
60+
},
61+
))
62+
}
63+
}
64+
65+
impl Command for SuperStreamRouteRequest {
66+
fn key(&self) -> u16 {
67+
COMMAND_ROUTE
68+
}
69+
}
70+
71+
#[cfg_attr(test, derive(fake::Dummy))]
72+
#[derive(PartialEq, Eq, Debug)]
73+
pub struct SuperStreamRouteResponse {
74+
pub(crate) correlation_id: u32,
75+
response_code: ResponseCode,
76+
streams: Vec<String>,
77+
}
78+
79+
impl SuperStreamRouteResponse {
80+
pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
81+
Self {
82+
correlation_id,
83+
response_code,
84+
streams,
85+
}
86+
}
87+
pub fn is_ok(&self) -> bool {
88+
self.response_code == ResponseCode::Ok
89+
}
90+
}
91+
92+
impl Encoder for SuperStreamRouteResponse {
93+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
94+
self.correlation_id.encode(writer)?;
95+
self.response_code.encode(writer)?;
96+
self.streams.encode(writer)?;
97+
Ok(())
98+
}
99+
100+
fn encoded_size(&self) -> u32 {
101+
self.correlation_id.encoded_size()
102+
+ self.streams.encoded_size()
103+
+ self.response_code.encoded_size()
104+
}
105+
}
106+
107+
impl Decoder for SuperStreamRouteResponse {
108+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
109+
let (input, correlation_id) = u32::decode(input)?;
110+
let (input, response_code) = ResponseCode::decode(input)?;
111+
let (input, streams) = Vec::decode(input)?;
112+
113+
Ok((
114+
input,
115+
SuperStreamRouteResponse {
116+
correlation_id,
117+
response_code,
118+
streams,
119+
},
120+
))
121+
}
122+
}
123+
124+
impl FromResponse for SuperStreamRouteResponse {
125+
fn from_response(response: crate::Response) -> Option<Self> {
126+
match response.kind {
127+
crate::ResponseKind::SuperStreamRoute(route) => Some(route),
128+
_ => None,
129+
}
130+
}
131+
}
132+
133+
#[cfg(test)]
134+
mod tests {
135+
136+
use crate::commands::tests::command_encode_decode_test;
137+
138+
use super::SuperStreamRouteRequest;
139+
use super::SuperStreamRouteResponse;
140+
141+
#[test]
142+
fn super_stream_route_request_test() {
143+
command_encode_decode_test::<SuperStreamRouteRequest>();
144+
}
145+
146+
#[test]
147+
fn super_stream_route_response_test() {
148+
command_encode_decode_test::<SuperStreamRouteResponse>();
149+
}
150+
}

protocol/src/request/mod.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use crate::{
1212
publish::PublishCommand, query_offset::QueryOffsetRequest,
1313
query_publisher_sequence::QueryPublisherRequest,
1414
sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand,
15-
store_offset::StoreOffset, subscribe::SubscribeCommand, tune::TunesCommand,
15+
store_offset::StoreOffset, subscribe::SubscribeCommand,
16+
superstream_partitions::SuperStreamPartitionsRequest,
17+
superstream_route::SuperStreamRouteRequest, tune::TunesCommand,
1618
unsubscribe::UnSubscribeCommand,
1719
},
1820
error::{DecodeError, EncodeError},
@@ -64,6 +66,8 @@ pub enum RequestKind {
6466
ExchangeCommandVersions(ExchangeCommandVersionsRequest),
6567
CreateSuperStream(CreateSuperStreamCommand),
6668
DeleteSuperStream(DeleteSuperStreamCommand),
69+
SuperStreamPartitions(SuperStreamPartitionsRequest),
70+
SuperStreamRoute(SuperStreamRouteRequest),
6771
}
6872

6973
impl Encoder for RequestKind {
@@ -97,6 +101,10 @@ impl Encoder for RequestKind {
97101
RequestKind::DeleteSuperStream(delete_super_stream) => {
98102
delete_super_stream.encoded_size()
99103
}
104+
RequestKind::SuperStreamPartitions(super_stream_partitions) => {
105+
super_stream_partitions.encoded_size()
106+
}
107+
RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encoded_size(),
100108
}
101109
}
102110

@@ -130,6 +138,10 @@ impl Encoder for RequestKind {
130138
RequestKind::DeleteSuperStream(delete_super_stream) => {
131139
delete_super_stream.encode(writer)
132140
}
141+
RequestKind::SuperStreamPartitions(super_stream_partition) => {
142+
super_stream_partition.encode(writer)
143+
}
144+
RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encode(writer),
133145
}
134146
}
135147
}
@@ -204,6 +216,12 @@ impl Decoder for Request {
204216
COMMAND_DELETE_SUPER_STREAM => {
205217
DeleteSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
206218
}
219+
COMMAND_PARTITIONS => {
220+
SuperStreamPartitionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
221+
}
222+
COMMAND_ROUTE => {
223+
SuperStreamRouteRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
224+
}
207225
n => return Err(DecodeError::UnsupportedResponseType(n)),
208226
};
209227
Ok((input, Request { header, kind: cmd }))
@@ -226,7 +244,9 @@ mod tests {
226244
peer_properties::PeerPropertiesCommand, publish::PublishCommand,
227245
query_offset::QueryOffsetRequest, query_publisher_sequence::QueryPublisherRequest,
228246
sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand,
229-
store_offset::StoreOffset, subscribe::SubscribeCommand, tune::TunesCommand,
247+
store_offset::StoreOffset, subscribe::SubscribeCommand,
248+
superstream_partitions::SuperStreamPartitionsRequest,
249+
superstream_route::SuperStreamRouteRequest, tune::TunesCommand,
230250
unsubscribe::UnSubscribeCommand, Command,
231251
},
232252
};
@@ -357,4 +377,14 @@ mod tests {
357377
fn request_delete_super_stream_test() {
358378
request_encode_decode_test::<DeleteSuperStreamCommand>()
359379
}
380+
381+
#[test]
382+
fn request_partitions_command() {
383+
request_encode_decode_test::<SuperStreamPartitionsRequest>()
384+
}
385+
386+
#[test]
387+
fn request_route_command() {
388+
request_encode_decode_test::<SuperStreamRouteRequest>()
389+
}
360390
}

0 commit comments

Comments
 (0)