Skip to content

Commit dc434a3

Browse files
prollerermolovd
authored andcommitted
Add pagination for the GetJobStderr command
No description --- Co-authored-by: proller <proller@users.noreply.github.com> d538c6346fd862f0cfc76f7ebab84e37c1777c50 Pull Request resolved: ytsaurus/ytsaurus#708 Co-authored-by: ermolovd <ermolovd@yandex-team.com>
1 parent b516b7a commit dc434a3

File tree

11 files changed

+118
-11
lines changed

11 files changed

+118
-11
lines changed

yt/yt/client/api/delegating_client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ class TDelegatingClient
510510
const TGetJobSpecOptions& options),
511511
(jobId, options))
512512

513-
DELEGATE_METHOD(TFuture<TSharedRef>, GetJobStderr, (
513+
DELEGATE_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (
514514
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
515515
NJobTrackerClient::TJobId jobId,
516516
const TGetJobStderrOptions& options),

yt/yt/client/api/operation_client.cpp

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,5 +227,52 @@ void TListOperationsAccessFilter::Register(TRegistrar registrar)
227227

228228
////////////////////////////////////////////////////////////////////////////////
229229

230+
TGetJobStderrResponse TGetJobStderrResponse::MakeJobStderr(const TSharedRef& data, const TGetJobStderrOptions& options)
231+
{
232+
auto totalSize = std::ssize(data);
233+
auto endOffset = totalSize;
234+
auto offset = options.Offset.value_or(0);
235+
auto limit = options.Limit.value_or(0);
236+
237+
if (!offset && !limit) {
238+
return {
239+
.Data = data,
240+
.TotalSize = totalSize,
241+
.EndOffset = endOffset,
242+
};
243+
};
244+
245+
size_t firstPos = 0;
246+
if (offset > 0) {
247+
firstPos = offset;
248+
}
249+
250+
if (firstPos >= data.size()) {
251+
return {
252+
.Data = TSharedRef{},
253+
.TotalSize = totalSize,
254+
.EndOffset = 0,
255+
};
256+
} else {
257+
auto lastPos = firstPos;
258+
if (limit > 0) {
259+
lastPos += limit;
260+
} else {
261+
lastPos += data.size();
262+
}
263+
if (lastPos > data.size()) {
264+
lastPos = data.size();
265+
}
266+
const auto dataCut = data.Slice(firstPos, lastPos);
267+
return {
268+
.Data = dataCut,
269+
.TotalSize = totalSize,
270+
.EndOffset = limit ? static_cast<i64>(firstPos + dataCut.size()) : endOffset,
271+
};
272+
}
273+
}
274+
275+
////////////////////////////////////////////////////////////////////////////////
276+
230277
} // namespace NYT::NApi
231278

yt/yt/client/api/operation_client.h

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,10 @@ struct TGetJobSpecOptions
8484
struct TGetJobStderrOptions
8585
: public TTimeoutOptions
8686
, public TMasterReadOptions
87-
{ };
87+
{
88+
std::optional<i64> Limit;
89+
std::optional<i64> Offset;
90+
};
8891

8992
struct TGetJobFailContextOptions
9093
: public TTimeoutOptions
@@ -361,6 +364,27 @@ struct TListJobsResult
361364
std::vector<TError> Errors;
362365
};
363366

367+
struct TGetJobStderrResponse
368+
{
369+
// 0
370+
// |<- stderr full log ->|
371+
// [ [<- Data ->] ]
372+
// |<- request.Offset
373+
// |<- request.Limit ->|
374+
// |<- EndOffset
375+
// |<- TotalSize
376+
377+
TSharedRef Data;
378+
379+
// Total current stderr size.
380+
i64 TotalSize = 0;
381+
382+
// Index of the last byte of the result in the full stderr.
383+
i64 EndOffset = 0;
384+
385+
static TGetJobStderrResponse MakeJobStderr(const TSharedRef& data, const TGetJobStderrOptions& options = {});
386+
};
387+
364388
////////////////////////////////////////////////////////////////////////////////
365389

366390
struct IOperationClient
@@ -414,7 +438,7 @@ struct IOperationClient
414438
NJobTrackerClient::TJobId jobId,
415439
const TGetJobSpecOptions& options = {}) = 0;
416440

417-
virtual TFuture<TSharedRef> GetJobStderr(
441+
virtual TFuture<TGetJobStderrResponse> GetJobStderr(
418442
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
419443
NJobTrackerClient::TJobId jobId,
420444
const TGetJobStderrOptions& options = {}) = 0;

yt/yt/client/api/rpc_proxy/client_impl.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,7 +1294,7 @@ TFuture<TYsonString> TClient::GetJobSpec(
12941294
}));
12951295
}
12961296

1297-
TFuture<TSharedRef> TClient::GetJobStderr(
1297+
TFuture<TGetJobStderrResponse> TClient::GetJobStderr(
12981298
const TOperationIdOrAlias& operationIdOrAlias,
12991299
NJobTrackerClient::TJobId jobId,
13001300
const TGetJobStderrOptions& options)
@@ -1306,10 +1306,17 @@ TFuture<TSharedRef> TClient::GetJobStderr(
13061306

13071307
NScheduler::ToProto(req, operationIdOrAlias);
13081308
ToProto(req->mutable_job_id(), jobId);
1309+
if (options.Limit) {
1310+
req->set_limit(*options.Limit);
1311+
}
1312+
if (options.Offset) {
1313+
req->set_offset(*options.Offset);
1314+
}
13091315

1310-
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspGetJobStderrPtr& rsp) {
1316+
return req->Invoke().Apply(BIND([req = req](const TApiServiceProxy::TRspGetJobStderrPtr& rsp) {
13111317
YT_VERIFY(rsp->Attachments().size() == 1);
1312-
return rsp->Attachments().front();
1318+
TGetJobStderrOptions options{.Limit = req->limit(), .Offset = req->offset()};
1319+
return TGetJobStderrResponse::MakeJobStderr(rsp->Attachments().front(), options);
13131320
}));
13141321
}
13151322

yt/yt/client/api/rpc_proxy/client_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ class TClient
273273
NJobTrackerClient::TJobId jobId,
274274
const NApi::TGetJobSpecOptions& options) override;
275275

