Skip to content

Commit 4034575

Browse files
committed
YT-21709: Sign shuffle handle
Just signature generation, no validation yet commit_hash:90925bff99dffd796a070b523b12af87802dc96e
1 parent b04fa4a commit 4034575

File tree

10 files changed

+39
-35
lines changed

10 files changed

+39
-35
lines changed

yt/yt/client/api/delegating_client.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -899,22 +899,22 @@ class TDelegatingClient
899899
(cookie, options))
900900

901901
// Shuffle Service
902-
DELEGATE_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (
902+
DELEGATE_METHOD(TFuture<TSignedShuffleHandlePtr>, StartShuffle, (
903903
const std::string& account,
904904
int partitionCount,
905905
NObjectClient::TTransactionId transactionId,
906906
const TStartShuffleOptions& options),
907907
(account, partitionCount, transactionId, options))
908908

909909
DELEGATE_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (
910-
const TShuffleHandlePtr& shuffleHandle,
910+
const TSignedShuffleHandlePtr& shuffleHandle,
911911
int partitionIndex,
912912
std::optional<std::pair<int, int>> writerIndexRange,
913913
const TShuffleReaderOptions& options),
914914
(shuffleHandle, partitionIndex, writerIndexRange, options))
915915

916916
DELEGATE_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (
917-
const TShuffleHandlePtr& shuffleHandle,
917+
const TSignedShuffleHandlePtr& shuffleHandle,
918918
const std::string& partitionColumn,
919919
std::optional<int> writerIndex,
920920
const TShuffleWriterOptions& options),

yt/yt/client/api/rpc_proxy/client_impl.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2781,7 +2781,7 @@ TFuture<TFlowExecuteResult> TClient::FlowExecute(
27812781
}));
27822782
}
27832783

2784-
TFuture<TShuffleHandlePtr> TClient::StartShuffle(
2784+
TFuture<TSignedShuffleHandlePtr> TClient::StartShuffle(
27852785
const std::string& account,
27862786
int partitionCount,
27872787
TTransactionId parentTransactionId,
@@ -2803,12 +2803,12 @@ TFuture<TShuffleHandlePtr> TClient::StartShuffle(
28032803
}
28042804

28052805
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspStartShufflePtr& rsp) {
2806-
return ConvertTo<TShuffleHandlePtr>(TYsonString(rsp->shuffle_handle()));
2806+
return ConvertTo<TSignedShuffleHandlePtr>(TYsonStringBuf(rsp->signed_shuffle_handle()));
28072807
}));
28082808
}
28092809

