|
34 | 34 | #include "mongo/db/query/write_ops/write_ops_exec_util.h"
|
35 | 35 | #include "mongo/db/shard_role.h"
|
36 | 36 | #include "mongo/db/storage/storage_parameters_gen.h"
|
| 37 | +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.h" |
| 38 | +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h" |
37 | 39 | #include "mongo/db/timeseries/bucket_catalog/global_bucket_catalog.h"
|
38 | 40 | #include "mongo/db/timeseries/bucket_compression.h"
|
39 | 41 | #include "mongo/db/timeseries/bucket_compression_failure.h"
|
@@ -921,6 +923,138 @@ size_t performOrderedTimeseriesWrites(OperationContext* opCtx,
|
921 | 923 | return request.getDocuments().size();
|
922 | 924 | }
|
923 | 925 |
|
| 926 | +StatusWith<std::vector<bucket_catalog::BatchedInsertContext>> buildBatchedInsertContextsNoMetaField( |
| 927 | + const bucket_catalog::BucketCatalog& bucketCatalog, |
| 928 | + const UUID& collectionUUID, |
| 929 | + const TimeseriesOptions& timeseriesOptions, |
| 930 | + const std::vector<BSONObj>& userMeasurementsBatch, |
| 931 | + bucket_catalog::ExecutionStatsController& stats, |
| 932 | + tracking::Context& trackingContext) { |
| 933 | + |
| 934 | + std::vector<bucket_catalog::BatchedInsertTuple> batchedInsertTupleVector; |
| 935 | + |
| 936 | + // As part of the InsertBatchTuple struct we store the index of the measurement in the original |
| 937 | + // user batch for error reporting and retryability purposes. |
| 938 | + for (size_t i = 0; i < userMeasurementsBatch.size(); i++) { |
| 939 | + auto swTime = |
| 940 | + bucket_catalog::extractTime(userMeasurementsBatch[i], timeseriesOptions.getTimeField()); |
| 941 | + if (!swTime.isOK()) { |
| 942 | + return swTime.getStatus(); |
| 943 | + } |
| 944 | + batchedInsertTupleVector.emplace_back(userMeasurementsBatch[i], swTime.getValue(), i); |
| 945 | + } |
| 946 | + |
| 947 | + // Empty metadata. |
| 948 | + BSONElement metadata; |
| 949 | + auto bucketKey = bucket_catalog::BucketKey{ |
| 950 | + collectionUUID, bucket_catalog::BucketMetadata{trackingContext, metadata, boost::none}}; |
| 951 | + auto stripeNumber = bucket_catalog::internal::getStripeNumber(bucketCatalog, bucketKey); |
| 952 | + |
| 953 | + std::sort( |
| 954 | + batchedInsertTupleVector.begin(), batchedInsertTupleVector.end(), [](auto& lhs, auto& rhs) { |
| 955 | + // Sort measurements on their timeField. |
| 956 | + return std::get<Date_t>(lhs) < std::get<Date_t>(rhs); |
| 957 | + }); |
| 958 | + |
| 959 | + return {{bucket_catalog::BatchedInsertContext( |
| 960 | + bucketKey, stripeNumber, timeseriesOptions, stats, batchedInsertTupleVector)}}; |
| 961 | +}; |
| 962 | + |
| 963 | +StatusWith<std::vector<bucket_catalog::BatchedInsertContext>> |
| 964 | +buildBatchedInsertContextsWithMetaField(const bucket_catalog::BucketCatalog& bucketCatalog, |
| 965 | + const UUID& collectionUUID, |
| 966 | + const TimeseriesOptions& timeseriesOptions, |
| 967 | + const std::vector<BSONObj>& userMeasurementsBatch, |
| 968 | + StringData metaFieldName, |
| 969 | + bucket_catalog::ExecutionStatsController& stats, |
| 970 | + tracking::Context& trackingContext) { |
| 971 | + // Maps from the string representation of a distinct metaField value to a vector of |
| 972 | + // BatchedInsertTuples whose measurements have that same metaField value. |
| 973 | + stdx::unordered_map<std::string, std::vector<bucket_catalog::BatchedInsertTuple>> |
| 974 | + metaFieldToBatchedInsertTuples; |
| 975 | + // Maps from the string representation of a metaField value to the BSONElement of that metaField |
| 976 | + // value. Workaround for the fact that BSONElements are not hashable. |
| 977 | + stdx::unordered_map<std::string, BSONElement> metaFieldStringToBSONElement; |
| 978 | + |
| 979 | + // Go through the vector of user measurements and create a map from each distinct metaField |
| 980 | + // value to a vector of InsertBatchTuples for that metaField. As part of the InsertBatchTuple |
| 981 | + // struct we store the index of the measurement in the original user batch for error reporting |
| 982 | + // and retryability purposes. |
| 983 | + for (size_t i = 0; i < userMeasurementsBatch.size(); i++) { |
| 984 | + auto swTimeAndMeta = |
| 985 | + bucket_catalog::extractTimeAndMeta(userMeasurementsBatch[i], |
| 986 | + timeseriesOptions.getTimeField(), |
| 987 | + timeseriesOptions.getMetaField().get()); |
| 988 | + if (!swTimeAndMeta.isOK()) { |
| 989 | + return swTimeAndMeta.getStatus(); |
| 990 | + } |
| 991 | + auto time = std::get<Date_t>(swTimeAndMeta.getValue()); |
| 992 | + auto meta = std::get<BSONElement>(swTimeAndMeta.getValue()); |
| 993 | + |
| 994 | + metaFieldStringToBSONElement.try_emplace(meta.String(), meta); |
| 995 | + metaFieldToBatchedInsertTuples.try_emplace( |
| 996 | + meta.String(), std::vector<bucket_catalog::BatchedInsertTuple>{}); |
| 997 | + |
| 998 | + metaFieldToBatchedInsertTuples[meta.String()].emplace_back( |
| 999 | + userMeasurementsBatch[i], time, i); |
| 1000 | + } |
| 1001 | + |
| 1002 | + std::vector<bucket_catalog::BatchedInsertContext> batchedInsertContexts; |
| 1003 | + |
| 1004 | + // Go through all meta-unique batches, sort by time, and fill result |
| 1005 | + for (auto& [metaFieldString, batchedInsertTupleVector] : metaFieldToBatchedInsertTuples) { |
| 1006 | + std::sort(batchedInsertTupleVector.begin(), |
| 1007 | + batchedInsertTupleVector.end(), |
| 1008 | + [](auto& lhs, auto& rhs) { |
| 1009 | + // Sort measurements on their timeField. |
| 1010 | + return std::get<Date_t>(lhs) < std::get<Date_t>(rhs); |
| 1011 | + }); |
| 1012 | + auto metadata = metaFieldStringToBSONElement[metaFieldString]; |
| 1013 | + auto bucketKey = bucket_catalog::BucketKey{ |
| 1014 | + collectionUUID, bucket_catalog::BucketMetadata{trackingContext, metadata, boost::none}}; |
| 1015 | + auto stripeNumber = bucket_catalog::internal::getStripeNumber(bucketCatalog, bucketKey); |
| 1016 | + |
| 1017 | + batchedInsertContexts.emplace_back(bucket_catalog::BatchedInsertContext( |
| 1018 | + bucketKey, stripeNumber, timeseriesOptions, stats, batchedInsertTupleVector)); |
| 1019 | + } |
| 1020 | + |
| 1021 | + return batchedInsertContexts; |
| 1022 | +} |
| 1023 | + |
| 1024 | +StatusWith<std::vector<bucket_catalog::BatchedInsertContext>> buildBatchedInsertContexts( |
| 1025 | + bucket_catalog::BucketCatalog& bucketCatalog, |
| 1026 | + const UUID& collectionUUID, |
| 1027 | + const TimeseriesOptions& timeseriesOptions, |
| 1028 | + const std::vector<BSONObj>& userMeasurementsBatch) { |
| 1029 | + |
| 1030 | + auto metaFieldName = timeseriesOptions.getMetaField(); |
| 1031 | + auto& trackingContext = bucket_catalog::getTrackingContext( |
| 1032 | + bucketCatalog.trackingContexts, bucket_catalog::TrackingScope::kOpenBucketsByKey); |
| 1033 | + auto stats = |
| 1034 | + bucket_catalog::internal::getOrInitializeExecutionStats(bucketCatalog, collectionUUID); |
| 1035 | + |
| 1036 | + auto swBatchedInsertContexts = (metaFieldName) |
| 1037 | + ? buildBatchedInsertContextsWithMetaField(bucketCatalog, |
| 1038 | + collectionUUID, |
| 1039 | + timeseriesOptions, |
| 1040 | + userMeasurementsBatch, |
| 1041 | + metaFieldName.get(), |
| 1042 | + stats, |
| 1043 | + trackingContext) |
| 1044 | + : buildBatchedInsertContextsNoMetaField(bucketCatalog, |
| 1045 | + collectionUUID, |
| 1046 | + timeseriesOptions, |
| 1047 | + userMeasurementsBatch, |
| 1048 | + stats, |
| 1049 | + trackingContext); |
| 1050 | + |
| 1051 | + if (!swBatchedInsertContexts.isOK()) { |
| 1052 | + return swBatchedInsertContexts.getStatus(); |
| 1053 | + } |
| 1054 | + |
| 1055 | + return swBatchedInsertContexts.getValue(); |
| 1056 | +} |
| 1057 | + |
924 | 1058 | } // namespace internal
|
925 | 1059 |
|
926 | 1060 | } // namespace mongo::timeseries::write_ops
|
0 commit comments