276-
TFuture<TSharedRef> GetJobStderr(
276+
TFuture<TGetJobStderrResponse> GetJobStderr(
277277
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
278278
NJobTrackerClient::TJobId jobId,
279279
const NApi::TGetJobStderrOptions& options) override;

yt/yt/client/driver/scheduler_commands.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,39 @@ void TGetJobSpecCommand::DoExecute(ICommandContextPtr context)
136136
void TGetJobStderrCommand::Register(TRegistrar registrar)
137137
{
138138
registrar.Parameter("job_id", &TThis::JobId);
139+
140+
registrar.ParameterWithUniversalAccessor<std::optional<i64>>(
141+
"limit",
142+
[] (TThis* command) -> auto& {
143+
return command->Options.Limit;
144+
})
145+
.Optional(/*init*/ true);
146+
147+
registrar.ParameterWithUniversalAccessor<std::optional<i64>>(
148+
"offset",
149+
[] (TThis* command) -> auto& {
150+
return command->Options.Offset;
151+
})
152+
.Optional(/*init*/ true);
153+
}
154+
155+
bool TGetJobStderrCommand::HasResponseParameters() const
156+
{
157+
return true;
139158
}
140159

141160
void TGetJobStderrCommand::DoExecute(ICommandContextPtr context)
142161
{
143162
auto result = WaitFor(context->GetClient()->GetJobStderr(OperationIdOrAlias, JobId, Options))
144163
.ValueOrThrow();
145164

165+
ProduceResponseParameters(context, [&] (NYson::IYsonConsumer* consumer) {
166+
BuildYsonMapFragmentFluently(consumer)
167+
.Item("total_size").Value(result.TotalSize)
168+
.Item("end_offset").Value(result.EndOffset);
169+
});
146170
auto output = context->Request().OutputStream;
147-
WaitFor(output->Write(result))
171+
WaitFor(output->Write(result.Data))
148172
.ThrowOnError();
149173
}
150174

yt/yt/client/driver/scheduler_commands.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ class TGetJobStderrCommand
132132
NJobTrackerClient::TJobId JobId;
133133

134134
void DoExecute(ICommandContextPtr context) override;
135+
bool HasResponseParameters() const override;
135136
};
136137

137138
////////////////////////////////////////////////////////////////////////////////

yt/yt/client/federated/client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ class TClient
406406
UNIMPLEMENTED_METHOD(TFuture<NConcurrency::IAsyncZeroCopyInputStreamPtr>, GetJobInput, (NJobTrackerClient::TJobId, const TGetJobInputOptions&));
407407
UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobInputPaths, (NJobTrackerClient::TJobId, const TGetJobInputPathsOptions&));
408408
UNIMPLEMENTED_METHOD(TFuture<NYson::TYsonString>, GetJobSpec, (NJobTrackerClient::TJobId, const TGetJobSpecOptions&));
409-
UNIMPLEMENTED_METHOD(TFuture<TSharedRef>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&));
409+
UNIMPLEMENTED_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&));
410410
UNIMPLEMENTED_METHOD(TFuture<TSharedRef>, GetJobFailContext, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobFailContextOptions&));
411411
UNIMPLEMENTED_METHOD(TFuture<TListOperationsResult>, ListOperations, (const TListOperationsOptions&));
412412
UNIMPLEMENTED_METHOD(TFuture<TListJobsResult>, ListJobs, (const NScheduler::TOperationIdOrAlias&, const TListJobsOptions&));

yt/yt/client/hedging/hedging.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ class THedgingClient
165165
UNSUPPORTED_METHOD(TFuture<NConcurrency::IAsyncZeroCopyInputStreamPtr>, GetJobInput, (NJobTrackerClient::TJobId, const TGetJobInputOptions&));
166166
UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetJobInputPaths, (NJobTrackerClient::TJobId, const TGetJobInputPathsOptions&));
167167
UNSUPPORTED_METHOD(TFuture<NYson::TYsonString>, GetJobSpec, (NJobTrackerClient::TJobId, const TGetJobSpecOptions&));
168-
UNSUPPORTED_METHOD(TFuture<TSharedRef>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&));
168+
UNSUPPORTED_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobStderrOptions&));
169169
UNSUPPORTED_METHOD(TFuture<TSharedRef>, GetJobFailContext, (const NScheduler::TOperationIdOrAlias&, NJobTrackerClient::TJobId, const TGetJobFailContextOptions&));
170170
UNSUPPORTED_METHOD(TFuture<TListOperationsResult>, ListOperations, (const TListOperationsOptions&));
171171
UNSUPPORTED_METHOD(TFuture<TListJobsResult>, ListJobs, (const NScheduler::TOperationIdOrAlias&, const TListJobsOptions&));

yt/yt/client/unittests/mock/client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ class TMockClient
620620
const TGetJobSpecOptions& options),
621621
(override));
622622

623-
MOCK_METHOD(TFuture<TSharedRef>, GetJobStderr, (
623+
MOCK_METHOD(TFuture<TGetJobStderrResponse>, GetJobStderr, (
624624
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
625625
NJobTrackerClient::TJobId jobId,
626626
const TGetJobStderrOptions& options),

0 commit comments

Comments
 (0)