@@ -24,12 +24,15 @@ template<bool IsWide>
24
24
class TDqInputUnionStreamValue : public TComputationValue <TDqInputUnionStreamValue<IsWide>> {
25
25
using TBase = TComputationValue<TDqInputUnionStreamValue<IsWide>>;
26
26
public:
27
- TDqInputUnionStreamValue (TMemoryUsageInfo* memInfo, const NKikimr::NMiniKQL::TType* type, TVector<IDqInput::TPtr>&& inputs, TDqMeteringStats::TInputStatsMeter stats)
27
+ TDqInputUnionStreamValue (TMemoryUsageInfo* memInfo, const NKikimr::NMiniKQL::TType* type, TVector<IDqInput::TPtr>&& inputs,
28
+ TDqMeteringStats::TInputStatsMeter stats, TInstant& startTs, bool & inputConsumed)
28
29
: TBase(memInfo)
29
30
, Inputs(std::move(inputs))
30
31
, Alive(Inputs.size())
31
32
, Batch(type)
32
33
, Stats(stats)
34
+ , StartTs(startTs)
35
+ , InputConsumed(inputConsumed)
33
36
{}
34
37
35
38
private:
@@ -41,6 +44,10 @@ class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamVal
41
44
case NUdf::EFetchStatus::Ok:
42
45
break ;
43
46
case NUdf::EFetchStatus::Finish:
47
+ if (Y_UNLIKELY (!StartTs)) {
48
+ StartTs = Now ();
49
+ }
50
+ [[fallthrough]];
44
51
case NUdf::EFetchStatus::Yield:
45
52
return status;
46
53
}
@@ -52,6 +59,10 @@ class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamVal
52
59
if (Stats) {
53
60
Stats.Add (result);
54
61
}
62
+ if (Y_UNLIKELY (!StartTs)) {
63
+ StartTs = Now ();
64
+ }
65
+ InputConsumed = true ;
55
66
return NUdf::EFetchStatus::Ok;
56
67
}
57
68
@@ -63,6 +74,10 @@ class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamVal
63
74
case NUdf::EFetchStatus::Ok:
64
75
break ;
65
76
case NUdf::EFetchStatus::Finish:
77
+ if (Y_UNLIKELY (!StartTs)) {
78
+ StartTs = Now ();
79
+ }
80
+ [[fallthrough]];
66
81
case NUdf::EFetchStatus::Yield:
67
82
return status;
68
83
}
@@ -76,6 +91,10 @@ class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamVal
76
91
if (Stats) {
77
92
Stats.Add (result, width);
78
93
}
94
+ if (Y_UNLIKELY (!StartTs)) {
95
+ StartTs = Now ();
96
+ }
97
+ InputConsumed = true ;
79
98
return NUdf::EFetchStatus::Ok;
80
99
}
81
100
@@ -108,19 +127,23 @@ class TDqInputUnionStreamValue : public TComputationValue<TDqInputUnionStreamVal
108
127
size_t Index = 0 ;
109
128
TUnboxedValueBatch Batch;
110
129
TDqMeteringStats::TInputStatsMeter Stats;
130
+ TInstant& StartTs;
131
+ bool & InputConsumed;
111
132
};
112
133
113
134
template <bool IsWide>
114
135
class TDqInputMergeStreamValue : public TComputationValue <TDqInputMergeStreamValue<IsWide>> {
115
136
using TBase = TComputationValue<TDqInputMergeStreamValue<IsWide>>;
116
137
public:
117
138
TDqInputMergeStreamValue (TMemoryUsageInfo* memInfo, const NKikimr::NMiniKQL::TType* type, TVector<IDqInput::TPtr>&& inputs,
118
- TVector<TSortColumnInfo>&& sortCols, TDqMeteringStats::TInputStatsMeter stats)
139
+ TVector<TSortColumnInfo>&& sortCols, TDqMeteringStats::TInputStatsMeter stats, TInstant& startTs, bool & inputConsumed )
119
140
: TBase(memInfo)
120
141
, Inputs(std::move(inputs))
121
142
, Width(type->IsMulti () ? static_cast<const NMiniKQL::TMultiType*>(type)->GetElementsCount() : TMaybe<ui32>())
122
143
, SortCols(std::move(sortCols))
123
144
, Stats(stats)
145
+ , StartTs(startTs)
146
+ , InputConsumed(inputConsumed)
124
147
{
125
148
YQL_ENSURE (!IsWide ^ Width.Defined ());
126
149
CurrentBuffers.reserve (Inputs.size ());
@@ -196,6 +219,10 @@ class TDqInputMergeStreamValue : public TComputationValue<TDqInputMergeStreamVal
196
219
case NUdf::EFetchStatus::Ok:
197
220
break ;
198
221
case NUdf::EFetchStatus::Finish:
222
+ if (Y_UNLIKELY (!StartTs)) {
223
+ StartTs = Now ();
224
+ }
225
+ [[fallthrough]];
199
226
case NUdf::EFetchStatus::Yield:
200
227
return status;
201
228
}
@@ -204,6 +231,10 @@ class TDqInputMergeStreamValue : public TComputationValue<TDqInputMergeStreamVal
204
231
if (Stats) {
205
232
Stats.Add (result);
206
233
}
234
+ if (Y_UNLIKELY (!StartTs)) {
235
+ StartTs = Now ();
236
+ }
237
+ InputConsumed = true ;
207
238
return NUdf::EFetchStatus::Ok;
208
239
}
209
240
@@ -223,6 +254,7 @@ class TDqInputMergeStreamValue : public TComputationValue<TDqInputMergeStreamVal
223
254
if (Stats) {
224
255
Stats.Add (result, width);
225
256
}
257
+ InputConsumed = true ;
226
258
return NUdf::EFetchStatus::Ok;
227
259
}
228
260
@@ -309,6 +341,8 @@ class TDqInputMergeStreamValue : public TComputationValue<TDqInputMergeStreamVal
309
341
ui32 InitializationIndex = 0 ;
310
342
TMap<ui32, EDataSlot> SortColTypes;
311
343
TDqMeteringStats::TInputStatsMeter Stats;
344
+ TInstant& StartTs;
345
+ bool & InputConsumed;
312
346
};
313
347
314
348
TVector<NKikimr::NMiniKQL::TType*> ExtractBlockItemTypes (const NKikimr::NMiniKQL::TType* type) {
@@ -380,7 +414,7 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
380
414
public:
381
415
TDqInputMergeBlockStreamValue (TMemoryUsageInfo* memInfo, const NKikimr::NMiniKQL::TType* type, TVector<IDqInput::TPtr>&& inputs,
382
416
TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats,
383
- NUdf::IPgBuilder* pgBuilder)
417
+ TInstant& startTs, bool & inputConsumed, NUdf::IPgBuilder* pgBuilder)
384
418
: TBase(memInfo)
385
419
, SortCols_(std::move(sortCols))
386
420
, ItemTypes_(ExtractBlockItemTypes(type))
@@ -389,6 +423,8 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
389
423
, Builders_(MakeBuilders(MaxOutputBlockLen_, ItemTypes_, pgBuilder))
390
424
, Factory_(factory)
391
425
, Stats_(stats)
426
+ , StartTs(startTs)
427
+ , InputConsumed(inputConsumed)
392
428
{
393
429
YQL_ENSURE (MaxOutputBlockLen_ > 0 );
394
430
InputData_.reserve (inputs.size ());
@@ -552,6 +588,9 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
552
588
NUdf::EFetchStatus WideFetch (NKikimr::NUdf::TUnboxedValue* result, ui32 width) final {
553
589
YQL_ENSURE (width == ItemTypes_.size () + 1 );
554
590
if (IsFinished_) {
591
+ if (Y_UNLIKELY (!StartTs)) {
592
+ StartTs = Now ();
593
+ }
555
594
return NUdf::EFetchStatus::Finish;
556
595
}
557
596
@@ -576,6 +615,10 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
576
615
if (Stats_) {
577
616
Stats_.Add (result, width);
578
617
}
618
+ if (Y_UNLIKELY (!StartTs)) {
619
+ StartTs = Now ();
620
+ }
621
+ InputConsumed = true ;
579
622
return NUdf::EFetchStatus::Ok;
580
623
}
581
624
@@ -645,7 +688,7 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
645
688
if (status == NUdf::EFetchStatus::Ok) {
646
689
std::make_heap (InputRows_.begin (), InputRows_.end ());
647
690
}
648
- }
691
+ }
649
692
}
650
693
651
694
if (!OutputBlockLen_) {
@@ -685,9 +728,11 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
685
728
686
729
const NKikimr::NMiniKQL::THolderFactory& Factory_;
687
730
TDqMeteringStats::TInputStatsMeter Stats_;
688
-
731
+
689
732
std::unique_ptr<TArgsDechunker> Output_;
690
733
bool IsFinished_ = false ;
734
+ TInstant& StartTs;
735
+ bool & InputConsumed;
691
736
};
692
737
693
738
void ValidateInputTypes (const NKikimr::NMiniKQL::TType* type, const TVector<IDqInput::TPtr>& inputs) {
@@ -723,7 +768,7 @@ void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue*
723
768
} else {
724
769
Stats->RowsConsumed += 1 ;
725
770
}
726
-
771
+
727
772
NYql::NDq::TDqDataSerializer::TEstimateSizeSettings settings;
728
773
settings.DiscardUnsupportedTypes = true ;
729
774
settings.WithHeaders = false ;
@@ -740,18 +785,18 @@ void TDqMeteringStats::TInputStatsMeter::Add(const NKikimr::NUdf::TUnboxedValue*
740
785
}
741
786
742
787
NUdf::TUnboxedValue CreateInputUnionValue (const NKikimr::NMiniKQL::TType* type, TVector<IDqInput::TPtr>&& inputs,
743
- const NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats)
788
+ const NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats, TInstant& startTs, bool & inputConsumed )
744
789
{
745
790
ValidateInputTypes (type, inputs);
746
791
if (type->IsMulti ()) {
747
- return factory.Create <TDqInputUnionStreamValue<true >>(type, std::move (inputs), stats);
792
+ return factory.Create <TDqInputUnionStreamValue<true >>(type, std::move (inputs), stats, startTs, inputConsumed );
748
793
}
749
- return factory.Create <TDqInputUnionStreamValue<false >>(type, std::move (inputs), stats);
794
+ return factory.Create <TDqInputUnionStreamValue<false >>(type, std::move (inputs), stats, startTs, inputConsumed );
750
795
}
751
796
752
797
NKikimr::NUdf::TUnboxedValue CreateInputMergeValue (const NKikimr::NMiniKQL::TType* type, TVector<IDqInput::TPtr>&& inputs,
753
798
TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats,
754
- NUdf::IPgBuilder* pgBuilder)
799
+ TInstant& startTs, bool & inputConsumed, NUdf::IPgBuilder* pgBuilder)
755
800
{
756
801
ValidateInputTypes (type, inputs);
757
802
YQL_ENSURE (!inputs.empty ());
@@ -760,13 +805,13 @@ NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(const NKikimr::NMiniKQL::TTyp
760
805
// we can ignore scalar columns, since all they have exactly the same value in all inputs
761
806
EraseIf (sortCols, [](const auto & sortCol) { return *sortCol.IsScalar ; });
762
807
if (sortCols.empty ()) {
763
- return factory.Create <TDqInputUnionStreamValue<true >>(type, std::move (inputs), stats);
808
+ return factory.Create <TDqInputUnionStreamValue<true >>(type, std::move (inputs), stats, startTs, inputConsumed );
764
809
}
765
- return factory.Create <TDqInputMergeBlockStreamValue>(type, std::move (inputs), std::move (sortCols), factory, stats, pgBuilder);
810
+ return factory.Create <TDqInputMergeBlockStreamValue>(type, std::move (inputs), std::move (sortCols), factory, stats, startTs, inputConsumed, pgBuilder);
766
811
}
767
- return factory.Create <TDqInputMergeStreamValue<true >>(type, std::move (inputs), std::move (sortCols), stats);
812
+ return factory.Create <TDqInputMergeStreamValue<true >>(type, std::move (inputs), std::move (sortCols), stats, startTs, inputConsumed );
768
813
}
769
- return factory.Create <TDqInputMergeStreamValue<false >>(type, std::move (inputs), std::move (sortCols), stats);
814
+ return factory.Create <TDqInputMergeStreamValue<false >>(type, std::move (inputs), std::move (sortCols), stats, startTs, inputConsumed );
770
815
}
771
816
772
817
} // namespace NYql::NDq
0 commit comments