@@ -16,23 +16,50 @@ pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
16
16
mut logic : impl FnMut ( & Key , & Val1 , & Val2 ) -> Result ,
17
17
) {
18
18
let mut results = Vec :: new ( ) ;
19
+ let push_result = |k : & Key , v1 : & Val1 , v2 : & Val2 | results. push ( logic ( k, v1, v2) ) ;
19
20
21
+ join_delta ( input1, input2, push_result) ;
22
+
23
+ output. insert ( Relation :: from_vec ( results) ) ;
24
+ }
25
+
26
+ pub ( crate ) fn join_and_filter_into < ' me , Key : Ord , Val1 : Ord , Val2 : Ord , Result : Ord > (
27
+ input1 : & Variable < ( Key , Val1 ) > ,
28
+ input2 : impl JoinInput < ' me , ( Key , Val2 ) > ,
29
+ output : & Variable < Result > ,
30
+ mut logic : impl FnMut ( & Key , & Val1 , & Val2 ) -> Option < Result > ,
31
+ ) {
32
+ let mut results = Vec :: new ( ) ;
33
+ let push_result = |k : & Key , v1 : & Val1 , v2 : & Val2 | {
34
+ if let Some ( result) = logic ( k, v1, v2) {
35
+ results. push ( result) ;
36
+ }
37
+ } ;
38
+
39
+ join_delta ( input1, input2, push_result) ;
40
+
41
+ output. insert ( Relation :: from_vec ( results) ) ;
42
+ }
43
+
44
+ /// Joins the `recent` tuples of each input with the `stable` tuples of the other, then the
45
+ /// `recent` tuples of *both* inputs.
46
+ fn join_delta < ' me , Key : Ord , Val1 : Ord , Val2 : Ord > (
47
+ input1 : & Variable < ( Key , Val1 ) > ,
48
+ input2 : impl JoinInput < ' me , ( Key , Val2 ) > ,
49
+ mut result : impl FnMut ( & Key , & Val1 , & Val2 ) ,
50
+ ) {
20
51
let recent1 = input1. recent ( ) ;
21
52
let recent2 = input2. recent ( ) ;
22
53
23
- let mut closure = |k : & Key , v1 : & Val1 , v2 : & Val2 | results. push ( logic ( k, v1, v2) ) ;
24
-
25
54
for batch2 in input2. stable ( ) . iter ( ) {
26
- join_helper ( & recent1, & batch2, & mut closure ) ;
55
+ join_helper ( & recent1, & batch2, & mut result ) ;
27
56
}
28
57
29
58
for batch1 in input1. stable ( ) . iter ( ) {
30
- join_helper ( & batch1, & recent2, & mut closure ) ;
59
+ join_helper ( & batch1, & recent2, & mut result ) ;
31
60
}
32
61
33
- join_helper ( & recent1, & recent2, & mut closure) ;
34
-
35
- output. insert ( Relation :: from_vec ( results) ) ;
62
+ join_helper ( & recent1, & recent2, & mut result) ;
36
63
}
37
64
38
65
/// Join, but for two relations.
0 commit comments