28102810
TFuture<IRowBatchReaderPtr> TClient::CreateShuffleReader(
2811-
const TShuffleHandlePtr& shuffleHandle,
2811+
const TSignedShuffleHandlePtr& signedShuffleHandle,
28122812
int partitionIndex,
28132813
std::optional<std::pair<int, int>> writerIndexRange,
28142814
const TShuffleReaderOptions& options)
@@ -2818,7 +2818,7 @@ TFuture<IRowBatchReaderPtr> TClient::CreateShuffleReader(
28182818
auto req = proxy.ReadShuffleData();
28192819
InitStreamingRequest(*req);
28202820

2821-
req->set_shuffle_handle(ConvertToYsonString(shuffleHandle).ToString());
2821+
req->set_signed_shuffle_handle(ConvertToYsonString(signedShuffleHandle).ToString());
28222822
req->set_partition_index(partitionIndex);
28232823
if (options.Config) {
28242824
req->set_reader_config(ConvertToYsonString(options.Config).ToString());
@@ -2836,7 +2836,7 @@ TFuture<IRowBatchReaderPtr> TClient::CreateShuffleReader(
28362836
}
28372837

28382838
TFuture<IRowBatchWriterPtr> TClient::CreateShuffleWriter(
2839-
const TShuffleHandlePtr& shuffleHandle,
2839+
const TSignedShuffleHandlePtr& signedShuffleHandle,
28402840
const std::string& partitionColumn,
28412841
std::optional<int> writerIndex,
28422842
const TShuffleWriterOptions& options)
@@ -2845,7 +2845,7 @@ TFuture<IRowBatchWriterPtr> TClient::CreateShuffleWriter(
28452845
auto req = proxy.WriteShuffleData();
28462846
InitStreamingRequest(*req);
28472847

2848-
req->set_shuffle_handle(ConvertToYsonString(shuffleHandle).ToString());
2848+
req->set_signed_shuffle_handle(ConvertToYsonString(signedShuffleHandle).ToString());
28492849
req->set_partition_column(ToProto(partitionColumn));
28502850
if (options.Config) {
28512851
req->set_writer_config(ConvertToYsonString(options.Config).ToString());

yt/yt/client/api/rpc_proxy/client_impl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -599,20 +599,20 @@ class TClient
599599
const TFlowExecuteOptions& options = {}) override;
600600

601601
// Shuffle service client
602-
TFuture<TShuffleHandlePtr> StartShuffle(
602+
TFuture<TSignedShuffleHandlePtr> StartShuffle(
603603
const std::string& account,
604604
int partitionCount,
605605
NObjectClient::TTransactionId parentTransactionId,
606606
const TStartShuffleOptions& options) override;
607607

608608
TFuture<IRowBatchReaderPtr> CreateShuffleReader(
609-
const TShuffleHandlePtr& shuffleHandle,
609+
const TSignedShuffleHandlePtr& shuffleHandle,
610610
int partitionIndex,
611611
std::optional<std::pair<int, int>> writerIndexRange,
612612
const TShuffleReaderOptions& options) override;
613613

614614
TFuture<IRowBatchWriterPtr> CreateShuffleWriter(
615-
const TShuffleHandlePtr& shuffleHandle,
615+
const TSignedShuffleHandlePtr& shuffleHandle,
616616
const std::string& partitionColumn,
617617
std::optional<int> writerIndex,
618618
const TShuffleWriterOptions& options) override;

yt/yt/client/api/shuffle_client.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ struct TShuffleHandle
2323

2424
DEFINE_REFCOUNTED_TYPE(TShuffleHandle)
2525

26+
YT_DEFINE_STRONG_TYPEDEF(TSignedShuffleHandlePtr, NSignature::TSignaturePtr);
27+
2628
void FormatValue(TStringBuilderBase* builder, const TShuffleHandlePtr& shuffleHandle, TStringBuf spec);
2729

2830
////////////////////////////////////////////////////////////////////////////////
@@ -51,20 +53,20 @@ struct IShuffleClient
5153
{
5254
virtual ~IShuffleClient() = default;
5355

54-
virtual TFuture<TShuffleHandlePtr> StartShuffle(
56+
virtual TFuture<TSignedShuffleHandlePtr> StartShuffle(
5557
const std::string& account,
5658
int partitionCount,
5759
NObjectClient::TTransactionId parentTransactionId,
5860
const TStartShuffleOptions& options) = 0;
5961

6062
virtual TFuture<IRowBatchReaderPtr> CreateShuffleReader(
61-
const TShuffleHandlePtr& shuffleHandle,
63+
const TSignedShuffleHandlePtr& shuffleHandle,
6264
int partitionIndex,
6365
std::optional<std::pair<int, int>> writerIndexRange = {},
6466
const TShuffleReaderOptions& options = {}) = 0;
6567

6668
virtual TFuture<IRowBatchWriterPtr> CreateShuffleWriter(
67-
const TShuffleHandlePtr& shuffleHandle,
69+
const TSignedShuffleHandlePtr& shuffleHandle,
6870
const std::string& partitionColumn,
6971
std::optional<int> writerIndex = {},
7072
const TShuffleWriterOptions& options = {}) = 0;

yt/yt/client/driver/shuffle_commands.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
#include <yt/yt/client/formats/config.h>
88

9+
#include <yt/yt/client/signature/signature.h>
10+
911
#include <yt/yt/client/table_client/adapters.h>
1012
#include <yt/yt/client/table_client/table_output.h>
1113
#include <yt/yt/client/table_client/value_consumer.h>
@@ -42,16 +44,16 @@ void TStartShuffleCommand::DoExecute(ICommandContextPtr context)
4244
{
4345
auto client = context->GetClient();
4446
auto asyncResult = client->StartShuffle(Account, PartitionCount, ParentTransactionId, Options);
45-
auto shuffleHandle = WaitFor(asyncResult).ValueOrThrow();
47+
auto signedShuffleHandle = WaitFor(asyncResult).ValueOrThrow();
4648

47-
context->ProduceOutputValue(ConvertToYsonString(shuffleHandle));
49+
context->ProduceOutputValue(ConvertToYsonString(signedShuffleHandle));
4850
}
4951

5052
//////////////////////////////////////////////////////////////////////////////
5153

5254
void TReadShuffleDataCommand::Register(TRegistrar registrar)
5355
{
54-
registrar.Parameter("shuffle_handle", &TThis::ShuffleHandle);
56+
registrar.Parameter("signed_shuffle_handle", &TThis::SignedShuffleHandle);
5557
registrar.Parameter("partition_index", &TThis::PartitionIndex);
5658
registrar.Parameter("writer_index_begin", &TThis::WriterIndexBegin)
5759
.Default()
@@ -84,7 +86,7 @@ void TReadShuffleDataCommand::DoExecute(ICommandContextPtr context)
8486
}
8587

8688
auto reader = WaitFor(context->GetClient()->CreateShuffleReader(
87-
ShuffleHandle,
89+
SignedShuffleHandle,
8890
PartitionIndex,
8991
writerIndexRange,
9092
Options))
@@ -117,7 +119,7 @@ void TReadShuffleDataCommand::DoExecute(ICommandContextPtr context)
117119

118120
void TWriteShuffleDataCommand::Register(TRegistrar registrar)
119121
{
120-
registrar.Parameter("shuffle_handle", &TThis::ShuffleHandle);
122+
registrar.Parameter("signed_shuffle_handle", &TThis::SignedShuffleHandle);
121123
registrar.Parameter("partition_column", &TThis::PartitionColumn);
122124
registrar.Parameter("max_row_buffer_size", &TThis::MaxRowBufferSize)
123125
.Default(1_MB);
@@ -141,7 +143,7 @@ void TWriteShuffleDataCommand::DoExecute(ICommandContextPtr context)
141143
Options.OverwriteExistingWriterData = OverwriteExistingWriterData;
142144

143145
auto writer = WaitFor(context->GetClient()->CreateShuffleWriter(
144-
ShuffleHandle,
146+
SignedShuffleHandle,
145147
PartitionColumn,
146148
WriterIndex,
147149
Options))

yt/yt/client/driver/shuffle_commands.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class TReadShuffleDataCommand
3535
static void Register(TRegistrar registrar);
3636

3737
private:
38-
NApi::TShuffleHandlePtr ShuffleHandle;
38+
NApi::TSignedShuffleHandlePtr SignedShuffleHandle;
3939
int PartitionIndex;
4040
std::optional<int> WriterIndexBegin;
4141
std::optional<int> WriterIndexEnd;
@@ -54,7 +54,7 @@ class TWriteShuffleDataCommand
5454
static void Register(TRegistrar registrar);
5555

5656
private:
57-
NApi::TShuffleHandlePtr ShuffleHandle;
57+
NApi::TSignedShuffleHandlePtr SignedShuffleHandle;
5858
std::string PartitionColumn;
5959
i64 MaxRowBufferSize;
6060
std::optional<int> WriterIndex;

yt/yt/client/federated/client.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -501,9 +501,9 @@ class TClient
501501
UNIMPLEMENTED_METHOD(TFuture<TDistributedWriteSessionWithCookies>, StartDistributedWriteSession, (const NYPath::TRichYPath&, const TDistributedWriteSessionStartOptions&));
502502
UNIMPLEMENTED_METHOD(TFuture<void>, FinishDistributedWriteSession, (const TDistributedWriteSessionWithResults&, const TDistributedWriteSessionFinishOptions&));
503503
UNIMPLEMENTED_METHOD(TFuture<ITableFragmentWriterPtr>, CreateTableFragmentWriter, (const TSignedWriteFragmentCookiePtr&, const TTableFragmentWriterOptions&));
504-
UNIMPLEMENTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const std::string& , int, NObjectClient::TTransactionId, const TStartShuffleOptions&));
505-
UNIMPLEMENTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, std::optional<std::pair<int, int>>, const TShuffleReaderOptions&));
506-
UNIMPLEMENTED_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (const TShuffleHandlePtr&, const std::string&, std::optional<int>, const TShuffleWriterOptions&));
504+
UNIMPLEMENTED_METHOD(TFuture<TSignedShuffleHandlePtr>, StartShuffle, (const std::string& , int, NObjectClient::TTransactionId, const TStartShuffleOptions&));
505+
UNIMPLEMENTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TSignedShuffleHandlePtr&, int, std::optional<std::pair<int, int>>, const TShuffleReaderOptions&));
506+
UNIMPLEMENTED_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (const TSignedShuffleHandlePtr&, const std::string&, std::optional<int>, const TShuffleWriterOptions&));
507507

