@@ -6,7 +6,7 @@ use candid::{
6
6
CandidType , Decode , Principal ,
7
7
} ;
8
8
use chrono:: DateTime ;
9
- use futures:: { stream:: BoxStream , StreamExt , TryStreamExt } ;
9
+ use futures:: { stream:: BoxStream , StreamExt } ;
10
10
use ic_agent:: Agent ;
11
11
use ic_cose_types:: { BoxError , CanisterCaller } ;
12
12
use ic_oss_types:: { format_error, object_store:: * } ;
@@ -321,7 +321,7 @@ pub trait ObjectStoreSDK: CanisterCaller + Sized {
321
321
. await
322
322
. map_err ( |error| Error :: Generic {
323
323
error : format_error ( error) ,
324
- } )
324
+ } ) ?
325
325
}
326
326
327
327
/// Lists objects under a prefix
@@ -829,17 +829,12 @@ impl ObjectStore for ObjectStoreClient {
829
829
) -> BoxStream < ' static , object_store:: Result < object_store:: ObjectMeta > > {
830
830
let prefix = prefix. cloned ( ) ;
831
831
let client = self . client . clone ( ) ;
832
- futures:: stream:: once ( async move {
833
- let res = client. list ( prefix. as_ref ( ) ) . await ;
834
- let values: Vec < object_store:: Result < object_store:: ObjectMeta , object_store:: Error > > =
835
- match res {
836
- Ok ( res) => res. into_iter ( ) . map ( |v| Ok ( from_object_meta ( v) ) ) . collect ( ) ,
837
- Err ( err) => vec ! [ Err ( from_error( err) ) ] ,
838
- } ;
839
-
840
- Ok :: < _ , object_store:: Error > ( futures:: stream:: iter ( values) )
841
- } )
842
- . try_flatten ( )
832
+ try_stream ! {
833
+ let res = client. list( prefix. as_ref( ) ) . await . map_err( from_error) ?;
834
+ for object in res {
835
+ yield from_object_meta( object) ;
836
+ }
837
+ }
843
838
. boxed ( )
844
839
}
845
840
@@ -852,17 +847,12 @@ impl ObjectStore for ObjectStoreClient {
852
847
let prefix = prefix. cloned ( ) ;
853
848
let offset = offset. clone ( ) ;
854
849
let client = self . client . clone ( ) ;
855
- futures:: stream:: once ( async move {
856
- let res = client. list_with_offset ( prefix. as_ref ( ) , & offset) . await ;
857
- let values: Vec < object_store:: Result < object_store:: ObjectMeta , object_store:: Error > > =
858
- match res {
859
- Ok ( res) => res. into_iter ( ) . map ( |v| Ok ( from_object_meta ( v) ) ) . collect ( ) ,
860
- Err ( err) => vec ! [ Err ( from_error( err) ) ] ,
861
- } ;
862
-
863
- Ok :: < _ , object_store:: Error > ( futures:: stream:: iter ( values) )
864
- } )
865
- . try_flatten ( )
850
+ try_stream ! {
851
+ let res = client. list_with_offset( prefix. as_ref( ) , & offset) . await . map_err( from_error) ?;
852
+ for object in res {
853
+ yield from_object_meta( object) ;
854
+ }
855
+ }
866
856
. boxed ( )
867
857
}
868
858
@@ -879,7 +869,11 @@ impl ObjectStore for ObjectStoreClient {
879
869
880
870
Ok ( object_store:: ListResult {
881
871
objects : res. objects . into_iter ( ) . map ( from_object_meta) . collect ( ) ,
882
- common_prefixes : res. common_prefixes . into_iter ( ) . map ( Path :: from) . collect ( ) ,
872
+ common_prefixes : res
873
+ . common_prefixes
874
+ . into_iter ( )
875
+ . map ( |p| Path :: parse ( p) . unwrap ( ) )
876
+ . collect ( ) ,
883
877
} )
884
878
}
885
879
@@ -995,7 +989,7 @@ fn create_decryption_stream(
995
989
let data = data?;
996
990
buf. extend_from_slice( & data) ;
997
991
998
- while buf. len( ) >= CHUNK_SIZE as usize {
992
+ while buf. len( ) > CHUNK_SIZE as usize {
999
993
let mut chunk = buf. drain( ..CHUNK_SIZE as usize ) . collect:: <Vec <u8 >>( ) ;
1000
994
1001
995
let tag = aes_tags. get( idx) . ok_or_else( || object_store:: Error :: Generic {
@@ -1004,8 +998,8 @@ fn create_decryption_stream(
1004
998
} ) ?;
1005
999
1006
1000
decrypt_chunk( & cipher, nonce_ref, & mut chunk, tag, & location) ?;
1007
- if idx == start_idx {
1008
- chunk = chunk [ start_offset.. ] . to_vec ( ) ;
1001
+ if idx == start_idx && start_offset > 0 {
1002
+ chunk. drain ( ..start_offset ) ;
1009
1003
}
1010
1004
1011
1005
remaining = remaining. saturating_sub( chunk. len( ) ) ;
@@ -1021,9 +1015,10 @@ fn create_decryption_stream(
1021
1015
source: format!( "missing AES256 tag for chunk {idx} for path {location}" ) . into( ) ,
1022
1016
} ) ?;
1023
1017
decrypt_chunk( & cipher, nonce_ref, & mut buf, tag, & location) ?;
1024
- if idx == start_idx {
1025
- buf = buf [ start_offset.. ] . to_vec ( ) ;
1018
+ if idx == start_idx && start_offset > 0 {
1019
+ buf. drain ( ..start_offset ) ;
1026
1020
}
1021
+
1027
1022
buf. truncate( remaining) ;
1028
1023
yield bytes:: Bytes :: from( buf) ;
1029
1024
}
@@ -1091,7 +1086,7 @@ pub fn from_error(err: Error) -> object_store::Error {
1091
1086
/// Converted object_store::ObjectMeta with equivalent fields
1092
1087
pub fn from_object_meta ( val : ObjectMeta ) -> object_store:: ObjectMeta {
1093
1088
object_store:: ObjectMeta {
1094
- location : val. location . into ( ) ,
1089
+ location : Path :: parse ( val. location ) . unwrap ( ) ,
1095
1090
last_modified : DateTime :: from_timestamp_millis ( val. last_modified as i64 )
1096
1091
. expect ( "invalid timestamp" ) ,
1097
1092
size : val. size ,
@@ -1227,6 +1222,7 @@ mod tests {
1227
1222
use ed25519_consensus:: SigningKey ;
1228
1223
use ic_agent:: { identity:: BasicIdentity , Identity } ;
1229
1224
use ic_cose_types:: cose:: sha3_256;
1225
+ use object_store:: integration:: * ;
1230
1226
1231
1227
#[ tokio:: test( flavor = "current_thread" ) ]
1232
1228
#[ ignore]
@@ -1253,23 +1249,13 @@ mod tests {
1253
1249
println ! ( "put result: {:?}" , res) ;
1254
1250
1255
1251
let res = oc. get_opts ( & path, Default :: default ( ) ) . await . unwrap ( ) ;
1256
- println ! ( "get result: {:?}" , res) ;
1257
1252
assert_eq ! ( res. meta. size as usize , payload. len( ) ) ;
1258
- let res = match res. payload {
1259
- object_store:: GetResultPayload :: Stream ( mut stream) => {
1260
- let mut buf = Vec :: new ( ) ;
1261
- while let Some ( data) = stream. next ( ) . await {
1262
- buf. extend_from_slice ( & data. unwrap ( ) ) ;
1263
- }
1264
- buf
1265
- }
1266
- } ;
1267
- assert_eq ! ( res, payload) ;
1253
+ let res = res. bytes ( ) . await . unwrap ( ) ;
1254
+ assert_eq ! ( res. to_vec( ) , payload) ;
1268
1255
1269
1256
let res = cli. get_opts ( & path, Default :: default ( ) ) . await . unwrap ( ) ;
1270
- println ! ( "get result: {:?}" , res) ;
1271
1257
assert_eq ! ( res. meta. size as usize , payload. len( ) ) ;
1272
- assert_eq ! ( & res. payload, & payload) ;
1258
+ assert_ne ! ( & res. payload, & payload) ;
1273
1259
let aes_nonce = res. meta . aes_nonce . unwrap ( ) ;
1274
1260
assert_eq ! ( aes_nonce. len( ) , 12 ) ;
1275
1261
let aes_tags = res. meta . aes_tags . unwrap ( ) ;
@@ -1299,44 +1285,92 @@ mod tests {
1299
1285
}
1300
1286
let res = oc. get_opts ( & path, Default :: default ( ) ) . await . unwrap ( ) ;
1301
1287
assert_eq ! ( res. meta. size as usize , payload. len( ) ) ;
1302
- let res = match res. payload {
1303
- object_store:: GetResultPayload :: Stream ( mut stream) => {
1304
- let mut buf = bytes:: BytesMut :: new ( ) ;
1305
- while let Some ( data) = stream. next ( ) . await {
1306
- buf. extend_from_slice ( & data. unwrap ( ) ) ;
1307
- }
1308
- buf. freeze ( ) // Convert to immutable Bytes
1309
- }
1310
- } ;
1311
- assert_eq ! ( res, payload) ;
1288
+ let res = res. bytes ( ) . await . unwrap ( ) ;
1289
+ assert_eq ! ( res. to_vec( ) , payload) ;
1312
1290
1313
1291
let res = cli. get_opts ( & path, Default :: default ( ) ) . await . unwrap ( ) ;
1314
1292
assert_eq ! ( res. meta. size as usize , payload. len( ) ) ;
1315
- assert_eq ! ( & res. payload, & payload) ;
1293
+ assert_ne ! ( & res. payload, & payload) ;
1316
1294
let aes_nonce = res. meta . aes_nonce . unwrap ( ) ;
1317
1295
assert_eq ! ( aes_nonce. len( ) , 12 ) ;
1318
1296
let aes_tags = res. meta . aes_tags . unwrap ( ) ;
1319
1297
assert_eq ! ( aes_tags. len( ) , len. div_ceil( CHUNK_SIZE ) as usize ) ;
1320
1298
1321
- let ranges = vec ! [ ( 0u64 , 1000 ) , ( 100 , 100000 ) , ( len - CHUNK_SIZE - 1 , len) ] ;
1299
+ let ranges = vec ! [ 0u64 .. 1000 , 100 .. 100000 , len - CHUNK_SIZE - 1 .. len] ;
1322
1300
1323
- let rt = cli . get_ranges ( & path, & ranges) . await . unwrap ( ) ;
1301
+ let rt = oc . get_ranges ( & path, & ranges) . await . unwrap ( ) ;
1324
1302
assert_eq ! ( rt. len( ) , ranges. len( ) ) ;
1325
- for ( i, ( start, end) ) in ranges. into_iter ( ) . enumerate ( ) {
1326
- let res = cli
1303
+
1304
+ for ( i, Range { start, end } ) in ranges. into_iter ( ) . enumerate ( ) {
1305
+ let res = oc
1327
1306
. get_opts (
1328
1307
& path,
1329
- GetOptions {
1330
- range : Some ( GetRange :: Bounded ( start, end) ) ,
1308
+ object_store :: GetOptions {
1309
+ range : Some ( object_store :: GetRange :: Bounded ( start.. end) ) ,
1331
1310
..Default :: default ( )
1332
1311
} ,
1333
1312
)
1334
1313
. await
1335
1314
. unwrap ( ) ;
1336
- assert_eq ! ( rt[ i] , & res. payload) ;
1337
- assert_eq ! ( & res. payload, & payload[ start as usize ..end as usize ] ) ;
1338
- assert_eq ! ( res. meta. location, path. as_ref( ) ) ;
1315
+ assert_eq ! ( res. meta. location, path) ;
1339
1316
assert_eq ! ( res. meta. size as usize , payload. len( ) ) ;
1317
+ let data = res. bytes ( ) . await . unwrap ( ) ;
1318
+ assert_eq ! ( rt[ i] . len( ) , data. len( ) ) ;
1319
+ assert_eq ! ( & data, & payload[ start as usize ..end as usize ] ) ;
1320
+ }
1321
+ }
1322
+
1323
+ const NON_EXISTENT_NAME : & str = "nonexistentname" ;
1324
+
1325
+ #[ tokio:: test]
1326
+ #[ ignore]
1327
+ async fn integration_test ( ) {
1328
+ // Should be run in a clean environment
1329
+ // dfx canister call ic_object_store_canister admin_clear '()'
1330
+ let secret = [ 8u8 ; 32 ] ;
1331
+ let canister = Principal :: from_text ( "6at64-oyaaa-aaaap-anvza-cai" ) . unwrap ( ) ;
1332
+ let sk = SigningKey :: from ( secret) ;
1333
+ let id = BasicIdentity :: from_signing_key ( sk) ;
1334
+ println ! ( "id: {:?}" , id. sender( ) . unwrap( ) . to_text( ) ) ;
1335
+ // jjn6g-sh75l-r3cxb-wxrkl-frqld-6p6qq-d4ato-wske5-op7s5-n566f-bqe
1336
+
1337
+ let agent = build_agent ( "http://localhost:4943" , Arc :: new ( id) )
1338
+ . await
1339
+ . unwrap ( ) ;
1340
+ let cli = Arc :: new ( Client :: new ( Arc :: new ( agent) , canister, Some ( secret) ) ) ;
1341
+ let storage = ObjectStoreClient :: new ( cli. clone ( ) ) ;
1342
+
1343
+ let location = Path :: from ( NON_EXISTENT_NAME ) ;
1344
+
1345
+ let err = get_nonexistent_object ( & storage, Some ( location) )
1346
+ . await
1347
+ . unwrap_err ( ) ;
1348
+ if let object_store:: Error :: NotFound { path, .. } = err {
1349
+ assert ! ( path. ends_with( NON_EXISTENT_NAME ) ) ;
1350
+ } else {
1351
+ panic ! ( "unexpected error type: {err:?}" ) ;
1352
+ }
1353
+
1354
+ put_get_delete_list ( & storage) . await ;
1355
+ put_get_attributes ( & storage) . await ;
1356
+ get_opts ( & storage) . await ;
1357
+ put_opts ( & storage, true ) . await ;
1358
+ list_uses_directories_correctly ( & storage) . await ;
1359
+ list_with_delimiter ( & storage) . await ;
1360
+ rename_and_copy ( & storage) . await ;
1361
+ copy_if_not_exists ( & storage) . await ;
1362
+ copy_rename_nonexistent_object ( & storage) . await ;
1363
+ // multipart_race_condition(&storage, true).await; // TODO: fix this test?
1364
+ multipart_out_of_order ( & storage) . await ;
1365
+
1366
+ let objs = storage. list ( None ) . collect :: < Vec < _ > > ( ) . await ;
1367
+ for obj in objs {
1368
+ let obj = obj. unwrap ( ) ;
1369
+ storage
1370
+ . delete ( & obj. location )
1371
+ . await
1372
+ . expect ( "failed to delete object" ) ;
1340
1373
}
1374
+ stream_get ( & storage) . await ;
1341
1375
}
1342
1376
}
0 commit comments