Skip to content

Commit ed08b5f

Browse files
committed
Made OutgoingValue::finish() more errorful. Removed memory provider because FS is better for testing anyway.
Signed-off-by: itowlson <ivan.towlson@fermyon.com>
1 parent 0d89199 commit ed08b5f

File tree

9 files changed

+109
-342
lines changed

9 files changed

+109
-342
lines changed

Cargo.lock

Lines changed: 0 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/blobstore-azure/src/store.rs

Lines changed: 46 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -123,49 +123,12 @@ impl Container for AzureContainer {
123123
Ok(Box::new(AzureIncomingData::new(client, range)))
124124
}
125125

126-
async fn connect_stm(&self, name: &str, mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, finished_tx: tokio::sync::mpsc::Sender<()>) -> anyhow::Result<()> {
127-
use tokio::io::AsyncReadExt;
128-
129-
// Azure limits us to 50k blocks per blob. At 2MB/block that allows 100GB, which will be
130-
// enough for most use cases. If users need flexibility for larger blobs, we could make
131-
// the block size configurable via the runtime config ("size hint" or something).
132-
const BLOCK_SIZE: usize = 2 * 1024 * 1024;
133-
126+
async fn connect_stm(&self, name: &str, stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, finished_tx: tokio::sync::mpsc::Sender<anyhow::Result<()>>) -> anyhow::Result<()> {
134127
let client = self.client.blob_client(name);
135128

136129
tokio::spawn(async move {
137-
let mut blocks = vec![];
138-
139-
'put_blocks: loop {
140-
let mut bytes = Vec::with_capacity(BLOCK_SIZE);
141-
loop {
142-
let read = stm.read_buf(&mut bytes).await.unwrap();
143-
let len = bytes.len();
144-
145-
if read == 0 {
146-
// end of stream - send the last block and go
147-
let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
148-
let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
149-
client.put_block(block_id.clone(), bytes).await.unwrap();
150-
blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(block_id));
151-
break 'put_blocks;
152-
}
153-
if len >= BLOCK_SIZE {
154-
let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
155-
let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
156-
client.put_block(block_id.clone(), bytes).await.unwrap();
157-
blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(block_id));
158-
break;
159-
}
160-
}
161-
}
162-
163-
let block_list = azure_storage_blobs::blob::BlockList {
164-
blocks
165-
};
166-
client.put_block_list(block_list).await.unwrap();
167-
168-
finished_tx.send(()).await.expect("should sent finish tx");
130+
let result = Self::connect_stm_core(stm, client).await;
131+
finished_tx.send(result).await.expect("should sent finish tx");
169132
});
170133

171134
Ok(())
@@ -177,3 +140,46 @@ impl Container for AzureContainer {
177140
}
178141
}
179142

143+
impl AzureContainer {
144+
async fn connect_stm_core(mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, client: azure_storage_blobs::prelude::BlobClient) -> anyhow::Result<()> {
145+
use tokio::io::AsyncReadExt;
146+
147+
// Azure limits us to 50k blocks per blob. At 2MB/block that allows 100GB, which will be
148+
// enough for most use cases. If users need flexibility for larger blobs, we could make
149+
// the block size configurable via the runtime config ("size hint" or something).
150+
const BLOCK_SIZE: usize = 2 * 1024 * 1024;
151+
152+
let mut blocks = vec![];
153+
154+
'put_blocks: loop {
155+
let mut bytes = Vec::with_capacity(BLOCK_SIZE);
156+
loop {
157+
let read = stm.read_buf(&mut bytes).await?;
158+
let len = bytes.len();
159+
160+
if read == 0 {
161+
// end of stream - send the last block and go
162+
let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
163+
let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
164+
client.put_block(block_id.clone(), bytes).await?;
165+
blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(block_id));
166+
break 'put_blocks;
167+
}
168+
if len >= BLOCK_SIZE {
169+
let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
170+
let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
171+
client.put_block(block_id.clone(), bytes).await?;
172+
blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(block_id));
173+
break;
174+
}
175+
}
176+
}
177+
178+
let block_list = azure_storage_blobs::blob::BlockList {
179+
blocks
180+
};
181+
client.put_block_list(block_list).await?;
182+
183+
Ok(())
184+
}
185+
}

crates/blobstore-fs/src/lib.rs

Lines changed: 24 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -151,30 +151,16 @@ impl spin_factor_blobstore::Container for FileSystemContainer {
151151
Ok(Box::new(BlobContent { file: Some(file), start, end }))
152152
}
153153

154-
async fn connect_stm(&self, name: &str, mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, finished_tx: tokio::sync::mpsc::Sender<()>) -> anyhow::Result<()> {
154+
async fn connect_stm(&self, name: &str, stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, finished_tx: tokio::sync::mpsc::Sender<anyhow::Result<()>>) -> anyhow::Result<()> {
155155
let path = self.object_path(name)?;
156156
if let Some(dir) = path.parent() {
157157
tokio::fs::create_dir_all(dir).await?;
158158
}
159-
let mut file = tokio::fs::File::create(&path).await?;
159+
let file = tokio::fs::File::create(&path).await?;
160160

161161
tokio::spawn(async move {
162-
use tokio::io::AsyncWriteExt;
163-
use tokio::io::AsyncReadExt;
164-
165-
const BUF_SIZE: usize = 8192;
166-
167-
loop {
168-
// TODO: I think errors should send `finished_tx`.
169-
// (Should finished_tx take a `Result<()` for error reporting?)
170-
let mut buf = vec![0; BUF_SIZE];
171-
let count = stm.read(&mut buf).await.unwrap();
172-
if count == 0 {
173-
finished_tx.send(()).await.unwrap();
174-
break;
175-
}
176-
file.write_all(&buf[0..count]).await.unwrap();
177-
}
162+
let result = Self::connect_stm_core(stm, file).await;
163+
finished_tx.send(result).await.expect("shoulda sent finished_tx");
178164
});
179165

180166
Ok(())
@@ -188,6 +174,26 @@ impl spin_factor_blobstore::Container for FileSystemContainer {
188174
}
189175
}
190176

177+
impl FileSystemContainer {
178+
async fn connect_stm_core(mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, mut file: tokio::fs::File) -> anyhow::Result<()> {
179+
use tokio::io::AsyncWriteExt;
180+
use tokio::io::AsyncReadExt;
181+
182+
const BUF_SIZE: usize = 8192;
183+
184+
loop {
185+
let mut buf = vec![0; BUF_SIZE];
186+
let count = stm.read(&mut buf).await?;
187+
if count == 0 {
188+
break;
189+
}
190+
file.write_all(&buf[0..count]).await?;
191+
}
192+
193+
Ok(())
194+
}
195+
}
196+
191197
struct BlobContent {
192198
file: Option<tokio::fs::File>,
193199
start: u64,

crates/blobstore-memory/Cargo.toml

Lines changed: 0 additions & 23 deletions
This file was deleted.

0 commit comments

Comments (0)