6
6
package query
7
7
8
8
import (
9
+ "sync"
10
+
9
11
"github.com/golang/glog"
10
12
"github.com/pkg/errors"
11
13
@@ -16,7 +18,7 @@ type mathTree struct {
16
18
Fn string
17
19
Var string
18
20
Const types.Val // If its a const value node.
19
- Val map [ uint64 ] types.Val
21
+ Val * types.ShardedMap
20
22
Child []* mathTree
21
23
}
22
24
@@ -36,24 +38,23 @@ var (
36
38
// processBinary handles the binary operands like
37
39
// +, -, *, /, %, max, min, logbase, dot
38
40
func processBinary (mNode * mathTree ) error {
39
- destMap := make (map [uint64 ]types.Val )
40
41
aggName := mNode .Fn
41
42
42
43
mpl := mNode .Child [0 ].Val
43
44
mpr := mNode .Child [1 ].Val
44
45
cl := mNode .Child [0 ].Const
45
46
cr := mNode .Child [1 ].Const
46
47
47
- f := func (k uint64 ) error {
48
+ f := func (k uint64 , lshard , rshard , destMapi * map [ uint64 ]types. Val ) error {
48
49
ag := aggregator {
49
50
name : aggName ,
50
51
}
51
- lVal := mpl [k ]
52
+ lVal := ( * lshard ) [k ]
52
53
if cl .Value != nil {
53
54
// Use the constant value that was supplied.
54
55
lVal = cl
55
56
}
56
- rVal := mpr [k ]
57
+ rVal := ( * rshard ) [k ]
57
58
if cr .Value != nil {
58
59
// Use the constant value that was supplied.
59
60
rVal = cr
@@ -66,7 +67,7 @@ func processBinary(mNode *mathTree) error {
66
67
if err != nil {
67
68
return err
68
69
}
69
- destMap [k ], err = ag .Value ()
70
+ ( * destMapi ) [k ], err = ag .Value ()
70
71
if err != nil {
71
72
return err
72
73
}
@@ -75,12 +76,12 @@ func processBinary(mNode *mathTree) error {
75
76
76
77
// If mpl or mpr have 0 and just 0 in it, that means it's an output of aggregation somewhere.
77
78
// This value would need to be applied to all.
78
- checkAggrResult := func (value map [ uint64 ] types.Val ) (types.Val , bool ) {
79
- if len ( value ) != 1 {
79
+ checkAggrResult := func (value * types.ShardedMap ) (types.Val , bool ) {
80
+ if value . Len ( ) != 1 {
80
81
return types.Val {}, false
81
82
}
82
83
83
- val , ok := value [ 0 ]
84
+ val , ok := value . Get ( 0 )
84
85
return val , ok
85
86
}
86
87
@@ -92,21 +93,31 @@ func processBinary(mNode *mathTree) error {
92
93
mpr = nil
93
94
}
94
95
95
- if len (mpl ) != 0 || len (mpr ) != 0 {
96
- for k := range mpr {
97
- if err := f (k ); err != nil {
98
- return err
99
- }
100
- }
101
- for k := range mpl {
102
- if _ , ok := mpr [k ]; ok {
103
- continue
104
- }
105
- if err := f (k ); err != nil {
106
- return err
107
- }
96
+ if mpl .Len () != 0 || mpr .Len () != 0 {
97
+ var wg sync.WaitGroup
98
+ returnMap := types .NewShardedMap ()
99
+
100
+ for i := range types .NumShards {
101
+ wg .Add (1 )
102
+ mlps := mpl .GetShardOrNil (i )
103
+ mprs := mpr .GetShardOrNil (i )
104
+ destMapi := returnMap .GetShardOrNil (i )
105
+ go func (i int ) {
106
+ defer wg .Done ()
107
+ for k := range mlps {
108
+ f (k , & mlps , & mprs , & destMapi )
109
+ }
110
+ for k := range mprs {
111
+ if _ , ok := mlps [k ]; ok {
112
+ continue
113
+ }
114
+ f (k , & mlps , & mprs , & destMapi )
115
+ }
116
+ }(i )
108
117
}
109
- mNode .Val = destMap
118
+
119
+ wg .Wait ()
120
+ mNode .Val = returnMap
110
121
return nil
111
122
}
112
123
@@ -132,8 +143,8 @@ func processBinary(mNode *mathTree) error {
132
143
// processUnary handles the unary operands like
133
144
// u-, log, exp, since, floor, ceil
134
145
func processUnary (mNode * mathTree ) error {
135
- destMap := make (map [uint64 ]types.Val )
136
146
srcMap := mNode .Child [0 ].Val
147
+ destMap := types .NewShardedMap ()
137
148
aggName := mNode .Fn
138
149
ch := mNode .Child [0 ]
139
150
ag := aggregator {
@@ -149,16 +160,18 @@ func processUnary(mNode *mathTree) error {
149
160
return err
150
161
}
151
162
152
- for k , val := range srcMap {
163
+ srcMap . Iterate ( func ( k uint64 , val types. Val ) error {
153
164
err := ag .ApplyVal (val )
154
165
if err != nil {
155
166
return err
156
167
}
157
- destMap [ k ] , err = ag .Value ()
168
+ value , err : = ag .Value ()
158
169
if err != nil {
159
170
return err
160
171
}
161
- }
172
+ destMap .Set (k , value )
173
+ return nil
174
+ })
162
175
mNode .Val = destMap
163
176
return nil
164
177
@@ -168,14 +181,14 @@ func processUnary(mNode *mathTree) error {
168
181
// return a boolean value.
169
182
// All the inequality operators (<, >, <=, >=, !=, ==)
170
183
func processBinaryBoolean (mNode * mathTree ) error {
171
- destMap := make (map [uint64 ]types.Val )
172
184
srcMap := mNode .Child [0 ].Val
185
+ destMap := types .NewShardedMap ()
173
186
aggName := mNode .Fn
174
187
175
188
ch := mNode .Child [1 ]
176
189
curMap := ch .Val
177
- for k , val := range srcMap {
178
- curVal := curMap [ k ]
190
+ srcMap . Iterate ( func ( k uint64 , val types. Val ) error {
191
+ curVal , _ := curMap . Get ( k )
179
192
if ch .Const .Value != nil {
180
193
// Use the constant value that was supplied.
181
194
curVal = ch .Const
@@ -184,28 +197,29 @@ func processBinaryBoolean(mNode *mathTree) error {
184
197
if err != nil {
185
198
return errors .Wrapf (err , "Wrong values in comparison function." )
186
199
}
187
- destMap [ k ] = types.Val {
200
+ destMap . Set ( k , types.Val {
188
201
Tid : types .BoolID ,
189
202
Value : res ,
190
- }
191
- }
203
+ })
204
+ return nil
205
+ })
192
206
mNode .Val = destMap
193
207
return nil
194
208
}
195
209
196
210
// processTernary handles the ternary operand cond()
197
211
func processTernary (mNode * mathTree ) error {
198
- destMap := make ( map [ uint64 ] types.Val )
212
+ destMap := types .NewShardedMap ( )
199
213
aggName := mNode .Fn
200
214
condMap := mNode .Child [0 ].Val
201
- if len ( condMap ) == 0 {
215
+ if condMap . IsEmpty () {
202
216
return errors .Errorf ("Expected a value variable in %v but missing." , aggName )
203
217
}
204
218
varOne := mNode .Child [1 ].Val
205
219
varTwo := mNode .Child [2 ].Val
206
220
constOne := mNode .Child [1 ].Const
207
221
constTwo := mNode .Child [2 ].Const
208
- for k , val := range condMap {
222
+ condMap . Iterate ( func ( k uint64 , val types. Val ) error {
209
223
var res types.Val
210
224
v , ok := val .Value .(bool )
211
225
if ! ok {
@@ -216,18 +230,19 @@ func processTernary(mNode *mathTree) error {
216
230
if constOne .Value != nil {
217
231
res = constOne
218
232
} else {
219
- res = varOne [ k ]
233
+ res , _ = varOne . Get ( k )
220
234
}
221
235
} else {
222
236
// Pick the value of second map.
223
237
if constTwo .Value != nil {
224
238
res = constTwo
225
239
} else {
226
- res = varTwo [ k ]
240
+ res , _ = varTwo . Get ( k )
227
241
}
228
242
}
229
- destMap [k ] = res
230
- }
243
+ destMap .Set (k , res )
244
+ return nil
245
+ })
231
246
mNode .Val = destMap
232
247
return nil
233
248
}
@@ -237,7 +252,7 @@ func evalMathTree(mNode *mathTree) error {
237
252
return nil
238
253
}
239
254
if mNode .Var != "" {
240
- if len ( mNode .Val ) == 0 {
255
+ if mNode .Val . IsEmpty () {
241
256
glog .V (2 ).Infof ("Variable %v not yet populated or missing." , mNode .Var )
242
257
}
243
258
// This is a leaf node whose value is already populated. So return.
@@ -246,13 +261,13 @@ func evalMathTree(mNode *mathTree) error {
246
261
247
262
for _ , child := range mNode .Child {
248
263
// Process the child nodes first.
249
- err := evalMathTree (child )
250
- if err != nil {
264
+ if err := evalMathTree (child ); err != nil {
251
265
return err
252
266
}
253
267
}
254
268
255
269
aggName := mNode .Fn
270
+
256
271
if isUnary (aggName ) {
257
272
if len (mNode .Child ) != 1 {
258
273
return errors .Errorf ("Function %v expects 1 argument. But got: %v" , aggName ,
0 commit comments