@@ -30,6 +30,67 @@ app.all('/search', function(req, res, next)
30
30
doTemplateQuery ( queryArgs , req . body . db , res , next ) ;
31
31
} ) ;
32
32
33
+ // State for queries in flight. As results come it, acts as a semaphore and sends the results back
34
+ var requestIdCounter = 0
35
+ // Map of request id -> array of results. Results is
36
+ // { query, err, output }
37
+ var requestsPending = { }
38
+
39
+ // Called when a query finishes with an error
40
+ function queryError ( requestId , err , next )
41
+ {
42
+ // We only 1 return error per query so it may have been removed from the list
43
+ if ( requestId in requestsPending )
44
+ {
45
+ // Remove request
46
+ delete requestsPending [ requestId ]
47
+ // Send back error
48
+ next ( err )
49
+ }
50
+ }
51
+
52
+ // Called when query finished
53
+ function queryFinished ( requestId , queryId , results , res , next )
54
+ {
55
+ // We only 1 return error per query so it may have been removed from the list
56
+ if ( requestId in requestsPending )
57
+ {
58
+ var queryStatus = requestsPending [ requestId ]
59
+ // Mark this as finished
60
+ queryStatus [ queryId ] . pending = false
61
+ queryStatus [ queryId ] . results = results
62
+
63
+ // See if we're all done
64
+ var done = true
65
+ for ( var i = 0 ; i < queryStatus . length ; i ++ )
66
+ {
67
+ if ( queryStatus [ i ] . pending == true )
68
+ {
69
+ done = false
70
+ break
71
+ }
72
+ }
73
+
74
+ // If query done, send back results
75
+ if ( done )
76
+ {
77
+ // Concatenate results
78
+ output = [ ]
79
+ for ( var i = 0 ; i < queryStatus . length ; i ++ )
80
+ {
81
+ if ( queryStatus [ i ] . results . datapoints . length > 0 )
82
+ {
83
+ output . push ( queryStatus [ i ] . results )
84
+ }
85
+ }
86
+ res . json ( output ) ;
87
+ next ( )
88
+ // Remove request
89
+ delete requestsPending [ requestId ]
90
+ }
91
+ }
92
+ }
93
+
33
94
// Called to get graph points
34
95
app . all ( '/query' , function ( req , res , next )
35
96
{
@@ -41,26 +102,41 @@ app.all('/query', function(req, res, next)
41
102
"$to" : new Date ( req . body . range . to ) ,
42
103
"$dateBucketCount" : getBucketCount ( req . body . range . from , req . body . range . to , req . body . intervalMs )
43
104
}
44
- tg = req . body . targets [ 0 ] . target
45
- queryArgs = parseQuery ( tg , substitutions )
46
- if ( queryArgs . err != null )
47
- {
48
- next ( queryArgs . err )
49
- }
50
- else
105
+
106
+ // Generate an id to track requests
107
+ const requestId = ++ requestIdCounter
108
+ // Add state for the queries in this request
109
+ var queryStates = [ ]
110
+ requestsPending [ requestId ] = queryStates
111
+ var error = false
112
+
113
+ for ( var queryId = 0 ; queryId < req . body . targets . length && ! error ; queryId ++ )
51
114
{
52
- // Run the query
53
- runAggregateQuery ( req . body , queryArgs , res , next )
115
+ tg = req . body . targets [ queryId ] . target
116
+ queryArgs = parseQuery ( tg , substitutions )
117
+ if ( queryArgs . err != null )
118
+ {
119
+ queryError ( requestId , queryArgs . err , next )
120
+ error = true
121
+ }
122
+ else
123
+ {
124
+ // Add to the state
125
+ queryStates . push ( { pending : true } )
126
+
127
+ // Run the query
128
+ runAggregateQuery ( requestId , queryId , req . body , queryArgs , res , next )
129
+ }
54
130
}
55
131
}
56
132
) ;
57
133
58
- app . use ( function ( error , req , res , next )
59
- {
60
- // Any request to this server will get here, and will send an HTTP
61
- // response with the error message
62
- res . status ( 500 ) . json ( { message : error . message } ) ;
63
- } ) ;
134
+ // app.use(function(error, req, res, next)
135
+ // {
136
+ // // Any request to this server will get here, and will send an HTTP
137
+ // // response with the error message
138
+ // res.status(500).json({ message: error.message });
139
+ // });
64
140
65
141
// Get config from server/default.json
66
142
var serverConfig = config . get ( 'server' ) ;
@@ -169,13 +245,13 @@ function parseQuery(query, substitutions)
169
245
// Run an aggregate query. Must return documents of the form
170
246
// { value : 0.34334, ts : <epoch time in seconds> }
171
247
172
- function runAggregateQuery ( body , queryArgs , res , next )
248
+ function runAggregateQuery ( requestId , queryId , body , queryArgs , res , next )
173
249
{
174
250
MongoClient . connect ( body . db . url , function ( err , client )
175
251
{
176
252
if ( err != null )
177
253
{
178
- next ( err )
254
+ queryError ( requestId , err , next )
179
255
}
180
256
else
181
257
{
@@ -191,7 +267,7 @@ function runAggregateQuery(body, queryArgs, res, next )
191
267
if ( err != null )
192
268
{
193
269
client . close ( ) ;
194
- next ( err )
270
+ queryError ( requestState . requestId , err , next )
195
271
}
196
272
else
197
273
{
@@ -207,15 +283,14 @@ function runAggregateQuery(body, queryArgs, res, next )
207
283
208
284
client . close ( ) ;
209
285
var elapsedTimeMs = stopwatch . stop ( )
210
- output = [ ]
211
- output . push ( { 'target' : tg , 'datapoints' : datapoints } )
286
+ var results = { 'target' : tg , 'datapoints' : datapoints }
212
287
logTiming ( body , elapsedTimeMs , datapoints )
213
- res . json ( output ) ;
214
- next ( )
288
+ // Mark query as finished - will send back results when all queries finished
289
+ queryFinished ( requestId , queryId , results , res , next )
215
290
}
216
291
catch ( err )
217
292
{
218
- next ( err )
293
+ queryError ( requestId , err , next )
219
294
}
220
295
}
221
296
} )
0 commit comments