@@ -339,6 +339,31 @@ impl HostIO {
339
339
}
340
340
}
341
341
342
+ /// A block must be at least [HASH_LEN] bytes long.
343
+ pub struct Block ( [ u8 ] ) ;
344
+
345
+ impl Block {
346
+ pub fn as_raw_slice ( & self ) -> & [ u8 ] {
347
+ let block = self as * const Block as * const [ u8 ] ;
348
+ unsafe { & * block }
349
+ }
350
+
351
+ pub fn from_raw_slice ( block : & [ u8 ] ) -> & Self {
352
+ let block2 = block as * const [ u8 ] as * const Block ;
353
+ unsafe { & * block2 }
354
+ }
355
+
356
+ /// Panics if block is illegally short
357
+ pub fn next_block ( & self ) -> SHA256Sum {
358
+ self . as_raw_slice ( ) [ ..HASH_LEN ] . try_into ( ) . unwrap ( )
359
+ }
360
+
361
+ /// Panics if block is illegally short
362
+ pub fn data ( & self ) -> & [ u8 ] {
363
+ & self . as_raw_slice ( ) [ HASH_LEN ..]
364
+ }
365
+ }
366
+
342
367
/// Concrete implementation of the interface defined by
343
368
/// [ledger_parser_combinators::async_parser::Readable] for the block protocol.
344
369
#[ derive( Clone ) ]
@@ -348,28 +373,54 @@ pub struct ByteStream {
348
373
current_offset : usize ,
349
374
}
350
375
376
+ impl ByteStream {
377
+ /// Get the current block.
378
+ fn get_current_block < ' a > ( & ' a mut self ) -> impl ' a + Future < Output = Ref < ' static , Block > > {
379
+ async move {
380
+ if self . current_chunk == [ 0 ; 32 ] {
381
+ let _: ( ) = reject ( ) . await ;
382
+ }
383
+ let chunk_res = self . host_io . get_chunk ( self . current_chunk ) . await ;
384
+ match chunk_res {
385
+ Ok ( a) => Ref :: map ( a, Block :: from_raw_slice) ,
386
+ Err ( _) => reject ( ) . await ,
387
+ }
388
+ //return Ref::map(chunk, |r| &r[self.current_offset + HASH_LEN..]);
389
+ }
390
+ }
391
+
392
+ /// Get the rest of the current block that we have not already processed.
393
+ ///
394
+ /// [block] must be the current block.
395
+ fn slice_from_block < ' a , ' b > ( & ' a mut self , block : & ' b Block ) -> & ' b [ u8 ] {
396
+ & block. data ( ) [ self . current_offset ..]
397
+ }
398
+
399
+ /// Consume [consume] bytes from the current block.
400
+ ///
401
+ /// [block] must be the current block. [consume] must be less than or equal
402
+ /// to `self.slice_from_block(block).len()`.
403
+ fn consume ( & mut self , block : & Block , consume : usize ) {
404
+ self . current_offset += consume;
405
+ debug_assert ! ( self . current_offset <= block. data( ) . len( ) ) ;
406
+ if self . current_offset == block. data ( ) . len ( ) {
407
+ self . current_chunk = block. next_block ( ) ;
408
+ self . current_offset = 0 ;
409
+ }
410
+ }
411
+ }
412
+
351
413
impl Readable for ByteStream {
352
414
type OutFut < ' a , const N : usize > = impl ' a + core:: future:: Future < Output = [ u8 ; N ] > ;
353
415
fn read < ' a : ' b , ' b , const N : usize > ( & ' a mut self ) -> Self :: OutFut < ' b , N > {
354
416
async move {
355
417
let mut buffer = ArrayVec :: < u8 , N > :: new ( ) ;
356
418
while !buffer. is_full ( ) {
357
- if self . current_chunk == [ 0 ; 32 ] {
358
- let _: ( ) = reject ( ) . await ;
359
- }
360
- let chunk_res = self . host_io . get_chunk ( self . current_chunk ) . await ;
361
- let chunk = match chunk_res {
362
- Ok ( a) => a,
363
- Err ( _) => reject ( ) . await ,
364
- } ;
365
- let avail = & chunk[ self . current_offset + HASH_LEN ..] ;
419
+ let block = self . get_current_block ( ) . await ;
420
+ let avail = self . slice_from_block ( & block) ;
366
421
let consuming = core:: cmp:: min ( avail. len ( ) , buffer. remaining_capacity ( ) ) ;
367
422
buffer. try_extend_from_slice ( & avail[ 0 ..consuming] ) . ok ( ) ;
368
- self . current_offset += consuming;
369
- if self . current_offset + HASH_LEN == chunk. len ( ) {
370
- self . current_chunk = chunk[ 0 ..HASH_LEN ] . try_into ( ) . unwrap ( ) ;
371
- self . current_offset = 0 ;
372
- }
423
+ self . consume ( & * block, consuming) ;
373
424
}
374
425
buffer. into_inner ( ) . unwrap ( )
375
426
}
0 commit comments