Skip to content

Commit d705bfb

Browse files
Superstream: Create/Delete superstream and Partition and route commands (#230)
* WIP: implementing create and delete superstream commands * adding superstream error-case test * implementing route and partition commands * refactoring tests
1 parent 04e184b commit d705bfb

File tree

16 files changed

+881
-21
lines changed

16 files changed

+881
-21
lines changed

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
FROM rabbitmq:3.13-rc-management
1+
FROM rabbitmq:4.0.1-management
22

33
COPY .ci/conf/rabbitmq.conf /etc/rabbitmq/rabbitmq.conf
44
COPY .ci/conf/enabled_plugins /etc/rabbitmq/enabled_plugins
55

6-
COPY .ci/certs /etc/rabbitmq/certs
6+
COPY .ci/certs /etc/rabbitmq/certs
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
use std::collections::HashMap;
2+
use std::io::Write;
3+
4+
#[cfg(test)]
5+
use fake::Fake;
6+
7+
use crate::{
8+
codec::{Decoder, Encoder},
9+
error::{DecodeError, EncodeError},
10+
protocol::commands::COMMAND_CREATE_SUPER_STREAM,
11+
};
12+
13+
use super::Command;
14+
15+
#[cfg_attr(test, derive(fake::Dummy))]
16+
#[derive(PartialEq, Eq, Debug)]
17+
pub struct CreateSuperStreamCommand {
18+
correlation_id: u32,
19+
super_stream_name: String,
20+
partitions: Vec<String>,
21+
binding_keys: Vec<String>,
22+
args: HashMap<String, String>,
23+
}
24+
25+
impl CreateSuperStreamCommand {
26+
pub fn new(
27+
correlation_id: u32,
28+
super_stream_name: String,
29+
partitions: Vec<String>,
30+
binding_keys: Vec<String>,
31+
args: HashMap<String, String>,
32+
) -> Self {
33+
Self {
34+
correlation_id,
35+
super_stream_name,
36+
partitions,
37+
binding_keys,
38+
args,
39+
}
40+
}
41+
}
42+
43+
impl Encoder for CreateSuperStreamCommand {
44+
fn encoded_size(&self) -> u32 {
45+
self.correlation_id.encoded_size()
46+
+ self.super_stream_name.as_str().encoded_size()
47+
+ self.partitions.encoded_size()
48+
+ self.binding_keys.encoded_size()
49+
+ self.args.encoded_size()
50+
}
51+
52+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
53+
self.correlation_id.encode(writer)?;
54+
self.super_stream_name.as_str().encode(writer)?;
55+
self.partitions.encode(writer)?;
56+
self.binding_keys.encode(writer)?;
57+
self.args.encode(writer)?;
58+
Ok(())
59+
}
60+
}
61+
62+
impl Decoder for CreateSuperStreamCommand {
63+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
64+
let (input, correlation_id) = u32::decode(input)?;
65+
let (input, super_stream_name) = Option::decode(input)?;
66+
let (input, partitions) = <Vec<String>>::decode(input)?;
67+
let (input, binding_keys) = <Vec<String>>::decode(input)?;
68+
let (input, args) = HashMap::decode(input)?;
69+
70+
Ok((
71+
input,
72+
CreateSuperStreamCommand {
73+
correlation_id,
74+
super_stream_name: super_stream_name.unwrap(),
75+
partitions,
76+
binding_keys,
77+
args,
78+
},
79+
))
80+
}
81+
}
82+
83+
impl Command for CreateSuperStreamCommand {
84+
fn key(&self) -> u16 {
85+
COMMAND_CREATE_SUPER_STREAM
86+
}
87+
}
88+
89+
#[cfg(test)]
90+
mod tests {
91+
92+
use crate::commands::tests::command_encode_decode_test;
93+
94+
use super::CreateSuperStreamCommand;
95+
96+
#[test]
97+
fn create_super_stream_request_test() {
98+
command_encode_decode_test::<CreateSuperStreamCommand>();
99+
}
100+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use crate::{
7+
codec::{Decoder, Encoder},
8+
error::{DecodeError, EncodeError},
9+
protocol::commands::COMMAND_DELETE_SUPER_STREAM,
10+
};
11+
12+
use super::Command;
13+
14+
#[cfg_attr(test, derive(fake::Dummy))]
15+
#[derive(PartialEq, Eq, Debug)]
16+
pub struct DeleteSuperStreamCommand {
17+
correlation_id: u32,
18+
super_stream_name: String,
19+
}
20+
21+
impl DeleteSuperStreamCommand {
22+
pub fn new(correlation_id: u32, super_stream_name: String) -> Self {
23+
Self {
24+
correlation_id,
25+
super_stream_name,
26+
}
27+
}
28+
}
29+
30+
impl Encoder for DeleteSuperStreamCommand {
31+
fn encoded_size(&self) -> u32 {
32+
self.correlation_id.encoded_size() + self.super_stream_name.as_str().encoded_size()
33+
}
34+
35+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
36+
self.correlation_id.encode(writer)?;
37+
self.super_stream_name.as_str().encode(writer)?;
38+
Ok(())
39+
}
40+
}
41+
42+
impl Decoder for DeleteSuperStreamCommand {
43+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
44+
let (input, correlation_id) = u32::decode(input)?;
45+
let (input, super_stream_name) = Option::decode(input)?;
46+
47+
Ok((
48+
input,
49+
DeleteSuperStreamCommand {
50+
correlation_id,
51+
super_stream_name: super_stream_name.unwrap(),
52+
},
53+
))
54+
}
55+
}
56+
57+
impl Command for DeleteSuperStreamCommand {
58+
fn key(&self) -> u16 {
59+
COMMAND_DELETE_SUPER_STREAM
60+
}
61+
}
62+
63+
#[cfg(test)]
64+
mod tests {
65+
use crate::commands::create_super_stream::CreateSuperStreamCommand;
66+
use crate::commands::tests::command_encode_decode_test;
67+
68+
use super::DeleteSuperStreamCommand;
69+
70+
#[test]
71+
fn delete_super_stream_request_test() {
72+
command_encode_decode_test::<DeleteSuperStreamCommand>();
73+
}
74+
}

