67
67
//!
68
68
#![ no_std]
69
69
#![ feature( type_alias_impl_trait) ]
70
+ #![ feature( cell_filter_map) ]
70
71
#![ feature( cfg_version) ]
71
72
#![ cfg_attr(
72
73
all( target_family = "bolos" , not( version( "1.64" ) ) ) ,
@@ -339,6 +340,35 @@ impl HostIO {
339
340
}
340
341
}
341
342
343
+ /// A block must be at least [HASH_LEN] bytes long.
344
+ pub struct Block ( [ u8 ] ) ;
345
+
346
+ impl Block {
347
+ pub fn as_raw_slice ( & self ) -> & [ u8 ] {
348
+ let block = self as * const Block as * const [ u8 ] ;
349
+ unsafe { & * block }
350
+ }
351
+
352
+ pub fn from_raw_slice_opt ( block : & [ u8 ] ) -> Option < & Self > {
353
+ if block. len ( ) >= HASH_LEN {
354
+ let block2 = block as * const [ u8 ] as * const Block ;
355
+ Some ( unsafe { & * block2 } )
356
+ } else {
357
+ None
358
+ }
359
+ }
360
+
361
+ /// Panics if block is illegally short
362
+ pub fn next_block ( & self ) -> SHA256Sum {
363
+ self . as_raw_slice ( ) [ ..HASH_LEN ] . try_into ( ) . unwrap ( )
364
+ }
365
+
366
+ /// Panics if block is illegally short
367
+ pub fn data ( & self ) -> & [ u8 ] {
368
+ & self . as_raw_slice ( ) [ HASH_LEN ..]
369
+ }
370
+ }
371
+
342
372
/// Concrete implementation of the interface defined by
343
373
/// [ledger_parser_combinators::async_parser::Readable] for the block protocol.
344
374
#[ derive( Clone ) ]
@@ -348,28 +378,56 @@ pub struct ByteStream {
348
378
current_offset : usize ,
349
379
}
350
380
381
+ impl ByteStream {
382
+ /// Get the current block.
383
+ fn get_current_block < ' a > ( & ' a mut self ) -> impl ' a + Future < Output = Ref < ' static , Block > > {
384
+ async move {
385
+ if self . current_chunk == [ 0 ; 32 ] {
386
+ let _: ( ) = reject ( ) . await ;
387
+ }
388
+ let chunk_res = self . host_io . get_chunk ( self . current_chunk ) . await ;
389
+ match chunk_res {
390
+ Ok ( a) => match Ref :: filter_map ( a, Block :: from_raw_slice_opt) {
391
+ Ok ( r) => r,
392
+ Err ( _) => reject ( ) . await ,
393
+ }
394
+ Err ( _) => reject ( ) . await ,
395
+ }
396
+ }
397
+ }
398
+
399
+ /// Get the rest of the current block that we have not already processed.
400
+ ///
401
+ /// [block] must be the current block.
402
+ fn slice_from_block < ' a , ' b > ( & ' a mut self , block : & ' b Block ) -> & ' b [ u8 ] {
403
+ & block. data ( ) [ self . current_offset ..]
404
+ }
405
+
406
+ /// Consume [consume] bytes from the current block.
407
+ ///
408
+ /// [block] must be the current block. [consume] must be less than or equal
409
+ /// to `self.slice_from_block(block).len()`.
410
+ fn consume ( & mut self , block : & Block , consume : usize ) {
411
+ self . current_offset += consume;
412
+ debug_assert ! ( self . current_offset <= block. data( ) . len( ) ) ;
413
+ if self . current_offset == block. data ( ) . len ( ) {
414
+ self . current_chunk = block. next_block ( ) ;
415
+ self . current_offset = 0 ;
416
+ }
417
+ }
418
+ }
419
+
351
420
impl Readable for ByteStream {
352
421
type OutFut < ' a , const N : usize > = impl ' a + core:: future:: Future < Output = [ u8 ; N ] > ;
353
422
fn read < ' a : ' b , ' b , const N : usize > ( & ' a mut self ) -> Self :: OutFut < ' b , N > {
354
423
async move {
355
424
let mut buffer = ArrayVec :: < u8 , N > :: new ( ) ;
356
425
while !buffer. is_full ( ) {
357
- if self . current_chunk == [ 0 ; 32 ] {
358
- let _: ( ) = reject ( ) . await ;
359
- }
360
- let chunk_res = self . host_io . get_chunk ( self . current_chunk ) . await ;
361
- let chunk = match chunk_res {
362
- Ok ( a) => a,
363
- Err ( _) => reject ( ) . await ,
364
- } ;
365
- let avail = & chunk[ self . current_offset + HASH_LEN ..] ;
426
+ let block = self . get_current_block ( ) . await ;
427
+ let avail = self . slice_from_block ( & block) ;
366
428
let consuming = core:: cmp:: min ( avail. len ( ) , buffer. remaining_capacity ( ) ) ;
367
429
buffer. try_extend_from_slice ( & avail[ 0 ..consuming] ) . ok ( ) ;
368
- self . current_offset += consuming;
369
- if self . current_offset + HASH_LEN == chunk. len ( ) {
370
- self . current_chunk = chunk[ 0 ..HASH_LEN ] . try_into ( ) . unwrap ( ) ;
371
- self . current_offset = 0 ;
372
- }
430
+ self . consume ( & * block, consuming) ;
373
431
}
374
432
buffer. into_inner ( ) . unwrap ( )
375
433
}
0 commit comments