Skip to content

Commit 309973a

Browse files
committed
WIP: implementing create and delete superstream commands
1 parent 04e184b commit 309973a

File tree

12 files changed

+374
-5
lines changed

12 files changed

+374
-5
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
use std::collections::HashMap;
2+
use std::io::Write;
3+
4+
#[cfg(test)]
5+
use fake::Fake;
6+
7+
use crate::{
8+
codec::{Decoder, Encoder},
9+
error::{DecodeError, EncodeError},
10+
protocol::commands::COMMAND_CREATE_SUPER_STREAM,
11+
};
12+
13+
use super::Command;
14+
15+
#[cfg_attr(test, derive(fake::Dummy))]
16+
#[derive(PartialEq, Eq, Debug)]
17+
pub struct CreateSuperStreamCommand {
18+
correlation_id: u32,
19+
super_stream_name: String,
20+
partitions: Vec<String>,
21+
binding_keys: Vec<String>,
22+
args: HashMap<String, String>,
23+
}
24+
25+
impl CreateSuperStreamCommand {
26+
pub fn new(
27+
correlation_id: u32,
28+
super_stream_name: String,
29+
partitions: Vec<String>,
30+
binding_keys: Vec<String>,
31+
args: HashMap<String, String>,
32+
) -> Self {
33+
Self {
34+
correlation_id,
35+
super_stream_name,
36+
partitions,
37+
binding_keys,
38+
args,
39+
}
40+
}
41+
}
42+
43+
impl Encoder for CreateSuperStreamCommand {
44+
fn encoded_size(&self) -> u32 {
45+
self.correlation_id.encoded_size()
46+
+ self.super_stream_name.as_str().encoded_size()
47+
+ self.partitions.encoded_size()
48+
+ self.binding_keys.encoded_size()
49+
+ self.args.encoded_size()
50+
}
51+
52+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
53+
self.correlation_id.encode(writer)?;
54+
self.super_stream_name.as_str().encode(writer)?;
55+
self.partitions.encode(writer)?;
56+
self.binding_keys.encode(writer)?;
57+
self.args.encode(writer)?;
58+
Ok(())
59+
}
60+
}
61+
62+
impl Decoder for CreateSuperStreamCommand {
63+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
64+
let (input, correlation_id) = u32::decode(input)?;
65+
let (input, super_stream_name) = Option::decode(input)?;
66+
let (input, partitions) = <Vec<String>>::decode(input)?;
67+
let (input, binding_keys) = <Vec<String>>::decode(input)?;
68+
let (input, args) = HashMap::decode(input)?;
69+
70+
Ok((
71+
input,
72+
CreateSuperStreamCommand {
73+
correlation_id,
74+
super_stream_name: super_stream_name.unwrap(),
75+
partitions,
76+
binding_keys,
77+
args,
78+
},
79+
))
80+
}
81+
}
82+
83+
impl Command for CreateSuperStreamCommand {
84+
fn key(&self) -> u16 {
85+
COMMAND_CREATE_SUPER_STREAM
86+
}
87+
}
88+
89+
#[cfg(test)]
90+
mod tests {
91+
use crate::commands::create_stream::CreateStreamCommand;
92+
use crate::commands::tests::command_encode_decode_test;
93+
94+
use super::CreateSuperStreamCommand;
95+
96+
#[test]
97+
fn create_super_stream_request_test() {
98+
command_encode_decode_test::<CreateSuperStreamCommand>();
99+
}
100+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use std::io::Write;
2+
3+
#[cfg(test)]
4+
use fake::Fake;
5+
6+
use crate::{
7+
codec::{Decoder, Encoder},
8+
error::{DecodeError, EncodeError},
9+
protocol::commands::COMMAND_DELETE_SUPER_STREAM,
10+
};
11+
12+
use super::Command;
13+
14+
#[cfg_attr(test, derive(fake::Dummy))]
15+
#[derive(PartialEq, Eq, Debug)]
16+
pub struct DeleteSuperStreamCommand {
17+
correlation_id: u32,
18+
super_stream_name: String,
19+
}
20+
21+
impl DeleteSuperStreamCommand {
22+
pub fn new(correlation_id: u32, super_stream_name: String) -> Self {
23+
Self {
24+
correlation_id,
25+
super_stream_name,
26+
}
27+
}
28+
}
29+
30+
impl Encoder for DeleteSuperStreamCommand {
31+
fn encoded_size(&self) -> u32 {
32+
self.correlation_id.encoded_size() + self.super_stream_name.as_str().encoded_size()
33+
}
34+
35+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
36+
self.correlation_id.encode(writer)?;
37+
self.super_stream_name.as_str().encode(writer)?;
38+
Ok(())
39+
}
40+
}
41+
42+
impl Decoder for DeleteSuperStreamCommand {
43+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
44+
let (input, correlation_id) = u32::decode(input)?;
45+
let (input, super_stream_name) = Option::decode(input)?;
46+
47+
Ok((
48+
input,
49+
DeleteSuperStreamCommand {
50+
correlation_id,
51+
super_stream_name: super_stream_name.unwrap(),
52+
},
53+
))
54+
}
55+
}
56+
57+
impl Command for DeleteSuperStreamCommand {
58+
fn key(&self) -> u16 {
59+
COMMAND_DELETE_SUPER_STREAM
60+
}
61+
}
62+
63+
#[cfg(test)]
64+
mod tests {
65+
use crate::commands::create_super_stream::CreateSuperStreamCommand;
66+
use crate::commands::tests::command_encode_decode_test;
67+
68+
use super::DeleteSuperStreamCommand;
69+
70+
#[test]
71+
fn delete_super_stream_request_test() {
72+
command_encode_decode_test::<DeleteSuperStreamCommand>();
73+
}
74+
}

