@@ -16,7 +16,7 @@ use tracing::{debug, error, error_span, trace, warn};
16
16
17
17
use crate :: {
18
18
ranger:: Message ,
19
- store:: { self , DownloadPolicy , ImportNamespaceOutcome , Query } ,
19
+ store:: { fs :: StoreInstance , DownloadPolicy , ImportNamespaceOutcome , Query , Store } ,
20
20
Author , AuthorHeads , AuthorId , Capability , CapabilityKind , ContentStatus ,
21
21
ContentStatusCallback , Event , NamespaceId , NamespaceSecret , PeerIdBytes , Replica , SignedEntry ,
22
22
SyncOutcome ,
@@ -170,8 +170,8 @@ pub struct OpenState {
170
170
}
171
171
172
172
#[ derive( Debug ) ]
173
- struct OpenReplica < S : store :: Store > {
174
- replica : Replica < S :: Instance > ,
173
+ struct OpenReplica {
174
+ replica : Replica < StoreInstance > ,
175
175
handles : usize ,
176
176
sync : bool ,
177
177
}
@@ -218,8 +218,8 @@ impl OpenOpts {
218
218
#[ allow( missing_docs) ]
219
219
impl SyncHandle {
220
220
/// Spawn a sync actor and return a handle.
221
- pub fn spawn < S : store :: Store > (
222
- store : S ,
221
+ pub fn spawn (
222
+ store : Store ,
223
223
content_status_callback : Option < ContentStatusCallback > ,
224
224
me : String ,
225
225
) -> SyncHandle {
@@ -524,14 +524,14 @@ impl Drop for SyncHandle {
524
524
}
525
525
}
526
526
527
- struct Actor < S : store :: Store > {
528
- store : S ,
529
- states : OpenReplicas < S > ,
527
+ struct Actor {
528
+ store : Store ,
529
+ states : OpenReplicas ,
530
530
action_rx : flume:: Receiver < Action > ,
531
531
content_status_callback : Option < ContentStatusCallback > ,
532
532
}
533
533
534
- impl < S : store :: Store > Actor < S > {
534
+ impl Actor {
535
535
fn run ( & mut self ) -> Result < ( ) > {
536
536
while let Ok ( action) = self . action_rx . recv ( ) {
537
537
trace ! ( %action, "tick" ) ;
@@ -741,24 +741,22 @@ impl<S: store::Store> Actor<S> {
741
741
}
742
742
}
743
743
744
- struct OpenReplicas < S : store:: Store > ( HashMap < NamespaceId , OpenReplica < S > > ) ;
744
+ #[ derive( Default ) ]
745
+ struct OpenReplicas ( HashMap < NamespaceId , OpenReplica > ) ;
745
746
746
- // We need a manual impl here because the derive won't work unless we'd restrict to S: Default.
747
- impl < S : store:: Store > Default for OpenReplicas < S > {
748
- fn default ( ) -> Self {
749
- Self ( Default :: default ( ) )
750
- }
751
- }
752
- impl < S : store:: Store > OpenReplicas < S > {
753
- fn replica ( & mut self , namespace : & NamespaceId ) -> Result < & mut Replica < S :: Instance > > {
747
+ impl OpenReplicas {
748
+ fn replica ( & mut self , namespace : & NamespaceId ) -> Result < & mut Replica < StoreInstance > > {
754
749
self . get_mut ( namespace) . map ( |state| & mut state. replica )
755
750
}
756
751
757
- fn get_mut ( & mut self , namespace : & NamespaceId ) -> Result < & mut OpenReplica < S > > {
752
+ fn get_mut ( & mut self , namespace : & NamespaceId ) -> Result < & mut OpenReplica > {
758
753
self . 0 . get_mut ( namespace) . context ( "replica not open" )
759
754
}
760
755
761
- fn replica_if_syncing ( & mut self , namespace : & NamespaceId ) -> Result < & mut Replica < S :: Instance > > {
756
+ fn replica_if_syncing (
757
+ & mut self ,
758
+ namespace : & NamespaceId ,
759
+ ) -> Result < & mut Replica < StoreInstance > > {
762
760
let state = self . get_mut ( namespace) ?;
763
761
if !state. sync {
764
762
Err ( anyhow ! ( "sync is not enabled for replica" ) )
@@ -781,7 +779,7 @@ impl<S: store::Store> OpenReplicas<S> {
781
779
& mut self ,
782
780
namespace : NamespaceId ,
783
781
opts : OpenOpts ,
784
- open_cb : impl Fn ( ) -> Result < Replica < S :: Instance > > ,
782
+ open_cb : impl Fn ( ) -> Result < Replica < StoreInstance > > ,
785
783
) -> Result < ( ) > {
786
784
match self . 0 . entry ( namespace) {
787
785
hash_map:: Entry :: Vacant ( e) => {
@@ -811,7 +809,7 @@ impl<S: store::Store> OpenReplicas<S> {
811
809
fn close_with (
812
810
& mut self ,
813
811
namespace : NamespaceId ,
814
- on_close : impl Fn ( Replica < S :: Instance > ) ,
812
+ on_close : impl Fn ( Replica < StoreInstance > ) ,
815
813
) -> bool {
816
814
match self . 0 . entry ( namespace) {
817
815
hash_map:: Entry :: Vacant ( _e) => {
@@ -833,7 +831,7 @@ impl<S: store::Store> OpenReplicas<S> {
833
831
}
834
832
}
835
833
836
- fn close_all_with ( & mut self , on_close : impl Fn ( Replica < S :: Instance > ) ) {
834
+ fn close_all_with ( & mut self , on_close : impl Fn ( Replica < StoreInstance > ) ) {
837
835
for ( _namespace, state) in self . 0 . drain ( ) {
838
836
on_close ( state. replica )
839
837
}
@@ -855,7 +853,7 @@ fn iter_to_channel<T: Send + 'static>(
855
853
Ok ( ( ) )
856
854
}
857
855
858
- fn get_author < S : store :: Store > ( store : & S , id : & AuthorId ) -> Result < Author > {
856
+ fn get_author ( store : & Store , id : & AuthorId ) -> Result < Author > {
859
857
store. get_author ( id) ?. context ( "author not found" )
860
858
}
861
859
@@ -866,10 +864,10 @@ fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyEr
866
864
sender. send ( value) . map_err ( send_reply_error)
867
865
}
868
866
869
- fn send_reply_with < T , S : store :: Store > (
867
+ fn send_reply_with < T > (
870
868
sender : oneshot:: Sender < Result < T > > ,
871
- this : & mut Actor < S > ,
872
- f : impl FnOnce ( & mut Actor < S > ) -> Result < T > ,
869
+ this : & mut Actor ,
870
+ f : impl FnOnce ( & mut Actor ) -> Result < T > ,
873
871
) -> Result < ( ) , SendReplyError > {
874
872
sender. send ( f ( this) ) . map_err ( send_reply_error)
875
873
}
@@ -880,10 +878,12 @@ fn send_reply_error<T>(_err: T) -> SendReplyError {
880
878
881
879
#[ cfg( test) ]
882
880
mod tests {
881
+ use crate :: store;
882
+
883
883
use super :: * ;
884
884
#[ tokio:: test]
885
885
async fn open_close ( ) -> anyhow:: Result < ( ) > {
886
- let store = store:: memory :: Store :: default ( ) ;
886
+ let store = store:: Store :: memory ( ) ;
887
887
let sync = SyncHandle :: spawn ( store, None , "foo" . into ( ) ) ;
888
888
let namespace = NamespaceSecret :: new ( & mut rand:: rngs:: OsRng { } ) ;
889
889
let id = namespace. id ( ) ;
0 commit comments