Skip to content

Commit ea9ceed

Browse files
committed
Added composition decoders and encoders
1 parent 611b793 commit ea9ceed

File tree

4 files changed

+371
-0
lines changed

4 files changed

+371
-0
lines changed
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
use bytes::BytesMut;
2+
3+
use crate::codec::{Decoder, Encoder};
4+
5+
/// A [`Decoder`] that composes two decoders into a single decoder.
6+
///
7+
/// The first decoder must output [`BytesMut`], to be connected to the second one.
8+
///
9+
/// This decoder will hold the intermediate result of the first decoder in a buffer.
10+
///
11+
/// [`Decoder`]: crate::codec::Decoder
12+
#[derive(Debug)]
13+
pub struct CompositionDecoder<D1, D2>
14+
where
15+
D1: Decoder<Item = BytesMut>,
16+
D2: Decoder,
17+
{
18+
first: D1,
19+
second: D2,
20+
intermediate_buffer: BytesMut,
21+
}
22+
23+
impl<D1, D2> CompositionDecoder<D1, D2>
24+
where
25+
D1: Decoder<Item = BytesMut>,
26+
D2: Decoder,
27+
{
28+
/// Creates a new [`CompositionDecoder`] that will use `first` to decode data
29+
/// and then pass the result to `second`.
30+
pub fn new(first: D1, second: D2) -> Self {
31+
Self {
32+
first,
33+
second,
34+
intermediate_buffer: BytesMut::new(),
35+
}
36+
}
37+
}
38+
39+
impl<D1, D2> Decoder for CompositionDecoder<D1, D2>
40+
where
41+
D1: Decoder<Item = BytesMut>,
42+
D2: Decoder,
43+
{
44+
type Item = D2::Item;
45+
46+
type Error = CompositionDecoderError<D1, D2>;
47+
48+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
49+
// First, check if we can decode using our buffer
50+
if !self.intermediate_buffer.is_empty() {
51+
if let Some(result) = self
52+
.second
53+
.decode(&mut self.intermediate_buffer)
54+
.map_err(CompositionDecoderError::D2)?
55+
{
56+
return Ok(Some(result));
57+
}
58+
}
59+
60+
// Then, try to load more data into the second decoder's buffer using the first decoder
61+
if let Some(intermediate) = self
62+
.first
63+
.decode(src)
64+
.map_err(CompositionDecoderError::D1)?
65+
{
66+
// Add the intermediate result to our buffer
67+
self.intermediate_buffer.extend_from_slice(&intermediate);
68+
69+
// Now retry to decode using the second decoder
70+
return self
71+
.second
72+
.decode(&mut self.intermediate_buffer)
73+
.map_err(CompositionDecoderError::D2);
74+
}
75+
76+
Ok(None)
77+
}
78+
}
79+
80+
#[derive(Debug)]
81+
pub enum CompositionDecoderError<D1, D2>
82+
where
83+
D1: Decoder,
84+
D2: Decoder,
85+
{
86+
Io(std::io::Error),
87+
D1(D1::Error),
88+
D2(D2::Error),
89+
}
90+
91+
impl<D1, D2> From<std::io::Error> for CompositionDecoderError<D1, D2>
92+
where
93+
D1: Decoder,
94+
D2: Decoder,
95+
{
96+
fn from(err: std::io::Error) -> Self {
97+
CompositionDecoderError::Io(err)
98+
}
99+
}
100+
101+
/// Extension trait for [`Decoder`] that allows composing a decoder after `self`.
102+
pub trait DecoderCompositionExt: Decoder<Item = BytesMut> {
103+
/// Compose a decoder after `self`.
104+
fn compose_decoder<D>(self, other: D) -> CompositionDecoder<Self, D>
105+
where
106+
Self: Sized,
107+
D: Decoder,
108+
{
109+
CompositionDecoder::new(self, other)
110+
}
111+
}
112+
113+
impl<D> DecoderCompositionExt for D
114+
where
115+
D: Decoder<Item = BytesMut>,
116+
{
117+
fn compose_decoder<D2>(self, other: D2) -> CompositionDecoder<Self, D2>
118+
where
119+
Self: Sized,
120+
D2: Decoder,
121+
{
122+
CompositionDecoder::new(self, other)
123+
}
124+
}
125+
126+
/// An [`Encoder`] that composes two encoders into a single encoder.
127+
///
128+
/// The second encoder must take [`BytesMut`], to be connected to the first one.
129+
///
130+
/// [`Encoder`]: crate::codec::Encoder
131+
#[derive(Debug)]
132+
pub struct CompositionEncoder<E1, E2> {
133+
first: E1,
134+
second: E2,
135+
}
136+
137+
impl<E1, E2> CompositionEncoder<E1, E2> {
138+
/// Creates a new [`CompositionEncoder`] that will use `first` to encode data
139+
/// and then pass the result to `second`.
140+
pub fn new(first: E1, second: E2) -> Self {
141+
Self { first, second }
142+
}
143+
}
144+
145+
impl<E1, E2, Item> Encoder<Item> for CompositionEncoder<E1, E2>
146+
where
147+
E1: Encoder<Item>,
148+
E2: Encoder<BytesMut>,
149+
{
150+
type Error = CompositionEncoderError<E1, E2, Item>;
151+
152+
fn encode(&mut self, item: Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
153+
let mut intermediate = BytesMut::new();
154+
155+
// First, try to encode using our first encoder
156+
self.first
157+
.encode(item, &mut intermediate)
158+
.map_err(CompositionEncoderError::D1)?;
159+
160+
// Then, try to encode using our second encoder
161+
self.second
162+
.encode(intermediate, dst)
163+
.map_err(CompositionEncoderError::D2)?;
164+
165+
Ok(())
166+
}
167+
}
168+
169+
#[derive(Debug)]
170+
pub enum CompositionEncoderError<D1, D2, Item>
171+
where
172+
D1: Encoder<Item>,
173+
D2: Encoder<BytesMut>,
174+
{
175+
Io(std::io::Error),
176+
D1(D1::Error),
177+
D2(D2::Error),
178+
}
179+
180+
impl<D1, D2, Item> From<std::io::Error> for CompositionEncoderError<D1, D2, Item>
181+
where
182+
D1: Encoder<Item>,
183+
D2: Encoder<BytesMut>,
184+
{
185+
fn from(err: std::io::Error) -> Self {
186+
CompositionEncoderError::Io(err)
187+
}
188+
}
189+
190+
/// Extension trait for [`Encoder`] that allows composing a encoder after `self`.
191+
pub trait EncoderCompositionExt<E>: Encoder<E> {
192+
/// Compose a encoder after `self`.
193+
fn compose_encoder<D>(self, other: D) -> CompositionEncoder<Self, D>
194+
where
195+
Self: Sized,
196+
D: Encoder<BytesMut>,
197+
{
198+
CompositionEncoder::new(self, other)
199+
}
200+
}
201+
202+
impl<D, E> EncoderCompositionExt<E> for D
203+
where
204+
D: Encoder<E>,
205+
{
206+
fn compose_encoder<D2>(self, other: D2) -> CompositionEncoder<Self, D2>
207+
where
208+
Self: Sized,
209+
D2: Encoder<BytesMut>,
210+
{
211+
CompositionEncoder::new(self, other)
212+
}
213+
}

