1
+ use timely:: dataflow:: operators:: probe:: Handle ;
2
+
3
+ use differential_dataflow:: {
4
+ Collection ,
5
+ input:: InputSession ,
6
+ operators:: { Join , Iterate , Reduce } ,
7
+ } ;
8
+
9
+ // We want to represent collections of ast nodes, perhaps as `(Id, (Op, Vec<Id>))`.
10
+ // Here the first `Id` is a unique identifier for the node, and the `Vec<Id>` are
11
+ // identifiers for the children. We'll want to be able to mutate the child identifiers.
12
+
13
+ // Perhaps we represent this as collections
14
+ // 1. Ops: (Id, Op),
15
+ // 2. Args: (Id, usize, Id),
16
+ // Congruence amounts to
17
+
18
+ fn main ( ) {
19
+ // Define a timely dataflow computation
20
+ timely:: execute_from_args ( std:: env:: args ( ) , move |worker| {
21
+
22
+ // Create an input collection of data
23
+ let mut input = InputSession :: new ( ) ;
24
+ let mut equivs = InputSession :: new ( ) ;
25
+
26
+ // Probe to determine progress / completion.
27
+ let mut probe = Handle :: new ( ) ;
28
+
29
+ // Define a new computation
30
+ worker. dataflow ( |scope| {
31
+
32
+ // Create a new collection from our input
33
+ // Our AST nodes will be modeled as a collection of `(Id, (Op, Vec<Id>))` records.
34
+ // Each entry is an AST node, with an identifier, an operator, and a list of child identifiers.
35
+ let ast_nodes: Collection < _ , ( usize , ( String , Vec < usize > ) ) > = input. to_collection ( scope) ;
36
+
37
+ // Exogenous equivalences, mapping identifiers to (lesser) canonical identifiers.
38
+ // This map is not necessarily transitively closed, nor complete for all identifiers.
39
+ let equivs = equivs. to_collection ( scope) ;
40
+
41
+ // Iteratively develop a map from `Id` to `Id` that canonicalizes identifiers.
42
+ // This involves both exogenous equivalences, and those from equivalent AST nodes.
43
+ ast_nodes
44
+ . map ( |( id, _) | ( id, id) )
45
+ . iterate ( |canonical| {
46
+
47
+ // Collection is loop invariant, but must be brought in scope.
48
+ let ast_nodes = ast_nodes. enter ( & canonical. scope ( ) ) ;
49
+ let equivs = equivs. enter ( & canonical. scope ( ) ) ;
50
+
51
+ // Separate AST node operators and their arguments.
52
+ let ops = ast_nodes. map ( |( id, ( op, _) ) | ( id, op) ) ;
53
+ let args = ast_nodes. flat_map ( |( id, ( _, args) ) | args. into_iter ( ) . enumerate ( ) . map ( move |( index, arg) | ( arg, ( id, index) ) ) ) ;
54
+
55
+ // Update argument identifiers, and then equate `(Ops, Args)` tuples to inform equivalences.
56
+ let equivalent_asts =
57
+ args. join_map ( canonical, |_child, & ( node, index) , & canonical| ( node, ( index, canonical) ) )
58
+ . reduce ( |_node, input, output| {
59
+ let mut args = Vec :: new ( ) ;
60
+ for ( ( _index, canonical) , _) in input. iter ( ) {
61
+ args. push ( * canonical) ;
62
+ }
63
+ output. push ( ( args, 1isize ) ) ;
64
+ } )
65
+ . join_map ( & ops, |node, children, op| ( ( children. clone ( ) , op. clone ( ) ) , * node) )
66
+ . concat ( & ast_nodes. filter ( |( _, ( _, args) ) | args. is_empty ( ) ) . map ( |( node, ( op, _) ) | ( ( vec ! [ ] , op) , node) ) )
67
+ . reduce ( |_key, input, output| {
68
+ for node in input. iter ( ) {
69
+ output. push ( ( ( * ( node. 0 ) , * input[ 0 ] . 0 ) , 1 ) ) ;
70
+ }
71
+ } )
72
+ . map ( |( _key, ( node, canonical) ) | ( node, canonical) ) ;
73
+
74
+ // Blend together the two forms of equivalence, and compute the transitive closure.
75
+ equivalent_asts
76
+ . concat ( & equivs)
77
+ . reduce ( |_node, input, output| { output. push ( ( * input[ 0 ] . 0 , 1 ) ) ; } )
78
+ . iterate ( |inner| {
79
+ inner. map ( |( node, canonical) | ( canonical, node) )
80
+ . join_map ( & inner, |_canonical, & node, & canonical| ( node, canonical) )
81
+ } )
82
+ } )
83
+ . consolidate ( )
84
+ . inspect ( |x| println ! ( "{:?}" , x) )
85
+ . probe_with ( & mut probe) ;
86
+ } ) ;
87
+
88
+ input. advance_to ( 0 ) ;
89
+ input. insert ( ( 0 , ( "a" . to_string ( ) , vec ! [ ] ) ) ) ;
90
+ input. insert ( ( 1 , ( "b" . to_string ( ) , vec ! [ ] ) ) ) ;
91
+ input. insert ( ( 2 , ( "c" . to_string ( ) , vec ! [ ] ) ) ) ;
92
+ input. insert ( ( 3 , ( "add" . to_string ( ) , vec ! [ 0 , 2 ] ) ) ) ;
93
+ input. insert ( ( 4 , ( "add" . to_string ( ) , vec ! [ 1 , 2 ] ) ) ) ;
94
+
95
+ equivs. advance_to ( 0 ) ;
96
+
97
+ input. advance_to ( 1 ) ;
98
+ equivs. advance_to ( 1 ) ;
99
+ input. flush ( ) ;
100
+ equivs. flush ( ) ;
101
+
102
+ worker. step_while ( || probe. less_than ( & input. time ( ) ) ) ;
103
+ println ! ( "" ) ;
104
+ println ! ( "Marking 0 equivalent to 1" ) ;
105
+
106
+ equivs. insert ( ( 1 , 0 ) ) ;
107
+ ]
108
+ input. advance_to ( 2 ) ;
109
+ equivs. advance_to ( 2 ) ;
110
+ input. flush ( ) ;
111
+ equivs. flush ( ) ;
112
+
113
+ worker. step_while ( || probe. less_than ( & input. time ( ) ) ) ;
114
+ println ! ( "" ) ;
115
+ println ! ( "Un-marking 0 equivalent to 1" ) ;
116
+
117
+ equivs. remove ( ( 1 , 0 ) ) ;
118
+
119
+ } ) . expect ( "Computation terminated abnormally" ) ;
120
+ }
0 commit comments