@@ -48,6 +48,7 @@ pub enum Operation {
48
48
Inserted { string : String , id : String } ,
49
49
Changed { string : String , id : String } ,
50
50
Deleted { id : String } ,
51
+ Error { message : String } ,
51
52
}
52
53
53
54
#[ derive( Deserialize , Debug ) ]
@@ -79,6 +80,11 @@ enum ResourceSpec {
79
80
id : String ,
80
81
count : usize ,
81
82
} ,
83
+ DuplicateCandidates {
84
+ domain : String ,
85
+ commit : String ,
86
+ threshold : f32 ,
87
+ } ,
82
88
}
83
89
84
90
#[ derive( Debug ) ]
@@ -104,6 +110,7 @@ fn uri_to_spec(uri: &Uri) -> Result<ResourceSpec, SpecParseError> {
104
110
static ref RE_CHECK : Regex = Regex :: new( r"^/check(/?)$" ) . unwrap( ) ;
105
111
static ref RE_SEARCH : Regex = Regex :: new( r"^/search(/?)$" ) . unwrap( ) ;
106
112
static ref RE_SIMILAR : Regex = Regex :: new( r"^/similar(/?)$" ) . unwrap( ) ;
113
+ static ref RE_DUPLICATES : Regex = Regex :: new( r"^/duplicates(/?)$" ) . unwrap( ) ;
107
114
}
108
115
let path = uri. path ( ) ;
109
116
@@ -163,6 +170,22 @@ fn uri_to_spec(uri: &Uri) -> Result<ResourceSpec, SpecParseError> {
163
170
}
164
171
_ => Err ( SpecParseError :: NoCommitIdOrDomain ) ,
165
172
}
173
+ } else if RE_DUPLICATES . is_match ( path) {
174
+ let query = query_map ( uri) ;
175
+ let domain = query. get ( "domain" ) . map ( |v| v. to_string ( ) ) ;
176
+ let commit = query. get ( "commit" ) . map ( |v| v. to_string ( ) ) ;
177
+ let threshold = query. get ( "threshold" ) . map ( |v| v. parse :: < f32 > ( ) . unwrap ( ) ) ;
178
+ match ( domain, commit) {
179
+ ( Some ( domain) , Some ( commit) ) => {
180
+ let threshold = threshold. unwrap_or ( 0.0 ) ;
181
+ Ok ( ResourceSpec :: DuplicateCandidates {
182
+ domain,
183
+ commit,
184
+ threshold,
185
+ } )
186
+ }
187
+ _ => Err ( SpecParseError :: NoCommitIdOrDomain ) ,
188
+ }
166
189
} else {
167
190
Err ( SpecParseError :: UnknownPath )
168
191
}
@@ -216,13 +239,20 @@ async fn get_operations_from_terminusdb(
216
239
let lines = StreamReader :: new ( res) . lines ( ) ;
217
240
let lines_stream = LinesStream :: new ( lines) ;
218
241
let fp = lines_stream. and_then ( |l| {
242
+ dbg ! ( & l) ;
219
243
future:: ready (
220
244
serde_json:: from_str ( & l) . map_err ( |e| std:: io:: Error :: new ( ErrorKind :: Other , e) ) ,
221
245
)
222
246
} ) ;
223
247
Ok ( fp)
224
248
}
225
249
250
/// Records `id1 -> id2` in `duplicates` when the pair is in ascending order.
///
/// Only pairs with `id1 < id2` are stored, so a self-match (`id1 == id2`) is
/// ignored and the same pair is never recorded in both directions. An
/// existing entry for `id1` is overwritten by the new `id2`.
fn add_to_duplicates(duplicates: &mut HashMap<usize, usize>, id1: usize, id2: usize) {
    // Guard clause: skip self-matches and pairs whose larger id comes first.
    if id1 >= id2 {
        return;
    }
    duplicates.insert(id1, id2);
}
255
+
226
256
impl Service {
227
257
async fn get_task_status ( & self , task_id : & str ) -> Option < TaskStatus > {
228
258
self . tasks . read ( ) . await . get ( task_id) . cloned ( )
@@ -389,6 +419,33 @@ impl Service {
389
419
. unwrap ( ) )
390
420
}
391
421
}
422
+ Ok ( ResourceSpec :: DuplicateCandidates {
423
+ domain,
424
+ commit,
425
+ threshold,
426
+ } ) => {
427
+ let index_id = create_index_name ( & domain, & commit) ;
428
+ // if None, then return 404
429
+ let hnsw = self . get_index ( & index_id) . await . unwrap ( ) ;
430
+ let mut duplicates: HashMap < usize , usize > = HashMap :: new ( ) ;
431
+ let elts = hnsw. layer_len ( 0 ) ;
432
+ for i in 0 ..elts {
433
+ let current_point = & hnsw. feature ( i) ;
434
+ let results = search ( current_point, 2 , & hnsw) . unwrap ( ) ;
435
+ for result in results. iter ( ) {
436
+ if f32:: from_bits ( result. distance ( ) ) < threshold {
437
+ add_to_duplicates ( & mut duplicates, i, result. internal_id ( ) )
438
+ }
439
+ }
440
+ }
441
+ let mut v: Vec < ( & str , & str ) > = duplicates
442
+ . into_iter ( )
443
+ . map ( |( i, j) | ( hnsw. feature ( i) . id ( ) , hnsw. feature ( j) . id ( ) ) )
444
+ . collect ( ) ;
445
+ Ok ( Response :: builder ( )
446
+ . body ( serde_json:: to_string ( & v) . unwrap ( ) . into ( ) )
447
+ . unwrap ( ) )
448
+ }
392
449
Ok ( ResourceSpec :: Similar {
393
450
domain,
394
451
commit,
0 commit comments