tokio-util/src/codec/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,3 +354,8 @@ pub use self::lines_codec::{LinesCodec, LinesCodecError};
354354

355355
mod any_delimiter_codec;
356356
pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError};
357+
358+
mod composition_codec;
359+
pub use self::composition_codec::{
360+
CompositionDecoder, CompositionEncoder, DecoderCompositionExt, EncoderCompositionExt,
361+
};
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use bytes::{Buf, BufMut, BytesMut};
2+
use tokio_util::codec::{Decoder, DecoderCompositionExt};
3+
4+
#[derive(Debug)]
5+
struct StringDecoder {}
6+
7+
impl Decoder for StringDecoder {
8+
type Item = String;
9+
10+
type Error = std::io::Error;
11+
12+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
13+
if src.len() < size_of::<u32>() {
14+
return Ok(None);
15+
}
16+
17+
let length: usize = src.get_u32().try_into().expect("length is too large");
18+
if src.len() < length {
19+
return Ok(None);
20+
}
21+
22+
let string = src.split_to(length);
23+
Ok(Some(
24+
String::from_utf8(string.to_vec()).expect("string is not valid utf8"),
25+
))
26+
}
27+
}
28+
29+
#[derive(Debug)]
30+
struct BitwiseNotDecoder {}
31+
32+
impl Decoder for BitwiseNotDecoder {
33+
type Item = BytesMut;
34+
35+
type Error = std::io::Error;
36+
37+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
38+
let mut buffer = src.split();
39+
40+
bitwise_not_in_place(&mut buffer);
41+
42+
Ok(Some(buffer))
43+
}
44+
}
45+
46+
fn bitwise_not_in_place(buffer: &mut BytesMut) {
47+
for byte in buffer.iter_mut() {
48+
*byte = !*byte;
49+
}
50+
}
51+
52+
#[test]
53+
fn test_simple_chain() {
54+
let mut decoder = BitwiseNotDecoder {}.compose_decoder(StringDecoder {});
55+
56+
let string = String::from("Hello, world!");
57+
let mut buffer = BytesMut::new();
58+
buffer.put_u32(string.len() as u32);
59+
buffer.put(string.as_bytes());
60+
61+
bitwise_not_in_place(&mut buffer);
62+
63+
let result = decoder.decode(&mut buffer).unwrap().unwrap();
64+
assert_eq!(result, string);
65+
}
66+
67+
#[test]
68+
fn test_not_not_not() {
69+
let mut decoder = BitwiseNotDecoder {}
70+
.compose_decoder(BitwiseNotDecoder {})
71+
.compose_decoder(BitwiseNotDecoder {})
72+
.compose_decoder(StringDecoder {});
73+
74+
let string = String::from("Hello, world!");
75+
let mut buffer = BytesMut::new();
76+
buffer.put_u32(string.len() as u32);
77+
buffer.put(string.as_bytes());
78+
79+
bitwise_not_in_place(&mut buffer);
80+
81+
let result = decoder.decode(&mut buffer).unwrap().unwrap();
82+
assert_eq!(result, string);
83+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use bytes::{BufMut, BytesMut};
2+
use tokio_util::codec::{Encoder, EncoderCompositionExt};
3+
4+
#[derive(Debug)]
5+
struct StringEncoder {}
6+
7+
impl Encoder<String> for StringEncoder {
8+
type Error = std::io::Error;
9+
10+
fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
11+
let length: u32 = item.len().try_into().expect("length is too large");
12+
dst.put_u32(length);
13+
dst.put(item.as_bytes());
14+
Ok(())
15+
}
16+
}
17+
18+
#[derive(Debug)]
19+
struct BitwiseNotEncoder {}
20+
21+
impl Encoder<BytesMut> for BitwiseNotEncoder {
22+
type Error = std::io::Error;
23+
24+
fn encode(&mut self, mut item: BytesMut, dst: &mut BytesMut) -> Result<(), Self::Error> {
25+
bitwise_not_in_place(&mut item);
26+
dst.extend_from_slice(&item);
27+
Ok(())
28+
}
29+
}
30+
31+
fn bitwise_not_in_place(buffer: &mut BytesMut) {
32+
for byte in buffer.iter_mut() {
33+
*byte = !*byte;
34+
}
35+
}
36+
37+
#[test]
38+
fn test_simple_chain() {
39+
let mut encoder = StringEncoder {}.compose_encoder(BitwiseNotEncoder {});
40+
41+
let string = String::from("Hello, world!");
42+
let mut buffer = BytesMut::new();
43+
buffer.put_u32(string.len() as u32);
44+
buffer.put(string.as_bytes());
45+
46+
bitwise_not_in_place(&mut buffer);
47+
let mut dst = BytesMut::new();
48+
49+
encoder.encode(string, &mut dst).unwrap();
50+
assert_eq!(dst, buffer);
51+
}
52+
53+
#[test]
54+
fn test_not_not() {
55+
let mut encoder = StringEncoder {}
56+
.compose_encoder(BitwiseNotEncoder {})
57+
.compose_encoder(BitwiseNotEncoder {})
58+
.compose_encoder(BitwiseNotEncoder {});
59+
60+
let string = String::from("Hello, world!");
61+
let mut buffer = BytesMut::new();
62+
buffer.put_u32(string.len() as u32);
63+
buffer.put(string.as_bytes());
64+
65+
bitwise_not_in_place(&mut buffer);
66+
let mut dst = BytesMut::new();
67+
68+
encoder.encode(string, &mut dst).unwrap();
69+
assert_eq!(dst, buffer);
70+
}

0 commit comments

Comments
 (0)