508508
private:
509509
friend class TTransaction;

yt/yt/client/hedging/hedging.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -235,9 +235,9 @@ class THedgingClient
235235
UNSUPPORTED_METHOD(TFuture<TPipelineState>, GetPipelineState, (const TYPath&, const TGetPipelineStateOptions&));
236236
UNSUPPORTED_METHOD(TFuture<TGetFlowViewResult>, GetFlowView, (const NYPath::TYPath&, const NYPath::TYPath&, const TGetFlowViewOptions&));
237237
UNSUPPORTED_METHOD(TFuture<TFlowExecuteResult>, FlowExecute, (const NYPath::TYPath&, const TString&, const NYson::TYsonString&, const TFlowExecuteOptions&));
238-
UNSUPPORTED_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (const std::string&, int, NObjectClient::TTransactionId, const TStartShuffleOptions&));
239-
UNSUPPORTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TShuffleHandlePtr&, int, std::optional<std::pair<int, int>>, const TShuffleReaderOptions&));
240-
UNSUPPORTED_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (const TShuffleHandlePtr&, const std::string&, std::optional<int>, const TShuffleWriterOptions&));
238+
UNSUPPORTED_METHOD(TFuture<TSignedShuffleHandlePtr>, StartShuffle, (const std::string&, int, NObjectClient::TTransactionId, const TStartShuffleOptions&));
239+
UNSUPPORTED_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (const TSignedShuffleHandlePtr&, int, std::optional<std::pair<int, int>>, const TShuffleReaderOptions&));
240+
UNSUPPORTED_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (const TSignedShuffleHandlePtr&, const std::string&, std::optional<int>, const TShuffleWriterOptions&));
241241

