Skip to content

Commit 08127dd

Browse files
committed
Introduce ArcVec<T> to improve Copy performance of messages
Summary: The ArcVec<T> is just a thin wrapper around Arc<[T]> that then can be cheaply cloned. But at the same time can be seralized and most importantly deserialized when sent over the wire
1 parent 89d0aa5 commit 08127dd

File tree

2 files changed

+126
-1
lines changed

2 files changed

+126
-1
lines changed

crates/types/src/net/replicated_loglet.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::logs::metadata::SegmentIndex;
2020
use crate::logs::{LogId, LogletOffset, Record, SequenceNumber, TailState};
2121
use crate::net::define_rpc;
2222
use crate::replicated_loglet::ReplicatedLogletId;
23+
use crate::storage::ArcVec;
2324

2425
// ----- ReplicatedLoglet Sequencer API -----
2526
define_rpc! {
@@ -69,7 +70,7 @@ impl CommonResponseHeader {
6970
pub struct Append {
7071
#[serde(flatten)]
7172
pub header: CommonRequestHeader,
72-
pub payloads: Vec<Record>,
73+
pub payloads: ArcVec<Record>,
7374
}
7475

7576
impl Append {

crates/types/src/storage.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use core::fmt;
12+
use std::marker::PhantomData;
1113
use std::mem;
14+
use std::ops::Deref;
1215
use std::sync::Arc;
1316

1417
use bytes::{Buf, BufMut, Bytes, BytesMut};
1518
use downcast_rs::{impl_downcast, DowncastSync};
1619
use serde::de::{DeserializeOwned, Error as DeserializationError};
1720
use serde::ser::Error as SerializationError;
21+
use serde::ser::SerializeSeq;
1822
use serde::{Deserialize, Serialize};
1923
use tracing::error;
2024

@@ -395,6 +399,126 @@ pub fn decode_from_flexbuffers<T: DeserializeOwned, B: Buf>(
395399
}
396400
}
397401

402+
/// [`ArcVec`] mainly used by `message` types to improve
403+
/// cloning of messages.
404+
///
405+
/// It can replace [`Vec<T>`] most of the time in all structures
406+
/// that need to be serialized over the wire.
407+
///
408+
/// Internally it keeps the data inside an [`Arc<[T]>`]
409+
#[derive(Debug)]
410+
pub struct ArcVec<T> {
411+
inner: Arc<[T]>,
412+
}
413+
414+
impl<T> Deref for ArcVec<T> {
415+
type Target = [T];
416+
fn deref(&self) -> &Self::Target {
417+
&self.inner
418+
}
419+
}
420+
421+
impl<T> serde::Serialize for ArcVec<T>
422+
where
423+
T: serde::Serialize,
424+
{
425+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
426+
where
427+
S: serde::Serializer,
428+
{
429+
let mut seq = serializer.serialize_seq(Some(self.len()))?;
430+
for elem in self.iter() {
431+
seq.serialize_element(elem)?;
432+
}
433+
434+
seq.end()
435+
}
436+
}
437+
438+
impl<'de, T> serde::Deserialize<'de> for ArcVec<T>
439+
where
440+
T: serde::Deserialize<'de>,
441+
{
442+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
443+
where
444+
D: serde::Deserializer<'de>,
445+
{
446+
deserializer.deserialize_seq(ArcVecVisitor::default())
447+
}
448+
}
449+
450+
struct ArcVecVisitor<T> {
451+
_phantom: PhantomData<T>,
452+
}
453+
454+
impl<T> Default for ArcVecVisitor<T> {
455+
fn default() -> Self {
456+
Self {
457+
_phantom: PhantomData,
458+
}
459+
}
460+
}
461+
462+
impl<'de, T> serde::de::Visitor<'de> for ArcVecVisitor<T>
463+
where
464+
T: serde::Deserialize<'de>,
465+
{
466+
type Value = ArcVec<T>;
467+
468+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
469+
write!(formatter, "expecting an array")
470+
}
471+
472+
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
473+
where
474+
A: serde::de::SeqAccess<'de>,
475+
{
476+
let mut vec: Vec<T> = Vec::with_capacity(seq.size_hint().unwrap_or_default());
477+
while let Some(value) = seq.next_element()? {
478+
vec.push(value);
479+
}
480+
481+
Ok(vec.into())
482+
}
483+
}
484+
485+
impl<T> Clone for ArcVec<T> {
486+
fn clone(&self) -> Self {
487+
Self {
488+
inner: Arc::clone(&self.inner),
489+
}
490+
}
491+
}
492+
493+
impl<T> From<ArcVec<T>> for Arc<[T]> {
494+
fn from(value: ArcVec<T>) -> Self {
495+
value.inner
496+
}
497+
}
498+
499+
impl<T> From<ArcVec<T>> for Vec<T>
500+
where
501+
T: Clone,
502+
{
503+
fn from(value: ArcVec<T>) -> Self {
504+
Vec::from_iter(value.iter().cloned())
505+
}
506+
}
507+
508+
impl<T> From<Vec<T>> for ArcVec<T> {
509+
fn from(value: Vec<T>) -> Self {
510+
Self {
511+
inner: value.into(),
512+
}
513+
}
514+
}
515+
516+
impl<T> From<Arc<[T]>> for ArcVec<T> {
517+
fn from(value: Arc<[T]>) -> Self {
518+
Self { inner: value }
519+
}
520+
}
521+
398522
#[cfg(test)]
399523
mod tests {
400524
use bytes::Bytes;

0 commit comments

Comments
 (0)