@@ -426,6 +426,168 @@ pub mod rc_blanket_impls {
426
426
}
427
427
}
428
428
429
+ /// Blanket implementations for atomic reference counted batches.
430
+ pub mod abomonated_arc_blanket_impls {
431
+ use std:: io:: Write ;
432
+ use std:: mem;
433
+ use std:: ops:: Deref ;
434
+ use std:: sync:: Arc ;
435
+
436
+ use abomonation:: Abomonation ;
437
+
438
+ /// Wrapper over Arc that can be safely Abomonated.
439
+ pub enum AbomArc < T > {
440
+ /// An Arc that has been constructed normally
441
+ Owned ( Arc < T > ) ,
442
+ /// The result of decoding an abomonated AbomArc
443
+ Abomonated ( Box < T > ) ,
444
+ }
445
+
446
+ impl < T > AbomArc < T > {
447
+ fn new ( inner : T ) -> Self {
448
+ Self :: Owned ( Arc :: new ( inner) )
449
+ }
450
+ }
451
+
452
+ impl < T : Clone > Clone for AbomArc < T > {
453
+ fn clone ( & self ) -> Self {
454
+ match self {
455
+ Self :: Owned ( arc) => Self :: Owned ( Arc :: clone ( arc) ) ,
456
+ Self :: Abomonated ( val) => Self :: Owned ( Arc :: new ( T :: clone ( & * val) ) ) ,
457
+ }
458
+ }
459
+ }
460
+
461
+ impl < T > Deref for AbomArc < T > {
462
+ type Target = T ;
463
+ fn deref ( & self ) -> & Self :: Target {
464
+ match self {
465
+ Self :: Owned ( arc) => & * * arc,
466
+ Self :: Abomonated ( val) => & * val,
467
+ }
468
+ }
469
+ }
470
+
471
+ impl < T : Abomonation > Abomonation for AbomArc < T > {
472
+ unsafe fn entomb < W : Write > ( & self , bytes : & mut W ) -> std:: io:: Result < ( ) > {
473
+ bytes. write_all ( std:: slice:: from_raw_parts ( mem:: transmute ( & * * self ) , mem:: size_of :: < T > ( ) ) ) ?;
474
+ ( * * self ) . entomb ( bytes)
475
+ }
476
+ unsafe fn exhume < ' a , ' b > ( & ' a mut self , bytes : & ' b mut [ u8 ] ) -> Option < & ' b mut [ u8 ] > {
477
+ let binary_len = mem:: size_of :: < T > ( ) ;
478
+ if binary_len > bytes. len ( ) {
479
+ None
480
+ } else {
481
+ let ( mine, rest) = bytes. split_at_mut ( binary_len) ;
482
+ let mut value = Box :: from_raw ( mine. as_mut_ptr ( ) as * mut T ) ;
483
+ let rest = ( * value) . exhume ( rest) ?;
484
+ std:: ptr:: write ( self , Self :: Abomonated ( value) ) ;
485
+ Some ( rest)
486
+ }
487
+ }
488
+ fn extent ( & self ) -> usize {
489
+ mem:: size_of :: < T > ( ) + ( & * * self ) . extent ( )
490
+ }
491
+ }
492
+
493
+ use timely:: progress:: { Antichain , frontier:: AntichainRef } ;
494
+ use super :: { Batch , BatchReader , Batcher , Builder , Merger , Cursor , Description } ;
495
+
496
+ impl < K , V , T , R , B : BatchReader < K , V , T , R > > BatchReader < K , V , T , R > for AbomArc < B > {
497
+
498
+ /// The type used to enumerate the batch's contents.
499
+ type Cursor = ArcBatchCursor < K , V , T , R , B > ;
500
+ /// Acquires a cursor to the batch's contents.
501
+ fn cursor ( & self ) -> Self :: Cursor {
502
+ ArcBatchCursor :: new ( ( & * * self ) . cursor ( ) )
503
+ }
504
+
505
+ /// The number of updates in the batch.
506
+ fn len ( & self ) -> usize { ( & * * self ) . len ( ) }
507
+ /// Describes the times of the updates in the batch.
508
+ fn description ( & self ) -> & Description < T > { ( & * * self ) . description ( ) }
509
+ }
510
+
511
+ /// Wrapper to provide cursor to nested scope.
512
+ pub struct ArcBatchCursor < K , V , T , R , B : BatchReader < K , V , T , R > > {
513
+ phantom : :: std:: marker:: PhantomData < ( K , V , T , R ) > ,
514
+ cursor : B :: Cursor ,
515
+ }
516
+
517
+ impl < K , V , T , R , B : BatchReader < K , V , T , R > > ArcBatchCursor < K , V , T , R , B > {
518
+ fn new ( cursor : B :: Cursor ) -> Self {
519
+ ArcBatchCursor {
520
+ cursor,
521
+ phantom : :: std:: marker:: PhantomData ,
522
+ }
523
+ }
524
+ }
525
+
526
+ impl < K , V , T , R , B : BatchReader < K , V , T , R > > Cursor < K , V , T , R > for ArcBatchCursor < K , V , T , R , B > {
527
+
528
+ type Storage = AbomArc < B > ;
529
+
530
+ #[ inline] fn key_valid ( & self , storage : & Self :: Storage ) -> bool { self . cursor . key_valid ( storage) }
531
+ #[ inline] fn val_valid ( & self , storage : & Self :: Storage ) -> bool { self . cursor . val_valid ( storage) }
532
+
533
+ #[ inline] fn key < ' a > ( & self , storage : & ' a Self :: Storage ) -> & ' a K { self . cursor . key ( storage) }
534
+ #[ inline] fn val < ' a > ( & self , storage : & ' a Self :: Storage ) -> & ' a V { self . cursor . val ( storage) }
535
+
536
+ #[ inline]
537
+ fn map_times < L : FnMut ( & T , & R ) > ( & mut self , storage : & Self :: Storage , logic : L ) {
538
+ self . cursor . map_times ( storage, logic)
539
+ }
540
+
541
+ #[ inline] fn step_key ( & mut self , storage : & Self :: Storage ) { self . cursor . step_key ( storage) }
542
+ #[ inline] fn seek_key ( & mut self , storage : & Self :: Storage , key : & K ) { self . cursor . seek_key ( storage, key) }
543
+
544
+ #[ inline] fn step_val ( & mut self , storage : & Self :: Storage ) { self . cursor . step_val ( storage) }
545
+ #[ inline] fn seek_val ( & mut self , storage : & Self :: Storage , val : & V ) { self . cursor . seek_val ( storage, val) }
546
+
547
+ #[ inline] fn rewind_keys ( & mut self , storage : & Self :: Storage ) { self . cursor . rewind_keys ( storage) }
548
+ #[ inline] fn rewind_vals ( & mut self , storage : & Self :: Storage ) { self . cursor . rewind_vals ( storage) }
549
+ }
550
+
551
+ /// An immutable collection of updates.
552
+ impl < K , V , T , R , B : Batch < K , V , T , R > > Batch < K , V , T , R > for AbomArc < B > {
553
+ type Batcher = ArcBatcher < K , V , T , R , B > ;
554
+ type Builder = ArcBuilder < K , V , T , R , B > ;
555
+ type Merger = ArcMerger < K , V , T , R , B > ;
556
+ }
557
+
558
+ /// Wrapper type for batching reference counted batches.
559
+ pub struct ArcBatcher < K , V , T , R , B : Batch < K , V , T , R > > { batcher : B :: Batcher }
560
+
561
+ /// Functionality for collecting and batching updates.
562
+ impl < K , V , T , R , B : Batch < K , V , T , R > > Batcher < K , V , T , R , AbomArc < B > > for ArcBatcher < K , V , T , R , B > {
563
+ fn new ( ) -> Self { ArcBatcher { batcher : <B :: Batcher as Batcher < K , V , T , R , B > >:: new ( ) } }
564
+ fn push_batch ( & mut self , batch : & mut Vec < ( ( K , V ) , T , R ) > ) { self . batcher . push_batch ( batch) }
565
+ fn seal ( & mut self , upper : Antichain < T > ) -> AbomArc < B > { AbomArc :: new ( self . batcher . seal ( upper) ) }
566
+ fn frontier ( & mut self ) -> timely:: progress:: frontier:: AntichainRef < T > { self . batcher . frontier ( ) }
567
+ }
568
+
569
+ /// Wrapper type for building reference counted batches.
570
+ pub struct ArcBuilder < K , V , T , R , B : Batch < K , V , T , R > > { builder : B :: Builder }
571
+
572
+ /// Functionality for building batches from ordered update sequences.
573
+ impl < K , V , T , R , B : Batch < K , V , T , R > > Builder < K , V , T , R , AbomArc < B > > for ArcBuilder < K , V , T , R , B > {
574
+ fn new ( ) -> Self { ArcBuilder { builder : <B :: Builder as Builder < K , V , T , R , B > >:: new ( ) } }
575
+ fn with_capacity ( cap : usize ) -> Self { ArcBuilder { builder : <B :: Builder as Builder < K , V , T , R , B > >:: with_capacity ( cap) } }
576
+ fn push ( & mut self , element : ( K , V , T , R ) ) { self . builder . push ( element) }
577
+ fn done ( self , lower : Antichain < T > , upper : Antichain < T > , since : Antichain < T > ) -> AbomArc < B > { AbomArc :: new ( self . builder . done ( lower, upper, since) ) }
578
+ }
579
+
580
+ /// Wrapper type for merging reference counted batches.
581
+ pub struct ArcMerger < K , V , T , R , B : Batch < K , V , T , R > > { merger : B :: Merger }
582
+
583
+ /// Represents a merge in progress.
584
+ impl < K , V , T , R , B : Batch < K , V , T , R > > Merger < K , V , T , R , AbomArc < B > > for ArcMerger < K , V , T , R , B > {
585
+ fn new ( source1 : & AbomArc < B > , source2 : & AbomArc < B > , compaction_frontier : Option < AntichainRef < T > > ) -> Self { ArcMerger { merger : B :: begin_merge ( source1, source2, compaction_frontier) } }
586
+ fn work ( & mut self , source1 : & AbomArc < B > , source2 : & AbomArc < B > , fuel : & mut isize ) { self . merger . work ( source1, source2, fuel) }
587
+ fn done ( self ) -> AbomArc < B > { AbomArc :: new ( self . merger . done ( ) ) }
588
+ }
589
+ }
590
+
429
591
430
592
/// Blanket implementations for reference counted batches.
431
593
pub mod abomonated_blanket_impls {
0 commit comments