Skip to content

Commit 388c177

Browse files
authored
CSHARP-5478: Add support for $rankFusion aggregation stage (#1636)
1 parent a3a2c7e commit 388c177

16 files changed

+909
-2
lines changed

src/MongoDB.Driver/AggregateFluent.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,28 @@ public override IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefin
257257
return WithPipeline(_pipeline.Project(projection));
258258
}
259259

260+
public override IAggregateFluent<TNewResult> RankFusion<TNewResult>(
261+
Dictionary<string, PipelineDefinition<TResult, TNewResult>> pipelines,
262+
Dictionary<string, double> weights = null,
263+
RankFusionOptions<TNewResult> options = null)
264+
{
265+
return WithPipeline(_pipeline.RankFusion(pipelines, weights, options));
266+
}
267+
268+
public override IAggregateFluent<TNewResult> RankFusion<TNewResult>(
269+
PipelineDefinition<TResult, TNewResult>[] pipelines,
270+
RankFusionOptions<TNewResult> options = null)
271+
{
272+
return WithPipeline(_pipeline.RankFusion(pipelines, options));
273+
}
274+
275+
public override IAggregateFluent<TNewResult> RankFusion<TNewResult>(
276+
(PipelineDefinition<TResult, TNewResult>, double?)[] pipelinesWithWeights,
277+
RankFusionOptions<TNewResult> options = null)
278+
{
279+
return WithPipeline(_pipeline.RankFusion(pipelinesWithWeights, options));
280+
}
281+
260282
public override IAggregateFluent<TNewResult> ReplaceRoot<TNewResult>(AggregateExpressionDefinition<TResult, TNewResult> newRoot)
261283
{
262284
return WithPipeline(_pipeline.ReplaceRoot(newRoot));

src/MongoDB.Driver/AggregateFluentBase.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,29 @@ public virtual Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> ou
225225
/// <inheritdoc />
226226
public abstract IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefinition<TResult, TNewResult> projection);
227227

228+
/// <inheritdoc />
229+
public virtual IAggregateFluent<TNewResult> RankFusion<TNewResult>(
230+
Dictionary<string, PipelineDefinition<TResult, TNewResult>> pipelines,
231+
Dictionary<string, double> weights = null,
232+
RankFusionOptions<TNewResult> options = null)
233+
{
234+
throw new NotImplementedException();
235+
}
236+
237+
/// <inheritdoc />
238+
public virtual IAggregateFluent<TNewResult> RankFusion<TNewResult>(PipelineDefinition<TResult, TNewResult>[] pipelines, RankFusionOptions<TNewResult> options = null)
239+
{
240+
throw new NotImplementedException();
241+
}
242+
243+
/// <inheritdoc />
244+
public virtual IAggregateFluent<TNewResult> RankFusion<TNewResult>(
245+
(PipelineDefinition<TResult, TNewResult>, double?)[] pipelinesWithWeights,
246+
RankFusionOptions<TNewResult> options = null)
247+
{
248+
throw new NotImplementedException();
249+
}
250+
228251
/// <inheritdoc />
229252
public virtual IAggregateFluent<TNewResult> ReplaceRoot<TNewResult>(AggregateExpressionDefinition<TResult, TNewResult> newRoot)
230253
{

src/MongoDB.Driver/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public class Feature
8282
private static readonly Feature __loookupDocuments= new Feature("LoookupDocuments", WireVersion.Server60);
8383
private static readonly Feature __mmapV1StorageEngine = new Feature("MmapV1StorageEngine", WireVersion.Zero, WireVersion.Server42);
8484
private static readonly Feature __pickAccumulatorsNewIn52 = new Feature("PickAccumulatorsNewIn52", WireVersion.Server52);
85+
private static readonly Feature __rankFusionStage = new Feature("RankFusionStage", WireVersion.Server81);
8586
private static readonly Feature __regexMatch = new Feature("RegexMatch", WireVersion.Server42);
8687
private static readonly Feature __round = new Feature("Round", WireVersion.Server42);
8788
private static readonly Feature __scramSha256Authentication = new Feature("ScramSha256Authentication", WireVersion.Server40);
@@ -386,6 +387,11 @@ public class Feature
386387
/// </summary>
387388
public static Feature PickAccumulatorsNewIn52 => __pickAccumulatorsNewIn52;
388389

390+
/// <summary>
391+
/// Gets the $rankFusion feature.
392+
/// </summary>
393+
public static Feature RankFusionStage => __rankFusionStage;
394+
389395
/// <summary>
390396
/// Gets the regex match feature.
391397
/// </summary>

src/MongoDB.Driver/IAggregateFluent.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,41 @@ IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement, TAs, TNewResul
365365
/// </returns>
366366
IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefinition<TResult, TNewResult> projection);
367367

