You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I'm looking for an alternative to Redis that performs decently on large objects. So far I've found that a local SSD is almost always faster, but maybe I'm doing something wrong in my testing.
I'm testing on an M4 MacBook Pro using docker run -p 6379:6379 --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly and this test script. I'm using the ObjectStore trait for ~ reasons that are not relevant unless someone thinks that's the cause for slowdown, but really it's just a nice wrapper to get the same API for multiple KV stores.
I don't care about upload speed as much as I do download speed.
use std::io::Read;use std::sync::Arc;use std::time::Duration;use std::time::Instant;use futures::StreamExt;use object_store::local::LocalFileSystem;use object_store::GetResultPayload;use object_store::{path::Path,ObjectStore,PutPayload};use uuid::Uuid;use std::fmt;use async_trait::async_trait;use bytes::Bytes;use deadpool_redis::{Config,Connection,Pool,Runtime,};use futures::stream::BoxStream;use object_store::{Attributes,ErrorasObjectStoreE,GetOptions,GetRange,GetResult,ListResult,MultipartUpload,ObjectMeta,PutMode,PutMultipartOpts,PutOptions,PutResult,Result,};use redis::{AsyncCommands,RedisError,SetOptions};/// An object store implementation that reads and writes keys to Redis#[derive(Clone)]pubstructRedisObjectStore{namespace:String,pool:Pool,ttl_millis:u64,}impl fmt::DebugforRedisObjectStore{fnfmt(&self,f:&mut fmt::Formatter<'_>) -> fmt::Result{
f.debug_struct("RedisObjectStore").field("namespace",&self.namespace).field("pool",&"...").field("ttl_millis",&self.ttl_millis).finish()}}impl fmt::DisplayforRedisObjectStore{fnfmt(&self,f:&mut fmt::Formatter<'_>) -> fmt::Result{
f.debug_struct("RedisObjectStore").finish()}}implRedisObjectStore{pubfnfrom_dsn(mutdsn:&str,ttl:Duration) -> anyhow::Result<Self>{// dsn is of the form "redis://cluster-host-1:6379,cluster-host-2:6379/namespace"let protocol = "redis://";if dsn.starts_with(protocol){// remove the protocol
dsn = &dsn[protocol.len()..];}let parts:Vec<&str> = dsn.split('/').collect();if parts.len() != 2{
anyhow::bail!("invalid redis dsn: {}", dsn);}let namespace = parts[1].to_string();let dsn = parts[0].to_string();let timeout = Duration::from_secs(5);let config = Config::from_url(format!("redis://{dsn}"));let pool = config
.builder()?
.runtime(Runtime::Tokio1).create_timeout(Some(timeout)).wait_timeout(Some(timeout)).build()?;Ok(Self{
namespace,
pool,ttl_millis: u64::try_from(ttl.as_millis()).expect("ttl should fit in a usize"),})}fnform_key(&self,location:&Path) -> String{format!("{}:{}",self.namespace, location)}asyncfnget_conn(&self) -> Result<Connection>{self.pool.get().await.map_err(|e| ObjectStoreE::Generic{store:"redis",source:Box::new(e),})}}fnmap_redis_error(e:RedisError) -> ObjectStoreE{ObjectStoreE::Generic{store:"redis",source:Box::new(e),}}#[async_trait]implObjectStoreforRedisObjectStore{asyncfnput(&self,location:&Path,payload:PutPayload) -> Result<PutResult>{let key = self.form_key(location);let set_options = SetOptions::default().with_expiration(redis::SetExpiry::PX(self.ttl_millis));let chunks = payload.into_iter().collect::<Vec<_>>();let chunk = chunks.into_iter().map(|b| b.to_vec()).flatten().collect::<Vec<_>>();let _:() = self.get_conn().await?
.set_options(key,&chunk, set_options).await.map_err(map_redis_error)?;Ok(PutResult{e_tag:None,version:None,})}asyncfnput_opts(&self,location:&Path,payload:PutPayload,opts:PutOptions) -> Result<PutResult>{match opts.mode{PutMode::Overwrite => {}
other => {returnErr(ObjectStoreE::NotSupported{source:Box::from(format!("RedisObjectStore only supports PutMode::Overwrite, got {other:?}")),})}};assert!(opts.tags.encoded().is_empty());assert!(opts.attributes.is_empty());self.put(location, payload).await}asyncfnput_multipart(&self,_location:&Path) -> Result<Box<dynMultipartUpload>>{unimplemented!()}asyncfnput_multipart_opts(&self,_location:&Path,_opts:PutMultipartOpts) -> Result<Box<dynMultipartUpload>>{unimplemented!()}asyncfnget(&self,location:&Path) -> Result<GetResult>{self.get_opts(location,GetOptions::default()).await}#[allow(clippy::too_many_lines)]asyncfnget_opts(&self,location:&Path,options:GetOptions) -> Result<GetResult>{let key = self.form_key(location);letmut conn = self.get_conn().await?;let(data, range):(Bytes, std::ops::Range<usize>) = ifletSome(range) = options.range{match range {// note that GETRANGE returns an empty slice even if the key doesn't exist// so we also need to check if the key exists explicitlyGetRange::Bounded(range) => {let(data, exists):(Bytes,bool) = redis::pipe().atomic().cmd("GETRANGE").arg(key.clone()).arg(range.start).arg(range.end - 1).cmd("EXISTS").arg(key.clone())// extend ttl.cmd("PEXPIRE").arg(key.clone()).arg(self.ttl_millis).ignore().query_async(&mut conn).await.map_err(|e| ObjectStoreE::Generic{store:"redis",source:Box::new(e),})?;if !exists {returnErr(ObjectStoreE::NotFound{path: location.to_string(),source:Box::from("object not found"),});}(data, range)}GetRange::Offset(start) => {let(data, length, exists):(Option<Bytes>,usize,bool) = redis::pipe().atomic().cmd("GETRANGE").arg(key.clone()).arg(start).arg(-1).cmd("STRLEN").arg(key.clone()).cmd("EXISTS").arg(key.clone())// extend ttl.cmd("PEXPIRE").arg(key.clone()).arg(self.ttl_millis).ignore().query_async(&mut conn).await.map_err(|e| ObjectStoreE::Generic{store:"redis",source:Box::new(e),})?;if !exists {returnErr(ObjectStoreE::NotFound{path: location.to_string(),source:Box::from("object not found"),});}let range = 
start..length;letSome(data) = data else{returnErr(ObjectStoreE::NotFound{path: location.to_string(),source:Box::from("object not found"),});};(data, range)}GetRange::Suffix(suffix) => {let(data, length, exists):(Bytes,usize,bool) = redis::pipe().atomic().cmd("GETRANGE").arg(key.clone()).arg(-i64::try_from(suffix).expect("suffix should fit in i64")).arg(-1).cmd("STRLEN").arg(key.clone()).cmd("EXISTS").arg(key.clone())// extend ttl.cmd("PEXPIRE").arg(key.clone()).arg(self.ttl_millis).ignore().query_async(&mut conn).await.map_err(|e| ObjectStoreE::Generic{store:"redis",source:Box::new(e),})?;if !exists {returnErr(ObjectStoreE::NotFound{path: location.to_string(),source:Box::from("object not found"),});}let range = (length - suffix)..length;(data, range)}}}else{let(data,):(Option<Bytes>,) = redis::pipe().atomic().cmd("GET").arg(key.clone()).cmd("PEXPIRE").arg(key.clone()).arg(self.ttl_millis).ignore().query_async(&mut conn).await.map_err(|e| ObjectStoreE::Generic{store:"redis",source:Box::new(e),})?;letSome(data) = data else{returnErr(ObjectStoreE::NotFound{path: location.to_string(),source:Box::from("object not found"),});};let length = data.len();let range = 0..length;(data, range)};let size = data.len();let async_stream = Box::pin(futures::stream::once(asyncmove{Ok(data)}))as_;let payload = GetResultPayload::Stream(async_stream);let meta = ObjectMeta{location: location.clone(),// we *think* datafusion doesn't use this last_modified time anywhere important, but it's something to keep an eye out for// the alternative would be to store the last modified time in a separate key in redis but that seems like overkilllast_modified: chrono::offset::Utc::now(),
size,e_tag:None,version:None,};let attributes = Attributes::new();Ok(GetResult{
payload,
meta,
range,
attributes,})}asyncfnhead(&self,location:&Path) -> Result<ObjectMeta>{let key = self.form_key(location);letmut conn = self.get_conn().await?;let(size, exists):(usize,bool) = redis::pipe().atomic().cmd("STRLEN").arg(key.clone()).cmd("EXISTS").arg(key.clone())// extend ttl.cmd("PEXPIRE").arg(key.clone()).arg(self.ttl_millis).ignore().query_async(&mut conn).await.map_err(|e| ObjectStoreE::Generic{store:"redis",source:Box::new(e),})?;if !exists {returnErr(ObjectStoreE::NotFound{path: location.to_string(),source:Box::from("object not found"),});}Ok(ObjectMeta{location: location.clone(),last_modified: chrono::offset::Utc::now(),
size,e_tag:None,version:None,})}asyncfndelete(&self,location:&Path) -> Result<()>{let key = self.form_key(location);letmut conn = self.get_conn().await?;let existed:bool = conn.del(key).await.map_err(map_redis_error)?;if !existed {returnErr(ObjectStoreE::NotFound{path: location.to_string(),source:Box::from("object not found"),});}Ok(())}fndelete_stream<'a>(&'aself,_locations:BoxStream<'a,Result<Path>>) -> BoxStream<'a,Result<Path>>{unimplemented!()}fnlist(&self,_prefix:Option<&Path>) -> BoxStream<'_,Result<ObjectMeta>>{unimplemented!()}fnlist_with_offset(&self,_prefix:Option<&Path>,_offset:&Path) -> BoxStream<'_,Result<ObjectMeta>>{unimplemented!()}asyncfnlist_with_delimiter(&self,_prefix:Option<&Path>) -> Result<ListResult>{unimplemented!()}asyncfncopy(&self,_from:&Path,_to:&Path) -> Result<()>{unimplemented!()}asyncfnrename(&self,_from:&Path,_to:&Path) -> Result<()>{unimplemented!()}asyncfncopy_if_not_exists(&self,_from:&Path,_to:&Path) -> Result<()>{unimplemented!()}asyncfnrename_if_not_exists(&self,_from:&Path,_to:&Path) -> Result<()>{unimplemented!()}}asyncfnconsume_paylod(payload:GetResultPayload) -> anyhow::Result<()>{match payload {GetResultPayload::Stream(mut stream) => {whileletSome(chunk) = stream.next().await{let _ = chunk?;}Ok(())}GetResultPayload::File(mut file, _) => {letmut data = vec![];
file.read_to_end(&mut data).unwrap();Ok(())}}}#[derive(Debug,Clone,Copy)]enumPayloadSize{Kilobyte,TenKilobyte,HundredKilobyte,Megabyte,TenMegabyte,HundredMegabyte,}implPayloadSize{fnnum_bytes(&self) -> usize{matchself{PayloadSize::Kilobyte => 1024,PayloadSize::TenKilobyte => 1024*10,PayloadSize::HundredKilobyte => 1024*100,PayloadSize::Megabyte => 1024*1024,PayloadSize::TenMegabyte => 1024*1024*10,PayloadSize::HundredMegabyte => 1024*1024*100,}}}impl std::fmt::DisplayforPayloadSize{fnfmt(&self,f:&mut std::fmt::Formatter<'_>) -> std::fmt::Result{matchself{PayloadSize::Kilobyte => write!(f,"1KB"),PayloadSize::TenKilobyte => write!(f,"10KB"),PayloadSize::HundredKilobyte => write!(f,"100KB"),PayloadSize::Megabyte => write!(f,"1MB"),PayloadSize::TenMegabyte => write!(f,"10MB"),PayloadSize::HundredMegabyte => write!(f,"100MB"),}}}asyncfnbench_store(name:&str,store:Arc<dynObjectStore>,size:PayloadSize) -> anyhow::Result<()>{let num_bytes = size.num_bytes();let payload = vec![0u8; num_bytes];let path = Path::from(Uuid::new_v4().to_string());let start = Instant::now();
store.put(&path,PutPayload::from_iter(payload.iter().copied())).await?;println!("{name} uploaded {size} object in {:?}", start.elapsed());let start = Instant::now();let res = store.get(&path).await?;consume_paylod(res.payload).await?;println!("{name} downloaded {size} object in {:?}", start.elapsed());Ok(())}#[tokio::main]asyncfnmain(){
std::fs::create_dir_all("/tmp/test").unwrap();let local_store = Arc::new(LocalFileSystem::new_with_prefix("/tmp/test").unwrap());let redis_object_store = Arc::new(RedisObjectStore::from_dsn(&"redis://localhost:6379/test",Duration::from_secs(60)).unwrap());for size invec![PayloadSize::Kilobyte,PayloadSize::TenKilobyte,PayloadSize::HundredKilobyte,PayloadSize::Megabyte,PayloadSize::TenMegabyte,PayloadSize::HundredMegabyte,]{bench_store("SSD", local_store.clone(), size).await.unwrap();bench_store("Redis", redis_object_store.clone(), size).await.unwrap();}}
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
I'm looking for an alternative to Redis that performs decently on large objects. So far I've found that a local SSD is almost always faster, but maybe I'm doing something wrong in my testing.
I'm testing on an M4 MacBook Pro using
docker run -p 6379:6379 --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly
and this test script. I'm using the ObjectStore trait for ~ reasons that are not relevant unless someone thinks that's the cause for slowdown, but really it's just a nice wrapper to get the same API for multiple KV stores.I don't care about upload speed as much as I do download speed.
Beta Was this translation helpful? Give feedback.
All reactions