Skip to content

Commit 145d1a0

Browse files
committed
YT-22455: Introduce list jobs continuation token
[nodiff:caesar] commit_hash:d45b3da99e7b19120e02298ca6e87c02cc800ea2
1 parent 06ad4bc commit 145d1a0

File tree

11 files changed

+222
-10
lines changed

11 files changed

+222
-10
lines changed

yt/cpp/mapreduce/interface/operation.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2855,6 +2855,18 @@ struct TListJobsOptions
28552855
/// @brief Return only jobs with monitoring descriptor.
28562856
FLUENT_FIELD_OPTION(bool, WithMonitoringDescriptor);
28572857

2858+
///
2859+
/// @brief Search for jobs with start time >= `FromTime`.
2860+
FLUENT_FIELD_OPTION(TInstant, FromTime);
2861+
2862+
///
2863+
/// @brief Search for jobs with start time <= `ToTime`.
2864+
FLUENT_FIELD_OPTION(TInstant, ToTime);
2865+
2866+
///
2867+
/// @brief Search for jobs with filters encoded in token.
2868+
FLUENT_FIELD_OPTION(TString, ContinuationToken);
2869+
28582870
/// @}
28592871

28602872
///

yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,15 @@ TNode SerializeParamsForListJobs(
562562
if (options.WithMonitoringDescriptor_) {
563563
result["with_monitoring_descriptor"] = *options.WithMonitoringDescriptor_;
564564
}
565+
if (options.FromTime_) {
566+
result["from_time"] = ToString(options.FromTime_);
567+
}
568+
if (options.ToTime_) {
569+
result["to_time"] = ToString(options.ToTime_);
570+
}
571+
if (options.ContinuationToken_) {
572+
result["continuation_token"] = *options.ContinuationToken_;
573+
}
565574

566575
if (options.SortField_) {
567576
result["sort_field"] = ToString(*options.SortField_);

yt/yt/client/api/operation_client.cpp

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,115 @@
33
#include <yt/yt/client/job_tracker_client/helpers.h>
44

55
#include <yt/yt/core/ytree/fluent.h>
6+
#include <yt/yt/core/ytree/yson_struct.h>
7+
8+
#include <library/cpp/string_utils/base64/base64.h>
69

710
namespace NYT::NApi {
811

912
using namespace NYTree;
1013
using namespace NJobTrackerClient;
14+
using namespace NYson;
1115

1216
////////////////////////////////////////////////////////////////////////////////
1317

18+
void TListJobsContinuationTokenSerializer::Register(TRegistrar registrar)
19+
{
20+
registrar.ExternalClassParameter("version", &TThat::Version)
21+
.Default(0)
22+
.DontSerializeDefault();
23+
24+
registrar.ExternalBaseClassParameter("job_competition_id", &TListJobsOptions::JobCompetitionId)
25+
.Default()
26+
.DontSerializeDefault();
27+
28+
registrar.ExternalBaseClassParameter("type", &TThat::Type)
29+
.Default()
30+
.DontSerializeDefault();
31+
32+
registrar.ExternalBaseClassParameter("state", &TThat::State)
33+
.Default()
34+
.DontSerializeDefault();
35+
36+
registrar.ExternalBaseClassParameter("address", &TThat::Address)
37+
.Default()
38+
.DontSerializeDefault();
39+
40+
registrar.ExternalBaseClassParameter("with_stderr", &TThat::WithStderr)
41+
.Default()
42+
.DontSerializeDefault();
43+
44+
registrar.ExternalBaseClassParameter("with_fail_context", &TThat::WithFailContext)
45+
.Default()
46+
.DontSerializeDefault();
47+
48+
registrar.ExternalBaseClassParameter("with_spec", &TThat::WithSpec)
49+
.Default()
50+
.DontSerializeDefault();
51+
52+
registrar.ExternalBaseClassParameter("with_competitors", &TThat::WithCompetitors)
53+
.Default()
54+
.DontSerializeDefault();
55+
56+
registrar.ExternalBaseClassParameter("with_monitoring_gescriptor", &TThat::WithMonitoringDescriptor)
57+
.Default()
58+
.DontSerializeDefault();
59+
60+
registrar.ExternalBaseClassParameter("task_name", &TThat::TaskName)
61+
.Default()
62+
.DontSerializeDefault();
63+
64+
registrar.ExternalBaseClassParameter("running_jobs_lookbehind_period", &TThat::RunningJobsLookbehindPeriod)
65+
.Default(TDuration::Max())
66+
.DontSerializeDefault();
67+
68+
registrar.ExternalBaseClassParameter("sort_field", &TThat::SortField)
69+
.Default()
70+
.DontSerializeDefault();
71+
72+
registrar.ExternalBaseClassParameter("sort_order", &TThat::SortOrder)
73+
.Default()
74+
.DontSerializeDefault();
75+
76+
registrar.ExternalBaseClassParameter("offset", &TThat::Offset)
77+
.Default(0)
78+
.DontSerializeDefault();
79+
80+
registrar.ExternalBaseClassParameter("limit", &TThat::Limit)
81+
.Default(1000)
82+
.DontSerializeDefault();
83+
84+
registrar.ExternalBaseClassParameter("include_archive", &TThat::IncludeArchive)
85+
.Default(false)
86+
.DontSerializeDefault();
87+
88+
registrar.ExternalBaseClassParameter("include_cypress", &TThat::IncludeCypress)
89+
.Default(false)
90+
.DontSerializeDefault();
91+
92+
registrar.ExternalBaseClassParameter("include_controller_agent", &TThat::IncludeControllerAgent)
93+
.Default(false)
94+
.DontSerializeDefault();
95+
}
96+
97+
TString EncodeNewToken(TListJobsOptions&& options, int jobCount)
98+
{
99+
options.Offset += jobCount;
100+
options.ContinuationToken.reset();
101+
102+
TListJobsContinuationToken token;
103+
static_cast<TListJobsOptions&>(token) = std::move(options);
104+
105+
auto optionsYson = ConvertToYsonString(token);
106+
return Base64Encode(optionsYson.ToString());
107+
}
108+
109+
TListJobsOptions DecodeListJobsOptionsFromToken(const TString& continuationToken)
110+
{
111+
auto optionsYson = TYsonString(Base64StrictDecode(continuationToken));
112+
return ConvertTo<TListJobsContinuationToken>(optionsYson);
113+
}
114+
14115
void Serialize(
15116
const TOperation& operation,
16117
NYson::IYsonConsumer* consumer,

yt/yt/client/api/operation_client.h

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -166,16 +166,16 @@ struct TPollJobShellResponse
166166
};
167167

168168
DEFINE_ENUM(EJobSortField,
169-
((None) (0))
170-
((Type) (1))
171-
((State) (2))
172-
((StartTime) (3))
173-
((FinishTime) (4))
174-
((Address) (5))
175-
((Duration) (6))
176-
((Progress) (7))
177-
((Id) (8))
178-
((TaskName) (9))
169+
((None) (0))
170+
((Type) (1))
171+
((State) (2))
172+
((StartTime) (3))
173+
((FinishTime) (4))
174+
((Address) (5))
175+
((Duration) (6))
176+
((Progress) (7))
177+
((Id) (8))
178+
((TaskName) (9))
179179
);
180180

181181
DEFINE_ENUM(EJobSortDirection,
@@ -206,6 +206,11 @@ struct TListJobsOptions
206206
std::optional<bool> WithMonitoringDescriptor;
207207
std::optional<TString> TaskName;
208208

209+
std::optional<TInstant> FromTime;
210+
std::optional<TInstant> ToTime;
211+
212+
std::optional<TString> ContinuationToken;
213+
209214
TDuration RunningJobsLookbehindPeriod = TDuration::Max();
210215

211216
EJobSortField SortField = EJobSortField::None;
@@ -221,6 +226,32 @@ struct TListJobsOptions
221226
EDataSource DataSource = EDataSource::Auto;
222227
};
223228

229+
struct TListJobsContinuationToken
230+
: public TListJobsOptions
231+
{
232+
int Version = 0;
233+
};
234+
235+
////////////////////////////////////////////////////////////////////////////////
236+
237+
class TListJobsContinuationTokenSerializer
238+
: public virtual NYTree::TExternalizedYsonStruct
239+
{
240+
public:
241+
REGISTER_EXTERNALIZED_YSON_STRUCT(TListJobsContinuationToken, TListJobsContinuationTokenSerializer);
242+
243+
static void Register(TRegistrar registrar);
244+
};
245+
246+
ASSIGN_EXTERNAL_YSON_SERIALIZER(TListJobsContinuationToken, TListJobsContinuationTokenSerializer);
247+
248+
////////////////////////////////////////////////////////////////////////////////
249+
250+
TString EncodeNewToken(TListJobsOptions&& options, int jobCount);
251+
TListJobsOptions DecodeListJobsOptionsFromToken(const TString& continuationToken);
252+
253+
////////////////////////////////////////////////////////////////////////////////
254+
224255
struct TAbandonJobOptions
225256
: public TTimeoutOptions
226257
{ };
@@ -387,6 +418,8 @@ struct TListJobsResult
387418
TListJobsStatistics Statistics;
388419

389420
std::vector<TError> Errors;
421+
422+
std::optional<TString> ContinuationToken;
390423
};
391424

392425
struct TGetJobStderrResponse

yt/yt/client/api/rpc_proxy/client_impl.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1486,6 +1486,15 @@ TFuture<TListJobsResult> TClient::ListJobs(
14861486
if (options.TaskName) {
14871487
req->set_task_name(*options.TaskName);
14881488
}
1489+
if (options.FromTime) {
1490+
req->set_from_time(NYT::ToProto(*options.FromTime));
1491+
}
1492+
if (options.ToTime) {
1493+
req->set_to_time(NYT::ToProto(*options.ToTime));
1494+
}
1495+
if (options.ContinuationToken) {
1496+
req->set_continuation_token(*options.ContinuationToken);
1497+
}
14891498

14901499
req->set_sort_field(static_cast<NProto::EJobSortField>(options.SortField));
14911500
req->set_sort_order(static_cast<NProto::EJobSortDirection>(options.SortOrder));

yt/yt/client/api/rpc_proxy/helpers.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,9 @@ void ToProto(
407407
if (result.ArchiveJobCount) {
408408
proto->set_archive_job_count(*result.ArchiveJobCount);
409409
}
410+
if (result.ContinuationToken) {
411+
proto->set_continuation_token(*result.ContinuationToken);
412+
}
410413

411414
ToProto(proto->mutable_statistics(), result.Statistics);
412415
ToProto(proto->mutable_errors(), result.Errors);
@@ -433,6 +436,11 @@ void FromProto(
433436
} else {
434437
result->ArchiveJobCount.reset();
435438
}
439+
if (proto.has_continuation_token()) {
440+
result->ContinuationToken = proto.continuation_token();
441+
} else {
442+
result->ContinuationToken.reset();
443+
}
436444

437445
FromProto(&result->Statistics, proto.statistics());
438446
FromProto(&result->Errors, proto.errors());

yt/yt/client/driver/scheduler_commands.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,21 @@ void TListJobsCommand::Register(TRegistrar registrar)
490490
[] (TThis* command) -> auto& { return command->Options.WithMonitoringDescriptor; })
491491
.Optional(/*init*/ false);
492492

493+
registrar.ParameterWithUniversalAccessor<std::optional<TInstant>>(
494+
"from_time",
495+
[] (TThis* command) -> auto& { return command->Options.FromTime; })
496+
.Optional(/*init*/ false);
497+
498+
registrar.ParameterWithUniversalAccessor<std::optional<TInstant>>(
499+
"to_time",
500+
[] (TThis* command) -> auto& { return command->Options.ToTime; })
501+
.Optional(/*init*/ false);
502+
503+
registrar.ParameterWithUniversalAccessor<std::optional<TString>>(
504+
"continuation_token",
505+
[] (TThis* command) -> auto& { return command->Options.ContinuationToken; })
506+
.Optional(/*init*/ false);
507+
493508
registrar.ParameterWithUniversalAccessor<TJobId>(
494509
"job_competition_id",
495510
[] (TThis* command) -> auto& { return command->Options.JobCompetitionId; })
@@ -593,6 +608,7 @@ void TListJobsCommand::DoExecute(ICommandContextPtr context)
593608
}
594609
})
595610
.Item("errors").Value(result.Errors)
611+
.Item("continuation_token").Value(result.ContinuationToken)
596612
.EndMap());
597613
}
598614