368+
/// <summary>
369+
/// Appends a $rankFusion stage to the pipeline.
370+
/// </summary>
371+
/// <typeparam name="TNewResult">The type of the new result.</typeparam>
372+
/// <param name="pipelines">The map of named pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
373+
/// <param name="weights">The map of pipeline names to non-negative numerical weights determining result importance during combination. Default weight is 1 when unspecified.</param>
374+
/// <param name="options">The rankFusion options.</param>
375+
/// <returns>The fluent aggregate interface.</returns>
376+
IAggregateFluent<TNewResult> RankFusion<TNewResult>(
377+
Dictionary<string, PipelineDefinition<TResult, TNewResult>> pipelines,
378+
Dictionary<string, double> weights = null,
379+
RankFusionOptions<TNewResult> options = null);
380+
381+
/// <summary>
382+
/// Appends a $rankFusion stage to the pipeline. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
383+
/// </summary>
384+
/// <typeparam name="TNewResult">The type of the new result.</typeparam>
385+
/// <param name="pipelines">The collection of pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
386+
/// <param name="options">The rankFusion options.</param>
387+
/// <returns>The fluent aggregate interface.</returns>
388+
IAggregateFluent<TNewResult> RankFusion<TNewResult>(
389+
PipelineDefinition<TResult, TNewResult>[] pipelines,
390+
RankFusionOptions<TNewResult> options = null);
391+
392+
/// <summary>
393+
/// Appends a $rankFusion stage to the pipeline. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
394+
/// </summary>
395+
/// <typeparam name="TNewResult">The type of the new result.</typeparam>
396+
/// <param name="pipelinesWithWeights">The collection of tuples containing (pipeline, weight) pairs. The pipelines must operate on the same collection.</param>
397+
/// <param name="options">The rankFusion options.</param>
398+
/// <returns>The fluent aggregate interface.</returns>
399+
IAggregateFluent<TNewResult> RankFusion<TNewResult>(
400+
(PipelineDefinition<TResult, TNewResult>, double?)[] pipelinesWithWeights,
401+
RankFusionOptions<TNewResult> options = null);
402+
368403
/// <summary>
369404
/// Appends a $replaceRoot stage to the pipeline.
370405
/// </summary>

src/MongoDB.Driver/PipelineDefinitionBuilder.cs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -981,6 +981,65 @@ public static PipelineDefinition<TInput, TOutput> Project<TInput, TIntermediate,
981981
return pipeline.AppendStage(PipelineStageDefinitionBuilder.Project(projection));
982982
}
983983

