Skip to content

Commit d185700

Browse files
committed
Added composition decoders and encoders
1 parent 611b793 commit d185700

File tree

4 files changed

+368
-0
lines changed

4 files changed

+368
-0
lines changed
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
use bytes::BytesMut;
2+
3+
use crate::codec::{Decoder, Encoder};
4+
5+
/// A [`Decoder`] that composes two decoders into a single decoder.
6+
///
7+
/// The first decoder must output [`BytesMut`], to be connected to the second one.
8+
///
9+
/// This decoder will hold the intermediate result of the first decoder in a buffer.
10+
///
11+
/// [`Decoder`]: crate::codec::Decoder
12+
#[derive(Debug)]
13+
pub struct CompositionDecoder<D1, D2>
14+
where
15+
D1: Decoder<Item = BytesMut>,
16+
D2: Decoder,
17+
{
18+
first: D1,
19+
second: D2,
20+
intermediate_buffer: BytesMut,
21+
}
22+
23+
impl<D1, D2> CompositionDecoder<D1, D2>
24+
where
25+
D1: Decoder<Item = BytesMut>,
26+
D2: Decoder,
27+
{
28+
/// Creates a new `CompositionDecoder` that will use `first` to decode data
29+
/// and then pass the result to `second`.
30+
pub fn new(first: D1, second: D2) -> Self {
31+
Self {
32+
first,
33+
second,
34+
intermediate_buffer: BytesMut::new(),
35+
}
36+
}
37+
}
38+
39+
impl<D1, D2> Decoder for CompositionDecoder<D1, D2>
40+
where
41+
D1: Decoder<Item = BytesMut>,
42+
D2: Decoder,
43+
{
44+
type Item = D2::Item;
45+
46+
type Error = CompositionDecoderError<D1, D2>;
47+
48+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
49+
// First, check if we can decode using our buffer
50+
if !self.intermediate_buffer.is_empty() {
51+
if let Some(result) = self
52+
.second
53+
.decode(&mut self.intermediate_buffer)
54+
.map_err(CompositionDecoderError::D2)?
55+
{
56+
return Ok(Some(result));
57+
}
58+
}
59+
60+
// Then, try to load more data into the second decoder's buffer using the first decoder
61+
if let Some(intermediate) = self
62+
.first
63+
.decode(src)
64+
.map_err(CompositionDecoderError::D1)?
65+
{
66+
// Add the intermediate result to our buffer
67+
self.intermediate_buffer.extend_from_slice(&intermediate);
68+
}
69+
70+
// Now retry to decode using the second decoder
71+
self.second
72+
.decode(&mut self.intermediate_buffer)
73+
.map_err(CompositionDecoderError::D2)
74+
}
75+
}
76+
77+
#[derive(Debug)]
78+
pub enum CompositionDecoderError<D1, D2>
79+
where
80+
D1: Decoder,
81+
D2: Decoder,
82+
{
83+
Io(std::io::Error),
84+
D1(D1::Error),
85+
D2(D2::Error),
86+
}
87+
88+
impl<D1, D2> From<std::io::Error> for CompositionDecoderError<D1, D2>
89+
where
90+
D1: Decoder,
91+
D2: Decoder,
92+
{
93+
fn from(err: std::io::Error) -> Self {
94+
CompositionDecoderError::Io(err)
95+
}
96+
}
97+
98+
/// Extension trait for [`Decoder`] that allows composing a decoder after `self`.
99+
pub trait DecoderCompositionExt: Decoder<Item = BytesMut> {
100+
/// Compose a decoder after `self`.
101+
fn compose_decoder<D>(self, other: D) -> CompositionDecoder<Self, D>
102+
where
103+
Self: Sized,
104+
D: Decoder,
105+
{
106+
CompositionDecoder::new(self, other)
107+
}
108+
}
109+
110+
impl<D> DecoderCompositionExt for D
111+
where
112+
D: Decoder<Item = BytesMut>,
113+
{
114+
fn compose_decoder<D2>(self, other: D2) -> CompositionDecoder<Self, D2>
115+
where
116+
Self: Sized,
117+
D2: Decoder,
118+
{
119+
CompositionDecoder::new(self, other)
120+
}
121+
}
122+
123+
/// An [`Encoder`] that composes two encoders into a single encoder.
124+
///
125+
/// The second encoder must take [`BytesMut`], to be connected to the first one.
126+
///
127+
/// [`Encoder`]: crate::codec::Encoder
128+
#[derive(Debug)]
129+
pub struct CompositionEncoder<E1, E2> {
130+
first: E1,
131+
second: E2,
132+
}
133+
134+
impl<E1, E2> CompositionEncoder<E1, E2> {
135+
/// Creates a new `CompositionEncoder` that will use `first` to encode data
136+
/// and then pass the result to `second`.
137+
pub fn new(first: E1, second: E2) -> Self {
138+
Self { first, second }
139+
}
140+
}
141+
142+
impl<E1, E2, Item> Encoder<Item> for CompositionEncoder<E1, E2>
143+
where
144+
E1: Encoder<Item>,
145+
E2: Encoder<BytesMut>,
146+
{
147+
type Error = CompositionEncoderError<E1, E2, Item>;
148+
149+
fn encode(&mut self, item: Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
150+
let mut intermediate = BytesMut::new();
151+
152+
// First, try to encode using our first encoder
153+
self.first
154+
.encode(item, &mut intermediate)
155+
.map_err(CompositionEncoderError::D1)?;
156+
157+
// Then, try to encode using our second encoder
158+
self.second
159+
.encode(intermediate, dst)
160+
.map_err(CompositionEncoderError::D2)?;
161+
162+
Ok(())
163+
}
164+
}
165+
166+
#[derive(Debug)]
167+
pub enum CompositionEncoderError<D1, D2, Item>
168+
where
169+
D1: Encoder<Item>,
170+
D2: Encoder<BytesMut>,
171+
{
172+
Io(std::io::Error),
173+
D1(D1::Error),
174+
D2(D2::Error),
175+
}
176+
177+
impl<D1, D2, Item> From<std::io::Error> for CompositionEncoderError<D1, D2, Item>
178+
where
179+
D1: Encoder<Item>,
180+
D2: Encoder<BytesMut>,
181+
{
182+
fn from(err: std::io::Error) -> Self {
183+
CompositionEncoderError::Io(err)
184+
}
185+
}
186+
187+
/// Extension trait for [`Encoder`] that allows composing a encoder after `self`.
188+
pub trait EncoderCompositionExt<E>: Encoder<E> {
189+
/// Compose a encoder after `self`.
190+
fn compose_encoder<D>(self, other: D) -> CompositionEncoder<Self, D>
191+
where
192+
Self: Sized,
193+
D: Encoder<BytesMut>,
194+
{
195+
CompositionEncoder::new(self, other)
196+
}
197+
}
198+
199+
impl<D, E> EncoderCompositionExt<E> for D
200+
where
201+
D: Encoder<E>,
202+
{
203+
fn compose_encoder<D2>(self, other: D2) -> CompositionEncoder<Self, D2>
204+
where
205+
Self: Sized,
206+
D2: Encoder<BytesMut>,
207+
{
208+
CompositionEncoder::new(self, other)
209+
}
210+
}