yt/yt/client/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ PEERDIR(
220220
yt/yt/library/quantile_digest
221221
yt/yt_proto/yt/client
222222
library/cpp/json
223+
library/cpp/string_utils/base64
223224
contrib/libs/pfr
224225
)
225226

yt/yt/core/ytree/yson_struct-inl.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,19 @@ void TYsonStructRegistrar<TStruct>::ExternalPostprocessor(TExternalPostprocessor
248248
});
249249
}
250250

251+
template <class TStruct>
252+
template <class TBase, class TValue>
253+
TYsonStructParameter<TValue>& TYsonStructRegistrar<TStruct>::ExternalBaseClassParameter(const TString& key, TValue(TBase::*field))
254+
{
255+
static_assert(std::derived_from<TStruct, TExternalizedYsonStruct>);
256+
static_assert(std::derived_from<typename TStruct::TExternal, TBase>);
257+
auto universalAccessor = [field] (TStruct* serializer) -> auto& {
258+
return serializer->That_->*field;
259+
};
260+
261+
return ParameterWithUniversalAccessor<TValue>(key, universalAccessor);
262+
}
263+
251264
template <class TStruct>
252265
void TYsonStructRegistrar<TStruct>::UnrecognizedStrategy(EUnrecognizedStrategy strategy)
253266
{

yt/yt/core/ytree/yson_struct.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,9 @@ class TYsonStructRegistrar
313313
// requires std::derived_from<TStruct, TExternalizedYsonStruct<TExternal, TStruct>>
314314
TYsonStructParameter<TValue>& ExternalClassParameter(const TString& key, TValue(TExternal::*field));
315315

316+
template <class TBase, class TValue>
317+
TYsonStructParameter<TValue>& ExternalBaseClassParameter(const TString& key, TValue(TBase::*field));
318+
316319
template <class TExternalPreprocessor>
317320
// requires (CInvocable<TExternalPreprocessor, void(typename TStruct::TExternal*)>)
318321
void ExternalPreprocessor(TExternalPreprocessor preprocessor);

0 commit comments

Comments
 (0)