984+
/// <summary>
985+
/// Appends a $rankFusion stage to the pipeline.
986+
/// </summary>
987+
/// <typeparam name="TInput">The type of the documents.</typeparam>
988+
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
989+
/// <typeparam name="TIntermediate">The type of the intermediate documents.</typeparam>
990+
/// <param name="pipeline">The pipeline.</param>
991+
/// <param name="pipelines">The map of named pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
992+
/// <param name="weights">The map of pipeline names to non-negative numerical weights determining result importance during combination. Default weight is 1 when unspecified.</param>
993+
/// <param name="options">The rankFusion options.</param>
994+
/// <returns>A new pipeline with an additional stage.</returns>
995+
public static PipelineDefinition<TInput, TOutput> RankFusion<TInput, TIntermediate, TOutput>(
996+
this PipelineDefinition<TInput, TIntermediate> pipeline,
997+
Dictionary<string, PipelineDefinition<TIntermediate, TOutput>> pipelines,
998+
Dictionary<string, double> weights = null,
999+
RankFusionOptions<TOutput> options = null)
1000+
{
1001+
Ensure.IsNotNull(pipeline, nameof(pipeline));
1002+
return pipeline.AppendStage(PipelineStageDefinitionBuilder.RankFusion(pipelines, weights, options));
1003+
}
1004+
1005+
/// <summary>
1006+
/// Appends a $rankFusion stage to the pipeline. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
1007+
/// </summary>
1008+
/// <typeparam name="TInput">The type of the documents.</typeparam>
1009+
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
1010+
/// <typeparam name="TIntermediate">The type of the intermediate documents.</typeparam>
1011+
/// <param name="pipeline">The pipeline.</param>
1012+
/// <param name="pipelines">The collection of pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
1013+
/// <param name="options">The rankFusion options.</param>
1014+
/// <returns>A new pipeline with an additional stage.</returns>
1015+
public static PipelineDefinition<TInput, TOutput> RankFusion<TInput, TIntermediate, TOutput>(
1016+
this PipelineDefinition<TInput, TIntermediate> pipeline,
1017+
PipelineDefinition<TIntermediate, TOutput>[] pipelines,
1018+
RankFusionOptions<TOutput> options = null)
1019+
{
1020+
Ensure.IsNotNull(pipeline, nameof(pipeline));
1021+
return pipeline.AppendStage(PipelineStageDefinitionBuilder.RankFusion(pipelines, options));
1022+
}
1023+
1024+
/// <summary>
1025+
/// Appends a $rankFusion stage to the pipeline. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
1026+
/// </summary>
1027+
/// <typeparam name="TInput">The type of the documents.</typeparam>
1028+
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
1029+
/// <typeparam name="TIntermediate">The type of the intermediate documents.</typeparam>
1030+
/// <param name="pipeline">The pipeline.</param>
1031+
/// <param name="pipelinesWithWeights">The collection of tuples containing (pipeline, weight) pairs. The pipelines must operate on the same collection.</param>
1032+
/// <param name="options">The rankFusion options.</param>
1033+
/// <returns>A new pipeline with an additional stage.</returns>
1034+
public static PipelineDefinition<TInput, TOutput> RankFusion<TInput, TIntermediate, TOutput>(
1035+
this PipelineDefinition<TInput, TIntermediate> pipeline,
1036+
(PipelineDefinition<TIntermediate, TOutput>, double?)[] pipelinesWithWeights,
1037+
RankFusionOptions<TOutput> options = null)
1038+
{
1039+
Ensure.IsNotNull(pipeline, nameof(pipeline));
1040+
return pipeline.AppendStage(PipelineStageDefinitionBuilder.RankFusion(pipelinesWithWeights, options));
1041+
}
1042+
9841043
/// <summary>
9851044
/// Appends a $replaceRoot stage to the pipeline.
9861045
/// </summary>

src/MongoDB.Driver/PipelineStageDefinitionBuilder.cs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1431,6 +1431,111 @@ public static PipelineStageDefinition<TInput, SearchMetaResult> SearchMeta<TInpu
14311431
return stage;
14321432
}
14331433