protocol/src/commands/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ use crate::protocol::version::PROTOCOL_VERSION;
22

33
pub mod close;
44
pub mod create_stream;
5+
pub mod create_super_stream;
56
pub mod credit;
67
pub mod declare_publisher;
78
pub mod delete;
89
pub mod delete_publisher;
10+
pub mod delete_super_stream;
911
pub mod deliver;
1012
pub mod exchange_command_versions;
1113
pub mod generic;

protocol/src/protocol.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ pub mod commands {
3030
pub const COMMAND_CONSUMER_UPDATE: u16 = 26;
3131
pub const COMMAND_EXCHANGE_COMMAND_VERSIONS: u16 = 27;
3232
pub const COMMAND_STREAMS_STATS: u16 = 28;
33+
pub const COMMAND_CREATE_SUPER_STREAM: u16 = 29;
34+
pub const COMMAND_DELETE_SUPER_STREAM: u16 = 30;
3335
}
3436

3537
// server responses

protocol/src/request/mod.rs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ use std::io::Write;
33
use crate::{
44
codec::{decoder::read_u32, Decoder, Encoder},
55
commands::{
6-
close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand,
6+
close::CloseRequest, create_stream::CreateStreamCommand,
7+
create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
78
declare_publisher::DeclarePublisherCommand, delete::Delete,
8-
delete_publisher::DeletePublisherCommand,
9+
delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand,
910
exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand,
1011
metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand,
1112
publish::PublishCommand, query_offset::QueryOffsetRequest,
@@ -20,6 +21,7 @@ use crate::{
2021
};
2122

2223
use byteorder::{BigEndian, WriteBytesExt};
24+
2325
mod shims;
2426
#[derive(Debug, PartialEq, Eq)]
2527
pub struct Request {
@@ -60,6 +62,8 @@ pub enum RequestKind {
6062
StoreOffset(StoreOffset),
6163
Unsubscribe(UnSubscribeCommand),
6264
ExchangeCommandVersions(ExchangeCommandVersionsRequest),
65+
CreateSuperStream(CreateSuperStreamCommand),
66+
DeleteSuperStream(DeleteSuperStreamCommand),
6367
}
6468

6569
impl Encoder for RequestKind {
@@ -87,6 +91,12 @@ impl Encoder for RequestKind {
8791
RequestKind::ExchangeCommandVersions(exchange_command_versions) => {
8892
exchange_command_versions.encoded_size()
8993
}
94+
RequestKind::CreateSuperStream(create_super_stream) => {
95+
create_super_stream.encoded_size()
96+
}
97+
RequestKind::DeleteSuperStream(delete_super_stream) => {
98+
delete_super_stream.encoded_size()
99+
}
90100
}
91101
}
92102

@@ -114,6 +124,12 @@ impl Encoder for RequestKind {
114124
RequestKind::ExchangeCommandVersions(exchange_command_versions) => {
115125
exchange_command_versions.encode(writer)
116126
}
127+
RequestKind::CreateSuperStream(create_super_stream) => {
128+
create_super_stream.encode(writer)
129+
}
130+
RequestKind::DeleteSuperStream(delete_super_stream) => {
131+
delete_super_stream.encode(writer)
132+
}
117133
}
118134
}
119135
}
@@ -182,6 +198,12 @@ impl Decoder for Request {
182198
COMMAND_EXCHANGE_COMMAND_VERSIONS => {
183199
ExchangeCommandVersionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
184200
}
201+
COMMAND_CREATE_SUPER_STREAM => {
202+
CreateSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
203+
}
204+
COMMAND_DELETE_SUPER_STREAM => {
205+
DeleteSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
206+
}
185207
n => return Err(DecodeError::UnsupportedResponseType(n)),
186208
};
187209
Ok((input, Request { header, kind: cmd }))
@@ -194,9 +216,11 @@ mod tests {
194216
use crate::{
195217
codec::{Decoder, Encoder},
196218
commands::{
197-
close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand,
219+
close::CloseRequest, create_stream::CreateStreamCommand,
220+
create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
198221
declare_publisher::DeclarePublisherCommand, delete::Delete,
199222
delete_publisher::DeletePublisherCommand,
223+
delete_super_stream::DeleteSuperStreamCommand,
200224
exchange_command_versions::ExchangeCommandVersionsRequest,
201225
heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::OpenCommand,
202226
peer_properties::PeerPropertiesCommand, publish::PublishCommand,
@@ -209,9 +233,8 @@ mod tests {
209233

210234
use std::fmt::Debug;
211235

212-
use fake::{Dummy, Fake, Faker};
213-
214236
use super::Request;
237+
use fake::{Dummy, Fake, Faker};
215238

216239
#[test]
217240
fn request_open_test() {
@@ -324,4 +347,14 @@ mod tests {
324347
fn request_exchange_command_versions_test() {
325348
request_encode_decode_test::<ExchangeCommandVersionsRequest>()
326349
}
350+
351+
#[test]
352+
fn request_create_super_stream_test() {
353+
request_encode_decode_test::<CreateSuperStreamCommand>()
354+
}
355+
356+
#[test]
357+
fn request_delete_super_stream_test() {
358+
request_encode_decode_test::<DeleteSuperStreamCommand>()
359+
}
327360
}

protocol/src/request/shims.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use crate::commands::create_super_stream::CreateSuperStreamCommand;
2+
use crate::commands::delete_super_stream::DeleteSuperStreamCommand;
13
use crate::{
24
commands::{
35
close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand,
@@ -14,6 +16,7 @@ use crate::{
1416
types::Header,
1517
Request, RequestKind,
1618
};
19+
1720
impl<T> From<T> for Request
1821
where
1922
T: Into<RequestKind> + Command,
@@ -135,3 +138,15 @@ impl From<ExchangeCommandVersionsRequest> for RequestKind {
135138
RequestKind::ExchangeCommandVersions(cmd)
136139
}
137140
}
141+
142+
impl From<CreateSuperStreamCommand> for RequestKind {
143+
fn from(cmd: CreateSuperStreamCommand) -> Self {
144+
RequestKind::CreateSuperStream(cmd)
145+
}
146+
}
147+
148+
impl From<DeleteSuperStreamCommand> for RequestKind {
149+
fn from(cmd: DeleteSuperStreamCommand) -> Self {
150+
RequestKind::DeleteSuperStream(cmd)
151+
}
152+
}

protocol/src/response/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ impl Decoder for Response {
139139
| COMMAND_SUBSCRIBE
140140
| COMMAND_UNSUBSCRIBE
141141
| COMMAND_CREATE_STREAM
142+
| COMMAND_CREATE_SUPER_STREAM
143+
| COMMAND_DELETE_SUPER_STREAM
142144
| COMMAND_DELETE_STREAM => {
143145
GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))?
144146
}
@@ -250,6 +252,7 @@ mod tests {
250252
ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
251253
exchange_command_versions.encoded_size()
252254
}
255+
ResponseKind(exchange_command_versions) => exchange_command_versions.encoded_size(),
253256
}
254257
}
255258

0 commit comments

Comments
 (0)