protocol/src/commands/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ use crate::protocol::version::PROTOCOL_VERSION;
22

33
pub mod close;
44
pub mod create_stream;
5+
pub mod create_super_stream;
56
pub mod credit;
67
pub mod declare_publisher;
78
pub mod delete;
89
pub mod delete_publisher;
10+
pub mod delete_super_stream;
911
pub mod deliver;
1012
pub mod exchange_command_versions;
1113
pub mod generic;
@@ -23,6 +25,8 @@ pub mod sasl_authenticate;
2325
pub mod sasl_handshake;
2426
pub mod store_offset;
2527
pub mod subscribe;
28+
pub mod superstream_partitions;
29+
pub mod superstream_route;
2630
pub mod tune;
2731
pub mod unsubscribe;
2832

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use super::Command;
7+
use crate::{
8+
codec::{Decoder, Encoder},
9+
error::{DecodeError, EncodeError},
10+
protocol::commands::COMMAND_PARTITIONS,
11+
FromResponse, ResponseCode,
12+
};
13+
14+
#[cfg_attr(test, derive(fake::Dummy))]
15+
#[derive(PartialEq, Eq, Debug)]
16+
pub struct SuperStreamPartitionsRequest {
17+
correlation_id: u32,
18+
super_stream: String,
19+
}
20+
21+
impl SuperStreamPartitionsRequest {
22+
pub fn new(correlation_id: u32, super_stream: String) -> Self {
23+
Self {
24+
correlation_id,
25+
super_stream,
26+
}
27+
}
28+
}
29+
30+
impl Encoder for SuperStreamPartitionsRequest {
31+
fn encoded_size(&self) -> u32 {
32+
self.correlation_id.encoded_size() + self.super_stream.as_str().encoded_size()
33+
}
34+
35+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
36+
self.correlation_id.encode(writer)?;
37+
self.super_stream.as_str().encode(writer)?;
38+
Ok(())
39+
}
40+
}
41+
42+
impl Decoder for SuperStreamPartitionsRequest {
43+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
44+
let (input, correlation_id) = u32::decode(input)?;
45+
let (input, super_stream) = Option::decode(input)?;
46+
47+
Ok((
48+
input,
49+
SuperStreamPartitionsRequest {
50+
correlation_id,
51+
super_stream: super_stream.unwrap(),
52+
},
53+
))
54+
}
55+
}
56+
57+
impl Command for SuperStreamPartitionsRequest {
58+
fn key(&self) -> u16 {
59+
COMMAND_PARTITIONS
60+
}
61+
}
62+
63+
#[cfg_attr(test, derive(fake::Dummy))]
64+
#[derive(PartialEq, Eq, Debug)]
65+
pub struct SuperStreamPartitionsResponse {
66+
pub(crate) correlation_id: u32,
67+
response_code: ResponseCode,
68+
pub streams: Vec<String>,
69+
}
70+
71+
impl SuperStreamPartitionsResponse {
72+
pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
73+
Self {
74+
correlation_id,
75+
response_code,
76+
streams,
77+
}
78+
}
79+
pub fn is_ok(&self) -> bool {
80+
self.response_code == ResponseCode::Ok
81+
}
82+
}
83+
84+
impl Encoder for SuperStreamPartitionsResponse {
85+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
86+
self.correlation_id.encode(writer)?;
87+
self.response_code.encode(writer)?;
88+
self.streams.encode(writer)?;
89+
Ok(())
90+
}
91+
92+
fn encoded_size(&self) -> u32 {
93+
self.correlation_id.encoded_size()
94+
+ self.response_code.encoded_size()
95+
+ self.streams.encoded_size()
96+
}
97+
}
98+
99+
impl Decoder for SuperStreamPartitionsResponse {
100+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
101+
let (input, correlation_id) = u32::decode(input)?;
102+
let (input, response_code) = ResponseCode::decode(input)?;
103+
let (input, streams) = <Vec<String>>::decode(input)?;
104+
105+
Ok((
106+
input,
107+
SuperStreamPartitionsResponse {
108+
correlation_id,
109+
response_code,
110+
streams,
111+
},
112+
))
113+
}
114+
}
115+
116+
impl FromResponse for SuperStreamPartitionsResponse {
117+
fn from_response(response: crate::Response) -> Option<Self> {
118+
match response.kind {
119+
crate::ResponseKind::SuperStreamPartitions(partitions_response) => {
120+
Some(partitions_response)
121+
}
122+
_ => None,
123+
}
124+
}
125+
}
126+
127+
#[cfg(test)]
128+
mod tests {
129+
130+
use crate::commands::tests::command_encode_decode_test;
131+
132+
use super::SuperStreamPartitionsRequest;
133+
use super::SuperStreamPartitionsResponse;
134+
135+
#[test]
136+
fn super_stream_partition_request_test() {
137+
command_encode_decode_test::<SuperStreamPartitionsRequest>();
138+
}
139+
140+
#[test]
141+
fn super_stream_partition_response_test() {
142+
command_encode_decode_test::<SuperStreamPartitionsResponse>();
143+
}
144+
}

0 commit comments

Comments
 (0)