@@ -26,6 +26,7 @@ use std::{
26
26
future,
27
27
io:: { self , ErrorKind } ,
28
28
} ;
29
+ use thiserror:: Error ;
29
30
use tokio:: sync:: Mutex ;
30
31
use tokio:: { io:: AsyncBufReadExt , sync:: RwLock } ;
31
32
use tokio_stream:: { wrappers:: LinesStream , Stream } ;
@@ -71,6 +72,11 @@ enum ResourceSpec {
71
72
commit : String ,
72
73
previous : Option < String > ,
73
74
} ,
75
+ AssignIndex {
76
+ domain : String ,
77
+ source_commit : String ,
78
+ target_commit : String ,
79
+ } ,
74
80
CheckTask {
75
81
task_id : String ,
76
82
} ,
@@ -107,6 +113,7 @@ fn query_map(uri: &Uri) -> HashMap<String, String> {
107
113
fn uri_to_spec ( uri : & Uri ) -> Result < ResourceSpec , SpecParseError > {
108
114
lazy_static ! {
109
115
static ref RE_INDEX : Regex = Regex :: new( r"^/index(/?)$" ) . unwrap( ) ;
116
+ static ref RE_ASSIGN : Regex = Regex :: new( r"^/assign(/?)$" ) . unwrap( ) ;
110
117
static ref RE_CHECK : Regex = Regex :: new( r"^/check(/?)$" ) . unwrap( ) ;
111
118
static ref RE_SEARCH : Regex = Regex :: new( r"^/search(/?)$" ) . unwrap( ) ;
112
119
static ref RE_SIMILAR : Regex = Regex :: new( r"^/similar(/?)$" ) . unwrap( ) ;
@@ -127,6 +134,21 @@ fn uri_to_spec(uri: &Uri) -> Result<ResourceSpec, SpecParseError> {
127
134
} ) ,
128
135
_ => Err ( SpecParseError :: NoCommitIdOrDomain ) ,
129
136
}
137
+ } else if RE_ASSIGN . is_match ( path) {
138
+ let query = query_map ( uri) ;
139
+ let domain = query. get ( "domain" ) . map ( |v| v. to_string ( ) ) ;
140
+ let source_commit = query. get ( "source_commit" ) . map ( |v| v. to_string ( ) ) ;
141
+ let target_commit = query. get ( "target_commit" ) . map ( |v| v. to_string ( ) ) ;
142
+ match ( domain, source_commit, target_commit) {
143
+ ( Some ( domain) , Some ( source_commit) , Some ( target_commit) ) => {
144
+ Ok ( ResourceSpec :: AssignIndex {
145
+ domain,
146
+ source_commit,
147
+ target_commit,
148
+ } )
149
+ }
150
+ _ => Err ( SpecParseError :: NoCommitIdOrDomain ) ,
151
+ }
130
152
} else if RE_CHECK . is_match ( path) {
131
153
let query = query_map ( uri) ;
132
154
if let Some ( task_id) = query. get ( "task_id" ) {
@@ -360,6 +382,34 @@ impl Service {
360
382
} ) ;
361
383
}
362
384
385
+ async fn assign_index (
386
+ self : Arc < Self > ,
387
+ domain : String ,
388
+ source_commit : String ,
389
+ target_commit : String ,
390
+ ) -> Result < ( ) , AssignIndexError > {
391
+ let source_name = create_index_name ( & domain, & source_commit) ;
392
+ let target_name = create_index_name ( & domain, & target_commit) ;
393
+
394
+ if self . get_index ( & target_name) . await . is_some ( ) {
395
+ return Err ( AssignIndexError :: TargetCommitAlreadyHasIndex ) ;
396
+ }
397
+ if let Some ( index) = self . get_index ( & source_name) . await {
398
+ let mut indexes = self . indexes . write ( ) . await ;
399
+ indexes. insert ( target_name. clone ( ) , index. clone ( ) ) ;
400
+
401
+ std:: mem:: drop ( indexes) ;
402
+ tokio:: task:: block_in_place ( move || {
403
+ let path = self . path . clone ( ) ;
404
+ serialize_index ( path, & target_name, ( * index) . clone ( ) ) . unwrap ( ) ;
405
+ } ) ;
406
+
407
+ Ok ( ( ) )
408
+ } else {
409
+ Err ( AssignIndexError :: SourceCommitNotFound )
410
+ }
411
+ }
412
+
363
413
async fn process_operation_chunks (
364
414
self : & Arc < Self > ,
365
415
mut opstream : futures:: stream:: Chunks <
@@ -407,6 +457,22 @@ impl Service {
407
457
self . start_indexing ( domain, commit, previous, task_id. clone ( ) ) ;
408
458
Ok ( Response :: builder ( ) . body ( task_id. into ( ) ) . unwrap ( ) )
409
459
}
460
+ Ok ( ResourceSpec :: AssignIndex {
461
+ domain,
462
+ source_commit,
463
+ target_commit,
464
+ } ) => {
465
+ let result = self
466
+ . assign_index ( domain, source_commit, target_commit)
467
+ . await ;
468
+ match result {
469
+ Ok ( ( ) ) => Ok ( Response :: builder ( ) . status ( 204 ) . body ( Body :: empty ( ) ) . unwrap ( ) ) ,
470
+ Err ( e) => Ok ( Response :: builder ( )
471
+ . status ( 400 )
472
+ . body ( e. to_string ( ) . into ( ) )
473
+ . unwrap ( ) ) ,
474
+ }
475
+ }
410
476
Ok ( ResourceSpec :: CheckTask { task_id } ) => {
411
477
if let Some ( state) = self . get_task_status ( & task_id) . await {
412
478
Ok ( Response :: builder ( )
@@ -517,6 +583,16 @@ impl Service {
517
583
}
518
584
}
519
585
586
+ #[ derive( Debug , Error ) ]
587
+ enum AssignIndexError {
588
+ #[ error( "io error: {0}" ) ]
589
+ Io ( #[ from] io:: Error ) ,
590
+ #[ error( "source commit not found" ) ]
591
+ SourceCommitNotFound ,
592
+ #[ error( "target commit already has an index" ) ]
593
+ TargetCommitAlreadyHasIndex ,
594
+ }
595
+
520
596
pub async fn serve < P : Into < PathBuf > > (
521
597
directory : P ,
522
598
port : u16 ,
0 commit comments