|
1 | 1 | use super::{PathInfo, PathInfoService};
|
2 | 2 | use crate::proto;
|
| 3 | +use async_stream::try_stream; |
3 | 4 | use data_encoding::BASE64;
|
4 |
| -use futures::{StreamExt, stream::BoxStream}; |
| 5 | +use futures::stream::BoxStream; |
5 | 6 | use prost::Message;
|
6 | 7 | use redb::{Database, ReadableTable, TableDefinition};
|
7 | 8 | use snix_castore::{
|
8 | 9 | Error,
|
9 | 10 | composition::{CompositionContext, ServiceBuilder},
|
10 | 11 | };
|
11 | 12 | use std::{path::PathBuf, sync::Arc};
|
12 |
| -use tokio_stream::wrappers::ReceiverStream; |
13 | 13 | use tonic::async_trait;
|
14 | 14 | use tracing::{instrument, warn};
|
15 | 15 |
|
@@ -130,37 +130,26 @@ impl PathInfoService for RedbPathInfoService {
|
130 | 130 |
|
131 | 131 | fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
|
132 | 132 | let db = self.db.clone();
|
133 |
| - let (tx, rx) = tokio::sync::mpsc::channel(50); |
134 |
| - |
135 |
| - // Spawn a blocking task which writes all PathInfos to tx. |
136 |
| - tokio::task::spawn_blocking({ |
137 |
| - move || -> Result<(), Error> { |
138 |
| - let read_txn = db.begin_read()?; |
139 |
| - let table = read_txn.open_table(PATHINFO_TABLE)?; |
140 |
| - |
141 |
| - for elem in table.iter()? { |
142 |
| - let elem = elem?; |
143 |
| - tokio::runtime::Handle::current() |
144 |
| - .block_on(tx.send(Ok({ |
145 |
| - let path_info_proto = proto::PathInfo::decode( |
146 |
| - elem.1.value().as_slice(), |
147 |
| - ) |
148 |
| - .map_err(|e| { |
149 |
| - warn!(err=%e, "invalid PathInfo"); |
150 |
| - Error::StorageError("invalid PathInfo".to_string()) |
151 |
| - })?; |
152 |
| - PathInfo::try_from(path_info_proto).map_err(|e| { |
153 |
| - Error::StorageError(format!("Invalid path info: {e}")) |
154 |
| - })? |
155 |
| - }))) |
156 |
| - .map_err(|e| Error::StorageError(e.to_string()))?; |
| 133 | + Box::pin(try_stream! { |
| 134 | + let read_txn = db.begin_read()?; |
| 135 | + let table = read_txn.open_table(PATHINFO_TABLE)?; |
| 136 | + |
| 137 | + for elem in table.iter()? { |
| 138 | + let elem = elem?; |
| 139 | + yield { |
| 140 | + let path_info_proto = proto::PathInfo::decode( |
| 141 | + elem.1.value().as_slice(), |
| 142 | + ) |
| 143 | + .map_err(|e| { |
| 144 | + warn!(err=%e, "invalid PathInfo"); |
| 145 | + Error::StorageError("invalid PathInfo".to_string()) |
| 146 | + })?; |
| 147 | + PathInfo::try_from(path_info_proto).map_err(|e| { |
| 148 | + Error::StorageError(format!("Invalid path info: {e}")) |
| 149 | + })? |
157 | 150 | }
|
158 |
| - |
159 |
| - Ok(()) |
160 | 151 | }
|
161 |
| - }); |
162 |
| - |
163 |
| - ReceiverStream::from(rx).boxed() |
| 152 | + }) |
164 | 153 | }
|
165 | 154 | }
|
166 | 155 |
|
|
0 commit comments