tokio-util/src/codec/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,3 +354,8 @@ pub use self::lines_codec::{LinesCodec, LinesCodecError};
354354

355355
mod any_delimiter_codec;
356356
pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError};
357+
358+
mod composition_codec;
359+
pub use self::composition_codec::{
360+
CompositionDecoder, CompositionEncoder, DecoderCompositionExt, EncoderCompositionExt,
361+
};
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use bytes::{Buf, BufMut, BytesMut};
2+
use tokio_util::codec::{Decoder, DecoderCompositionExt};
3+
4+
#[derive(Debug)]
5+
struct StringDecoder {}
6+
7+
impl Decoder for StringDecoder {
8+
type Item = String;
9+
10+
type Error = std::io::Error;
11+
12+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
13+
if src.len() < size_of::<u32>() {
14+
return Ok(None);
15+
}
16+
17+
let length: usize = src.get_u32().try_into().expect("length is too large");
18+
if src.len() < length.into() {
19+
return Ok(None);
20+
}
21+
22+
let string = src.split_to(length.into());
23+
Ok(Some(
24+
String::from_utf8(string.to_vec()).expect("string is not valid utf8"),
25+
))
26+
}
27+
}
28+
29+
#[derive(Debug)]
30+
struct BitwiseNotDecoder {}
31+
32+
impl Decoder for BitwiseNotDecoder {
33+
type Item = BytesMut;
34+
35+
type Error = std::io::Error;
36+
37+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
38+
let mut buffer = src.split();
39+
40+
bitwise_not_in_place(&mut buffer);
41+
42+
Ok(Some(buffer))
43+
}
44+
}
45+
46+
fn bitwise_not_in_place(buffer: &mut BytesMut) {
47+
for byte in buffer.iter_mut() {
48+
*byte = !*byte;
49+
}
50+
}
51+
52+
#[test]
53+
fn test_simple_chain() {
54+
let mut decoder = BitwiseNotDecoder {}.compose_decoder(StringDecoder {});
55+
56+
let string = String::from("Hello, world!");
57+
let mut buffer = BytesMut::new();
58+
buffer.put_u32(string.len() as u32);
59+
buffer.put(string.as_bytes());
60+
61+
bitwise_not_in_place(&mut buffer);
62+
63+
let result = decoder.decode(&mut buffer).unwrap().unwrap();
64+
assert_eq!(result, string);
65+
}
66+
67+
#[test]
68+
fn test_not_not_not() {
69+
let mut decoder = BitwiseNotDecoder {}
70+
.compose_decoder(BitwiseNotDecoder {})
71+
.compose_decoder(BitwiseNotDecoder {})
72+
.compose_decoder(StringDecoder {});
73+
74+
let string = String::from("Hello, world!");
75+
let mut buffer = BytesMut::new();
76+
buffer.put_u32(string.len() as u32);
77+
buffer.put(string.as_bytes());
78+
79+
bitwise_not_in_place(&mut buffer);
80+
81+
let result = decoder.decode(&mut buffer).unwrap().unwrap();
82+
assert_eq!(result, string);
83+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use bytes::{BufMut, BytesMut};
2+
use tokio_util::codec::{Encoder, EncoderCompositionExt};
3+
4+
#[derive(Debug)]
5+
struct StringEncoder {}
6+
7+
impl Encoder<String> for StringEncoder {
8+
type Error = std::io::Error;
9+
10+
fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
11+
let length: u32 = item.len().try_into().expect("length is too large");
12+
dst.put_u32(length);
13+
dst.put(item.as_bytes());
14+
Ok(())
15+
}
16+
}
17+
18+
#[derive(Debug)]
19+
struct BitwiseNotEncoder {}
20+
21+
impl Encoder<BytesMut> for BitwiseNotEncoder {
22+
type Error = std::io::Error;
23+
24+
fn encode(&mut self, mut item: BytesMut, dst: &mut BytesMut) -> Result<(), Self::Error> {
25+
bitwise_not_in_place(&mut item);
26+
dst.extend_from_slice(&item);
27+
Ok(())
28+
}
29+
}
30+
31+
fn bitwise_not_in_place(buffer: &mut BytesMut) {
32+
for byte in buffer.iter_mut() {
33+
*byte = !*byte;
34+
}
35+
}
36+
37+
#[test]
38+
fn test_simple_chain() {
39+
let mut encoder = StringEncoder {}.compose_encoder(BitwiseNotEncoder {});
40+
41+
let string = String::from("Hello, world!");
42+
let mut buffer = BytesMut::new();
43+
buffer.put_u32(string.len() as u32);
44+
buffer.put(string.as_bytes());
45+
46+
bitwise_not_in_place(&mut buffer);
47+
let mut dst = BytesMut::new();
48+
49+
encoder.encode(string, &mut dst).unwrap();
50+
assert_eq!(dst, buffer);
51+
}
52+
53+
#[test]
54+
fn test_not_not() {
55+
let mut encoder = StringEncoder {}
56+
.compose_encoder(BitwiseNotEncoder {})
57+
.compose_encoder(BitwiseNotEncoder {})
58+
.compose_encoder(BitwiseNotEncoder {});
59+
60+
let string = String::from("Hello, world!");
61+
let mut buffer = BytesMut::new();
62+
buffer.put_u32(string.len() as u32);
63+
buffer.put(string.as_bytes());
64+
65+
bitwise_not_in_place(&mut buffer);
66+
let mut dst = BytesMut::new();
67+
68+
encoder.encode(string, &mut dst).unwrap();
69+
assert_eq!(dst, buffer);
70+
}

0 commit comments

Comments
 (0)