Skip to content

Commit 51590eb

Browse files
committed
implementing route and partition commands
1 parent 551df26 commit 51590eb

File tree

10 files changed

+501
-13
lines changed

10 files changed

+501
-13
lines changed

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
FROM rabbitmq:3.13-rc-management
1+
FROM rabbitmq:4.0.1-management
22

33
COPY .ci/conf/rabbitmq.conf /etc/rabbitmq/rabbitmq.conf
44
COPY .ci/conf/enabled_plugins /etc/rabbitmq/enabled_plugins
55

6-
COPY .ci/certs /etc/rabbitmq/certs
6+
COPY .ci/certs /etc/rabbitmq/certs

protocol/src/commands/create_super_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl Command for CreateSuperStreamCommand {
8888

8989
#[cfg(test)]
9090
mod tests {
91-
use crate::commands::create_stream::CreateStreamCommand;
91+
9292
use crate::commands::tests::command_encode_decode_test;
9393

9494
use super::CreateSuperStreamCommand;

protocol/src/commands/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ pub mod sasl_authenticate;
2525
pub mod sasl_handshake;
2626
pub mod store_offset;
2727
pub mod subscribe;
28+
pub mod superstream_partitions;
29+
pub mod superstream_route;
2830
pub mod tune;
2931
pub mod unsubscribe;
3032

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use crate::{
7+
codec::{Decoder, Encoder},
8+
error::{DecodeError, EncodeError},
9+
protocol::commands::COMMAND_PARTITIONS,
10+
FromResponse, ResponseCode,
11+
};
12+
use crate::commands::exchange_command_versions::ExchangeCommandVersion;
13+
use super::Command;
14+
15+
#[cfg_attr(test, derive(fake::Dummy))]
16+
#[derive(PartialEq, Eq, Debug)]
17+
pub struct SuperStreamPartitionsRequest {
18+
correlation_id: u32,
19+
super_stream: String,
20+
}
21+
22+
impl SuperStreamPartitionsRequest {
23+
pub fn new(correlation_id: u32, super_stream: String) -> Self {
24+
Self {
25+
correlation_id,
26+
super_stream,
27+
}
28+
}
29+
}
30+
31+
impl Encoder for SuperStreamPartitionsRequest {
32+
fn encoded_size(&self) -> u32 {
33+
self.correlation_id.encoded_size() + self.super_stream.as_str().encoded_size()
34+
}
35+
36+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
37+
self.correlation_id.encode(writer)?;
38+
self.super_stream.as_str().encode(writer)?;
39+
Ok(())
40+
}
41+
}
42+
43+
impl Decoder for SuperStreamPartitionsRequest {
44+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
45+
let (input, correlation_id) = u32::decode(input)?;
46+
let (input, super_stream) = Option::decode(input)?;
47+
48+
Ok((
49+
input,
50+
SuperStreamPartitionsRequest {
51+
correlation_id,
52+
super_stream: super_stream.unwrap(),
53+
},
54+
))
55+
}
56+
}
57+
58+
impl Command for SuperStreamPartitionsRequest {
59+
fn key(&self) -> u16 {
60+
COMMAND_PARTITIONS
61+
}
62+
}
63+
64+
#[cfg_attr(test, derive(fake::Dummy))]
65+
#[derive(PartialEq, Eq, Debug)]
66+
pub struct SuperStreamPartitionsResponse {
67+
pub(crate) correlation_id: u32,
68+
response_code: ResponseCode,
69+
pub streams: Vec<String>,
70+
}
71+
72+
impl SuperStreamPartitionsResponse {
73+
pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
74+
Self {
75+
correlation_id,
76+
response_code,
77+
streams,
78+
}
79+
}
80+
pub fn is_ok(&self) -> bool {
81+
self.response_code == ResponseCode::Ok
82+
}
83+
}
84+
85+
impl Encoder for SuperStreamPartitionsResponse {
86+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
87+
self.correlation_id.encode(writer)?;
88+
self.response_code.encode(writer)?;
89+
self.streams.encode(writer)?;
90+
Ok(())
91+
}
92+
93+
fn encoded_size(&self) -> u32 {
94+
self.correlation_id.encoded_size()
95+
+ self.response_code.encoded_size()
96+
+ self.streams.encoded_size()
97+
}
98+
}
99+
100+
impl Decoder for SuperStreamPartitionsResponse {
101+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
102+
let (input, correlation_id) = u32::decode(input)?;
103+
let (input, response_code) = ResponseCode::decode(input)?;
104+
let (input, streams) = <Vec<String>>::decode(input)?;
105+
106+
Ok((
107+
input,
108+
SuperStreamPartitionsResponse {
109+
correlation_id,
110+
response_code,
111+
streams,
112+
},
113+
))
114+
}
115+
}
116+
117+
impl FromResponse for SuperStreamPartitionsResponse {
118+
fn from_response(response: crate::Response) -> Option<Self> {
119+
match response.kind {
120+
crate::ResponseKind::SuperStreamPartitions(partitions_response) => {
121+
Some(partitions_response)
122+
}
123+
_ => None,
124+
}
125+
}
126+
}
127+
128+
#[cfg(test)]
129+
mod tests {
130+
131+
use crate::commands::tests::command_encode_decode_test;
132+
133+
use super::SuperStreamPartitionsRequest;
134+
use super::SuperStreamPartitionsResponse;
135+
136+
#[test]
137+
fn super_stream_partition_request_test() {
138+
command_encode_decode_test::<SuperStreamPartitionsRequest>();
139+
}
140+
141+
#[test]
142+
fn super_stream_partition_response_test() {
143+
command_encode_decode_test::<SuperStreamPartitionsResponse>();
144+
}
145+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use crate::{
7+
codec::{Decoder, Encoder},
8+
error::{DecodeError, EncodeError},
9+
protocol::commands::COMMAND_ROUTE,
10+
FromResponse, ResponseCode,
11+
};
12+
13+
use super::Command;
14+
15+
#[cfg_attr(test, derive(fake::Dummy))]
16+
#[derive(PartialEq, Eq, Debug)]
17+
pub struct SuperStreamRouteRequest {
18+
correlation_id: u32,
19+
routing_key: String,
20+
super_stream: String,
21+
}
22+
23+
impl SuperStreamRouteRequest {
24+
pub fn new(correlation_id: u32, routing_key: String, super_stream: String) -> Self {
25+
Self {
26+
correlation_id,
27+
routing_key,
28+
super_stream,
29+
}
30+
}
31+
}
32+
33+
impl Encoder for SuperStreamRouteRequest {
34+
fn encoded_size(&self) -> u32 {
35+
self.correlation_id.encoded_size()
36+
+ self.routing_key.as_str().encoded_size()
37+
+ self.super_stream.as_str().encoded_size()
38+
}
39+
40+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
41+
self.correlation_id.encode(writer)?;
42+
self.routing_key.as_str().encode(writer)?;
43+
self.super_stream.as_str().encode(writer)?;
44+
Ok(())
45+
}
46+
}
47+
48+
impl Decoder for SuperStreamRouteRequest {
49+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
50+
let (input, correlation_id) = u32::decode(input)?;
51+
let (input, routing_key) = Option::decode(input)?;
52+
let (input, super_stream) = Option::decode(input)?;
53+
54+
Ok((
55+
input,
56+
SuperStreamRouteRequest {
57+
correlation_id,
58+
routing_key: routing_key.unwrap(),
59+
super_stream: super_stream.unwrap(),
60+
},
61+
))
62+
}
63+
}
64+
65+
impl Command for SuperStreamRouteRequest {
66+
fn key(&self) -> u16 {
67+
COMMAND_ROUTE
68+
}
69+
}
70+
71+
#[cfg_attr(test, derive(fake::Dummy))]
72+
#[derive(PartialEq, Eq, Debug)]
73+
pub struct SuperStreamRouteResponse {
74+
pub(crate) correlation_id: u32,
75+
response_code: ResponseCode,
76+
pub streams: Vec<String>,
77+
}
78+
79+
impl SuperStreamRouteResponse {
80+
pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
81+
Self {
82+
correlation_id,
83+
response_code,
84+
streams,
85+
}
86+
}
87+
pub fn is_ok(&self) -> bool {
88+
self.response_code == ResponseCode::Ok
89+
}
90+
}
91+
92+
impl Encoder for SuperStreamRouteResponse {
93+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
94+
self.correlation_id.encode(writer)?;
95+
self.response_code.encode(writer)?;
96+
self.streams.encode(writer)?;
97+
Ok(())
98+
}
99+
100+
fn encoded_size(&self) -> u32 {
101+
self.correlation_id.encoded_size()
102+
+ self.streams.encoded_size()
103+
+ self.response_code.encoded_size()
104+
}
105+
}
106+
107+
impl Decoder for SuperStreamRouteResponse {
108+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
109+
let (input, correlation_id) = u32::decode(input)?;
110+
let (input, response_code) = ResponseCode::decode(input)?;
111+
let (input, streams) = Vec::decode(input)?;
112+
113+
Ok((
114+
input,
115+
SuperStreamRouteResponse {
116+
correlation_id,
117+
response_code,
118+
streams,
119+
},
120+
))
121+
}
122+
}
123+
124+
impl FromResponse for SuperStreamRouteResponse {
125+
fn from_response(response: crate::Response) -> Option<Self> {
126+
match response.kind {
127+
crate::ResponseKind::SuperStreamRoute(route) => Some(route),
128+
_ => None,
129+
}
130+
}
131+
}
132+
133+
#[cfg(test)]
134+
mod tests {
135+
136+
use crate::commands::tests::command_encode_decode_test;
137+
138+
use super::SuperStreamRouteRequest;
139+
use super::SuperStreamRouteResponse;
140+
141+
#[test]
142+
fn super_stream_route_request_test() {
143+
command_encode_decode_test::<SuperStreamRouteRequest>();
144+
}
145+
146+
#[test]
147+
fn super_stream_route_response_test() {
148+
command_encode_decode_test::<SuperStreamRouteResponse>();
149+
}
150+
}

0 commit comments

Comments
 (0)