1
+ use timely:: dataflow:: operators:: probe:: Handle ;
2
+
3
+ use differential_dataflow:: {
4
+ input:: InputSession ,
5
+ operators:: { Join , Iterate , Reduce , Threshold } ,
6
+ } ;
7
+
8
+ // Types for representing an AST as a collection of data.
9
+ type AstName = usize ;
10
+ type AstNode = ( String , Vec < AstName > ) ;
11
+
12
+ fn main ( ) {
13
+ // Define a timely dataflow runtime
14
+ timely:: execute_from_args ( std:: env:: args ( ) , move |worker| {
15
+
16
+ // Input AST as pairs of `name` and `node`.
17
+ let mut nodes = InputSession :: < _ , ( AstName , AstNode ) , _ > :: new ( ) ;
18
+ // Exogenous equivalences associating AST nodes by name.
19
+ let mut equiv = InputSession :: < _ , ( AstName , AstName ) , _ > :: new ( ) ;
20
+
21
+ // Probe to determine progress / completion.
22
+ let mut probe = Handle :: new ( ) ;
23
+
24
+ // Set up a new computation
25
+ worker. dataflow ( |scope| {
26
+
27
+ let nodes = nodes. to_collection ( scope) ;
28
+ let equiv = equiv. to_collection ( scope) ;
29
+
30
+ // Iteratively develop a map from `Name` to `Name` that closes `equiv` under congruence.
31
+ // Specifically, pairs `(a, b)` where a >= b and b names the equivalence class of a.
32
+ nodes
33
+ . map ( |( name, _) | ( name, name) )
34
+ . iterate ( |eq_class| {
35
+
36
+ // Collection is loop invariant, but must be brought in scope.
37
+ let nodes = nodes. enter ( & eq_class. scope ( ) ) ;
38
+ let equiv = equiv. enter ( & eq_class. scope ( ) ) ;
39
+
40
+ // Separate AST node operators and their arguments.
41
+ let ops = nodes. map ( |( name, ( op, _) ) | ( name, op) ) ;
42
+ let args = nodes. flat_map ( |( name, ( _, args) ) | args. into_iter ( ) . enumerate ( ) . map ( move |( index, arg) | ( arg, ( name, index) ) ) ) ;
43
+
44
+ // Update argument identifiers, and then equate `(Ops, Args)` tuples to inform equivalences.
45
+ let equivalent_asts =
46
+ args. join_map ( eq_class, |_child, & ( node, index) , & eq_class| ( node, ( index, eq_class) ) )
47
+ . reduce ( |_node, input, output| {
48
+ let mut args = Vec :: new ( ) ;
49
+ for ( ( _index, eq_class) , _) in input. iter ( ) {
50
+ args. push ( * eq_class) ;
51
+ }
52
+ output. push ( ( args, 1isize ) ) ;
53
+ } )
54
+ . join_map ( & ops, |node, children, op| ( ( children. clone ( ) , op. clone ( ) ) , * node) )
55
+ . concat ( & nodes. filter ( |( _, ( _, args) ) | args. is_empty ( ) ) . map ( |( node, ( op, _) ) | ( ( vec ! [ ] , op) , node) ) )
56
+ . reduce ( |_key, input, output| {
57
+ for node in input. iter ( ) {
58
+ output. push ( ( ( * ( node. 0 ) , * input[ 0 ] . 0 ) , 1 ) ) ;
59
+ }
60
+ } )
61
+ . map ( |( _key, ( node, eq_class) ) | ( node, eq_class) ) ;
62
+
63
+ // Blend exogenous and endogenous equivalence; find connected components.
64
+ // NB: don't *actually* write connected components this way
65
+ let edges = equivalent_asts. concat ( & equiv) ;
66
+ let symms = edges. map ( |( x, y) |( y, x) ) . concat ( & edges) ;
67
+ symms. iterate ( |reach|
68
+ reach. join_map ( & reach, |_b, a, c| ( * a, * c) )
69
+ . distinct ( )
70
+ )
71
+ . reduce ( |_a, input, output| output. push ( ( * input[ 0 ] . 0 , 1 ) ) )
72
+
73
+ } )
74
+ . consolidate ( )
75
+ . inspect ( |x| println ! ( "{:?}" , x) )
76
+ . probe_with ( & mut probe) ;
77
+ } ) ;
78
+
79
+ nodes. advance_to ( 0 ) ;
80
+ equiv. advance_to ( 0 ) ;
81
+
82
+ println ! ( "Insert `(a x 2) / 2`" ) ;
83
+ nodes. insert ( ( 0 , ( "a" . to_string ( ) , vec ! [ ] ) ) ) ;
84
+ nodes. insert ( ( 1 , ( "2" . to_string ( ) , vec ! [ ] ) ) ) ;
85
+ nodes. insert ( ( 2 , ( "mul" . to_string ( ) , vec ! [ 0 , 1 ] ) ) ) ;
86
+ nodes. insert ( ( 3 , ( "div" . to_string ( ) , vec ! [ 2 , 1 ] ) ) ) ;
87
+
88
+ nodes. advance_to ( 1 ) ; nodes. flush ( ) ;
89
+ equiv. advance_to ( 1 ) ; equiv. flush ( ) ;
90
+
91
+ worker. step_while ( || probe. less_than ( & nodes. time ( ) ) ) ;
92
+ println ! ( "" ) ;
93
+
94
+
95
+ println ! ( "Insert `a x (2 / 2)`" ) ;
96
+ nodes. insert ( ( 4 , ( "2" . to_string ( ) , vec ! [ ] ) ) ) ;
97
+ nodes. insert ( ( 5 , ( "div" . to_string ( ) , vec ! [ 4 , 4 ] ) ) ) ;
98
+ nodes. insert ( ( 6 , ( "a" . to_string ( ) , vec ! [ ] ) ) ) ;
99
+ nodes. insert ( ( 7 , ( "mul" . to_string ( ) , vec ! [ 6 , 5 ] ) ) ) ;
100
+ println ! ( "Equate with the prior term" ) ;
101
+ equiv. insert ( ( 3 , 7 ) ) ;
102
+
103
+ nodes. advance_to ( 2 ) ; nodes. flush ( ) ;
104
+ equiv. advance_to ( 2 ) ; equiv. flush ( ) ;
105
+
106
+ worker. step_while ( || probe. less_than ( & nodes. time ( ) ) ) ;
107
+ println ! ( "" ) ;
108
+
109
+
110
+ println ! ( "Insert `(2 / 2)` and `1` and equate them." ) ;
111
+ nodes. insert ( ( 8 , ( "2" . to_string ( ) , vec ! [ ] ) ) ) ;
112
+ nodes. insert ( ( 9 , ( "div" . to_string ( ) , vec ! [ 8 , 8 ] ) ) ) ;
113
+ nodes. insert ( ( 10 , ( "1" . to_string ( ) , vec ! [ ] ) ) ) ;
114
+ equiv. insert ( ( 9 , 10 ) ) ;
115
+
116
+ nodes. advance_to ( 3 ) ; nodes. flush ( ) ;
117
+ equiv. advance_to ( 3 ) ; equiv. flush ( ) ;
118
+
119
+ worker. step_while ( || probe. less_than ( & nodes. time ( ) ) ) ;
120
+ println ! ( "" ) ;
121
+
122
+
123
+ println ! ( "Insert `a * 1` and `a` and equate them." ) ;
124
+ nodes. insert ( ( 11 , ( "a" . to_string ( ) , vec ! [ ] ) ) ) ;
125
+ nodes. insert ( ( 12 , ( "1" . to_string ( ) , vec ! [ ] ) ) ) ;
126
+ nodes. insert ( ( 13 , ( "mul" . to_string ( ) , vec ! [ 11 , 12 ] ) ) ) ;
127
+ equiv. insert ( ( 11 , 13 ) ) ;
128
+
129
+ nodes. advance_to ( 4 ) ; nodes. flush ( ) ;
130
+ equiv. advance_to ( 4 ) ; equiv. flush ( ) ;
131
+
132
+ worker. step_while ( || probe. less_than ( & nodes. time ( ) ) ) ;
133
+ println ! ( "" ) ;
134
+
135
+
136
+ println ! ( "Oh shoot; '2' could equal zero; undo '2'/'2' == '1')" ) ;
137
+ equiv. remove ( ( 9 , 10 ) ) ;
138
+
139
+ } ) . expect ( "Computation terminated abnormally" ) ;
140
+ }
0 commit comments