242242
private:
243243
const THedgingExecutorPtr Executor_;

yt/yt/client/unittests/mock/client.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -887,22 +887,22 @@ class TMockClient
887887
const TTableFragmentWriterOptions& options),
888888
(override));
889889

890-
MOCK_METHOD(TFuture<TShuffleHandlePtr>, StartShuffle, (
890+
MOCK_METHOD(TFuture<TSignedShuffleHandlePtr>, StartShuffle, (
891891
const std::string& account,
892892
int partitionCount,
893893
NObjectClient::TTransactionId parentTransactionId,
894894
const TStartShuffleOptions& options),
895895
(override));
896896

897897
MOCK_METHOD(TFuture<IRowBatchReaderPtr>, CreateShuffleReader, (
898-
const TShuffleHandlePtr& shuffleHandle,
898+
const TSignedShuffleHandlePtr& shuffleHandle,
899899
int partitionIndex,
900900
std::optional<TRange> writerIndexRange,
901901
const TShuffleReaderOptions& options),
902902
(override));
903903

904904
MOCK_METHOD(TFuture<IRowBatchWriterPtr>, CreateShuffleWriter, (
905-
const TShuffleHandlePtr& shuffleHandle,
905+
const TSignedShuffleHandlePtr& shuffleHandle,
906906
const std::string& partitionColumn,
907907
std::optional<int> writerIndex,
908908
const TShuffleWriterOptions& options),

yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3592,7 +3592,7 @@ message TReqStartShuffle
35923592

35933593
message TRspStartShuffle
35943594
{
3595-
required bytes shuffle_handle = 1; // YSON-serialized TShuffleHandle
3595+
required bytes signed_shuffle_handle = 1; // Opaque TSignature with YSON-serialized TShuffleHandle
35963596
}
35973597

35983598
////////////////////////////////////////////////////////////////////////////////
@@ -3607,7 +3607,7 @@ message TReqReadShuffleData
36073607
required int32 end = 2;
36083608
}
36093609

3610-
required bytes shuffle_handle = 1; // YSON-serialized TShuffleHandle
3610+
required bytes signed_shuffle_handle = 1; // Opaque TSignature with YSON-serialized TShuffleHandle
36113611
optional bytes reader_config = 2; // YSON-serialized TTableReaderConfig
36123612
required int32 partition_index = 3;
36133613
optional IndexRange writer_index_range = 4;
@@ -3621,7 +3621,7 @@ message TRspReadShuffleData
36213621

36223622
message TReqWriteShuffleData
36233623
{
3624-
required bytes shuffle_handle = 1; // YSON-serialized TShuffleHandle
3624+
required bytes signed_shuffle_handle = 1; // Opaque TSignature with YSON-serialized TShuffleHandle
36253625
optional bytes writer_config = 2; // YSON-serialized TTableWriterConfig
36263626
required string partition_column = 3;
36273627
optional int32 writer_index = 4;

0 commit comments

Comments
 (0)