1434+
/// <summary>
1435+
/// Creates a $rankFusion stage.
1436+
/// </summary>
1437+
/// <typeparam name="TInput">The type of the input documents.</typeparam>
1438+
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
1439+
/// <param name="pipelines">The map of named pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
1440+
/// <param name="weights">The map of pipeline names to non-negative numerical weights determining result importance during combination. Default weight is 1 when unspecified.</param>
1441+
/// <param name="options">The rankFusion options.</param>
1442+
/// <returns>The stage.</returns>
1443+
public static PipelineStageDefinition<TInput, TOutput> RankFusion<TInput, TOutput>(
1444+
Dictionary<string, PipelineDefinition<TInput, TOutput>> pipelines,
1445+
Dictionary<string, double> weights = null,
1446+
RankFusionOptions<TOutput> options = null)
1447+
{
1448+
Ensure.IsNotNull(pipelines, nameof(pipelines));
1449+
if (pipelines.Any(pipeline => pipeline.Value == null))
1450+
{
1451+
throw new ArgumentNullException(nameof(pipelines), "Pipelines cannot contain a null pipeline.");
1452+
}
1453+
1454+
const string operatorName = "$rankFusion";
1455+
var stage = new DelegatedPipelineStageDefinition<TInput, TOutput>(
1456+
operatorName,
1457+
args =>
1458+
{
1459+
ClientSideProjectionHelper.ThrowIfClientSideProjection(args.DocumentSerializer, operatorName);
1460+
var renderedPipelines = new BsonDocument();
1461+
foreach (var pipeline in pipelines)
1462+
{
1463+
renderedPipelines.Add(pipeline.Key, new BsonArray(pipeline.Value.Render(args).Documents));
1464+
}
1465+
1466+
var document = new BsonDocument
1467+
{
1468+
{ "input", new BsonDocument("pipelines", renderedPipelines) },
1469+
{
1470+
"combination", () => new BsonDocument
1471+
{
1472+
{ "weights", new BsonDocument(weights.Select(w => new BsonElement(w.Key, w.Value))) }
1473+
},
1474+
weights != null
1475+
},
1476+
{ "scoreDetails", true, options?.ScoreDetails == true }
1477+
};
1478+
1479+
return new RenderedPipelineStageDefinition<TOutput>(
1480+
operatorName,
1481+
new BsonDocument(operatorName, document),
1482+
options?.OutputSerializer ?? args.SerializerRegistry.GetSerializer<TOutput>());
1483+
});
1484+
1485+
return stage;
1486+
}
1487+
1488+
/// <summary>
1489+
/// Creates a $rankFusion stage. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
1490+
/// </summary>
1491+
/// <typeparam name="TInput">The type of the input documents.</typeparam>
1492+
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
1493+
/// <param name="pipelines">The collection of pipelines whose results will be combined. The pipelines must operate on the same collection.</param>
1494+
/// <param name="options">The rankFusion options.</param>
1495+
/// <returns>The stage.</returns>
1496+
public static PipelineStageDefinition<TInput, TOutput> RankFusion<TInput, TOutput>(
1497+
PipelineDefinition<TInput, TOutput>[] pipelines,
1498+
RankFusionOptions<TOutput> options = null)
1499+
{
1500+
Ensure.IsNotNull(pipelines, nameof(pipelines));
1501+
1502+
var pipelinesMap = new Dictionary<string, PipelineDefinition<TInput, TOutput>>();
1503+
for (var i = 0; i < pipelines.Length; i++)
1504+
{
1505+
pipelinesMap[$"pipeline{i + 1}"] = pipelines[i];
1506+
}
1507+
return RankFusion(pipelinesMap, null, options);
1508+
}
1509+
1510+
/// <summary>
1511+
/// Creates a $rankFusion stage. Pipelines will be automatically named as "pipeline1", "pipeline2", etc.
1512+
/// </summary>
1513+
/// <typeparam name="TInput">The type of the input documents.</typeparam>
1514+
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
1515+
/// <param name="pipelinesWithWeights">The collection of tuples containing (pipeline, weight) pairs. The pipelines must operate on the same collection.</param>
1516+
/// <param name="options">The rankFusion options.</param>
1517+
/// <returns>The stage.</returns>
1518+
public static PipelineStageDefinition<TInput, TOutput> RankFusion<TInput, TOutput>(
1519+
(PipelineDefinition<TInput, TOutput> Pipeline, double? Weight)[] pipelinesWithWeights,
1520+
RankFusionOptions<TOutput> options = null)
1521+
{
1522+
Ensure.IsNotNull(pipelinesWithWeights, nameof(pipelinesWithWeights));
1523+
1524+
var pipelinesMap = new Dictionary<string, PipelineDefinition<TInput, TOutput>>();
1525+
var weightsMap = new Dictionary<string, double>();
1526+
for (var i = 0; i < pipelinesWithWeights.Length; i++)
1527+
{
1528+
var pipelineName = $"pipeline{i + 1}";
1529+
pipelinesMap[pipelineName] = pipelinesWithWeights[i].Pipeline;
1530+
1531+
if (pipelinesWithWeights[i].Weight.HasValue)
1532+
{
1533+
weightsMap[pipelineName] = pipelinesWithWeights[i].Weight.Value;
1534+
}
1535+
}
1536+
return RankFusion(pipelinesMap, weightsMap, options);
1537+
}
1538+
14341539
/// <summary>
14351540
/// Creates a $replaceRoot stage.
14361541
/// </summary>

0 commit comments

Comments
 (0)