@@ -260,7 +260,15 @@ bool pushDownPipelineStageIfCompatible(
260
260
const CompatiblePipelineStages& allowedStages,
261
261
bool isLastSource,
262
262
std::vector<std::unique_ptr<InnerPipelineStageInterface>>& stagesForPushdown) {
263
- if (auto groupStage = dynamic_cast <DocumentSourceGroup*>(stage.get ())) {
263
+ if (auto matchStage = dynamic_cast <DocumentSourceMatch*>(stage.get ())) {
264
+ if (!allowedStages.match || matchStage->sbeCompatibility () < minRequiredCompatibility) {
265
+ return false ;
266
+ }
267
+
268
+ stagesForPushdown.emplace_back (
269
+ std::make_unique<InnerPipelineStageImpl>(matchStage, isLastSource));
270
+ return true ;
271
+ } else if (auto groupStage = dynamic_cast <DocumentSourceGroup*>(stage.get ())) {
264
272
if (!allowedStages.group || groupStage->doingMerge () ||
265
273
groupStage->sbeCompatibility () < minRequiredCompatibility) {
266
274
return false ;
@@ -277,6 +285,14 @@ bool pushDownPipelineStageIfCompatible(
277
285
stagesForPushdown.emplace_back (
278
286
std::make_unique<InnerPipelineStageImpl>(lookupStage, isLastSource));
279
287
return true ;
288
+ } else if (auto unwindStage = dynamic_cast <DocumentSourceUnwind*>(stage.get ())) {
289
+ if (!allowedStages.unwind || unwindStage->sbeCompatibility () < minRequiredCompatibility) {
290
+ return false ;
291
+ }
292
+
293
+ stagesForPushdown.emplace_back (
294
+ std::make_unique<InnerPipelineStageImpl>(unwindStage, isLastSource));
295
+ return true ;
280
296
} else if (auto transformStage =
281
297
dynamic_cast <DocumentSourceSingleDocumentTransformation*>(stage.get ())) {
282
298
if (!allowedStages.transform ) {
@@ -294,14 +310,6 @@ bool pushDownPipelineStageIfCompatible(
294
310
return true ;
295
311
}
296
312
return false ;
297
- } else if (auto matchStage = dynamic_cast <DocumentSourceMatch*>(stage.get ())) {
298
- if (!allowedStages.match || matchStage->sbeCompatibility () < minRequiredCompatibility) {
299
- return false ;
300
- }
301
-
302
- stagesForPushdown.emplace_back (
303
- std::make_unique<InnerPipelineStageImpl>(matchStage, isLastSource));
304
- return true ;
305
313
} else if (auto sortStage = dynamic_cast <DocumentSourceSort*>(stage.get ())) {
306
314
if (!allowedStages.sort || !isSortSbeCompatible (sortStage->getSortKeyPattern ())) {
307
315
return false ;
@@ -346,44 +354,88 @@ bool pushDownPipelineStageIfCompatible(
346
354
stagesForPushdown.emplace_back (
347
355
std::make_unique<InnerPipelineStageImpl>(unpackBucketStage, isLastSource));
348
356
return true ;
349
- } else if (auto unwindStage = dynamic_cast <DocumentSourceUnwind*>(stage.get ())) {
350
- if (!allowedStages.unwind || unwindStage->sbeCompatibility () < minRequiredCompatibility) {
351
- return false ;
352
- }
357
+ }
353
358
354
- stagesForPushdown.emplace_back (
355
- std::make_unique<InnerPipelineStageImpl>(unwindStage, isLastSource));
356
- return true ;
359
+ return false ;
360
+ }
361
+
362
+ /* *
363
+ * Prunes $addFields from 'stagesForPushdown' if it is the last stage, subject to additional
364
+ * conditions. (Must be called repeatedly until it returns false.) When splitting a pipeline between
365
+ * SBE and Classic DocumentSource stages, there is often a performance penalty for executing an
366
+ * $addFields in SBE only to immediately translate its output to MutableDocument form for the
367
+ * Classic DocumentSource execution phase. Instead, we keep the $addFields as a DocumentSource.
368
+ *
369
+ * 'alreadyPruned' tells whether the pipeline has had stages pruned away already.
370
+ *
371
+ * Returns true iff it pruned a stage.
372
+ */
373
+ bool pruneTrailingAddFields (
374
+ std::vector<std::unique_ptr<InnerPipelineStageInterface>>& stagesForPushdown,
375
+ bool alreadyPruned) {
376
+ // Push down the entire pipeline when possible. (It's not possible if 'alreadyPruned' is true.)
377
+ if (stagesForPushdown.empty () || (!alreadyPruned && stagesForPushdown.back ()->isLastSource ())) {
378
+ return false ;
357
379
}
358
380
381
+ auto projectionStage =
382
+ dynamic_cast <DocumentSourceInternalProjection*>(stagesForPushdown.back ()->documentSource ());
383
+ if (projectionStage &&
384
+ projectionStage->projection ().type () == projection_ast::ProjectType::kAddition ) {
385
+ stagesForPushdown.pop_back ();
386
+ return true ;
387
+ }
359
388
return false ;
360
389
}
361
390
362
391
/* *
363
- * After copying as many pipeline stages as possible into the 'stagesForPushdown' pipeline, this
364
- * second pass takes off any stages that may not benefit from execution in SBE.
392
+ * Prunes $unwind from 'stagesForPushdown' if it is the last stage. (Must be called repeatedly until
393
+ * it returns false.) This pruning is done because $unwind performance is bottlenecked by processing
394
+ * of EExpressions for sbe::ProjectStages in the SBE VM, which is slower than Classic's native C++
395
+ * projection implementation. Pushing $unwind down only has a performance benefit when doing so
396
+ * allows additional non-$unwind stages to be pushed down after it.
397
+ *
398
+ * Returns true iff it pruned a stage.
365
399
*/
366
- void reconsiderStagesForPushdown (
400
+ bool pruneTrailingUnwind (
367
401
std::vector<std::unique_ptr<InnerPipelineStageInterface>>& stagesForPushdown) {
368
- // Always push down the entire pipeline when possible.
369
- if (stagesForPushdown.empty () || stagesForPushdown.back ()->isLastSource ()) {
370
- return ;
402
+ if (!stagesForPushdown.empty () &&
403
+ dynamic_cast <DocumentSourceUnwind*>(stagesForPushdown.back ()->documentSource ())) {
404
+ stagesForPushdown.pop_back ();
405
+ return true ;
371
406
}
407
+ return false ;
408
+ }
372
409
373
- // When splitting a pipeline between SBE and Classic DocumentSource stages, there is often a
374
- // performance penalty for executing an $addFields in SBE only to immediately translate its
375
- // output to MutableDocument form for the Classic DocumentSource execution phase. Instead, we
376
- // keep the $addFields as a DocumentSource.
410
+ /* *
411
+ * After copying as many pipeline stages as possible into the 'stagesForPushdown' pipeline, this
412
+ * second pass takes off any stages that may not benefit from execution in SBE.
413
+ */
414
+ void prunePushdownStages (
415
+ std::vector<std::unique_ptr<InnerPipelineStageInterface>>& stagesForPushdown,
416
+ SbeCompatibility minRequiredCompatibility) {
417
+ bool pruned = false ; // have any stages been pruned?
418
+ bool prunedThisIteration; // were any stages pruned in the current loop iteration?
377
419
do {
378
- auto projectionStage = dynamic_cast <DocumentSourceInternalProjection*>(
379
- stagesForPushdown.back ()->documentSource ());
380
- if (!projectionStage ||
381
- projectionStage->projection ().type () != projection_ast::ProjectType::kAddition ) {
382
- return ;
420
+ prunedThisIteration = false ;
421
+ if (SbeCompatibility::flagGuarded >= minRequiredCompatibility) {
422
+ // When 'minRequiredCompatibility' is permissive enough (because featureFlagSbeFull is
423
+ // enabled), do not remove trailing $addFields stages.
424
+ } else {
425
+ // Otherwise, remove trailing $addFields stages that we don't expect to improve
426
+ // performance when they execute in SBE.
427
+ if (pruneTrailingAddFields (stagesForPushdown, pruned)) {
428
+ prunedThisIteration = true ;
429
+ pruned = true ;
430
+ }
383
431
}
384
432
385
- stagesForPushdown.pop_back ();
386
- } while (!stagesForPushdown.empty ());
433
+ // $unwind should not be the last stage pushed down as it is more expensive in SBE.
434
+ if (pruneTrailingUnwind (stagesForPushdown)) {
435
+ prunedThisIteration = true ;
436
+ pruned = true ;
437
+ };
438
+ } while (prunedThisIteration);
387
439
}
388
440
389
441
// Limit the number of aggregation pipeline stages that can be "pushed down" to the SBE stage
@@ -543,17 +595,11 @@ std::vector<std::unique_ptr<InnerPipelineStageInterface>> findSbeCompatibleStage
543
595
}
544
596
}
545
597
546
- if (SbeCompatibility::flagGuarded >= minRequiredCompatibility) {
547
- // When 'minRequiredCompatibility' is permissive enough (because featureFlagSbeFull is
548
- // enabled), return 'stagesForPushdown' as is, pushing down as many stages as possible.
549
- } else {
550
- // Otherwise, "reconsider" stages that we don't expect to improve performance when they
551
- // execute in SBE.
552
- reconsiderStagesForPushdown (stagesForPushdown);
553
- }
598
+ // Remove stage patterns where pushing down may degrade performance.
599
+ prunePushdownStages (stagesForPushdown, minRequiredCompatibility);
554
600
555
601
return stagesForPushdown;
556
- }
602
+ } // findSbeCompatibleStagesForPushdown
557
603
558
604
/* *
559
605
* Removes the first 'stagesToRemove' stages from the pipeline. This function is meant to be paired
0 commit comments