Skip to content

Commit c872ef1

Browse files
committed
YT-22307: Empty template for dist write api
e9554dea04d2a2fb0c9d8cbee012afa88382e715
1 parent 7e4fd8b commit c872ef1

29 files changed

+725
-7
lines changed

yt/yt/client/api/client.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "accounting_client.h"
55
#include "admin_client.h"
66
#include "cypress_client.h"
7+
#include "distributed_table_client.h"
78
#include "etc_client.h"
89
#include "file_client.h"
910
#include "journal_client.h"
@@ -37,6 +38,7 @@ struct IClientBase
3738
, public IJournalClientBase
3839
, public IQueueClientBase
3940
, public IEtcClientBase
41+
, public IDistributedTableClientBase
4042
{
4143
virtual IConnectionPtr GetConnection() = 0;
4244
};
@@ -70,6 +72,7 @@ struct IClient
7072
, public IEtcClient
7173
, public NBundleControllerClient::IBundleControllerClient
7274
, public IFlowClient
75+
, public IDistributedTableClient
7376
{
7477
//! Terminates all channels.
7578
//! Aborts all pending uncommitted transactions.

yt/yt/client/api/delegating_client.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,22 @@ class TDelegatingClient
840840
const TGetFlowViewOptions& options),
841841
(pipelinePath, viewPath, options))
842842

843+
// Distributed client
844+
DELEGATE_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (
845+
const NYPath::TRichYPath& path,
846+
const TDistributedWriteSessionStartOptions& options),
847+
(path, options))
848+
849+
DELEGATE_METHOD(TFuture<void>, FinishDistributedWriteSession, (
850+
TDistributedWriteSessionPtr session,
851+
const TDistributedWriteSessionFinishOptions& options),
852+
(std::move(session), options))
853+
854+
DELEGATE_METHOD(TFuture<ITableWriterPtr>, CreateParticipantTableWriter, (
855+
const TDistributedWriteCookiePtr& cookie,
856+
const TParticipantTableWriterOptions& options),
857+
(cookie, options))
858+
843859
#undef DELEGATE_METHOD
844860

845861
protected:

yt/yt/client/api/delegating_transaction.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,16 @@ DELEGATE_METHOD(TFuture<TPushQueueProducerResult>, PushQueueProducer, (
312312
const TPushQueueProducerOptions& options),
313313
(producerPath, queuePath, sessionId, epoch, nameTable, serializedRows, options))
314314

315+
DELEGATE_METHOD(TFuture<TDistributedWriteSessionPtr>, StartDistributedWriteSession, (
316+
const NYPath::TRichYPath& path,
317+
const TDistributedWriteSessionStartOptions& options),
318+
(path, options))
319+
320+
DELEGATE_METHOD(TFuture<void>, FinishDistributedWriteSession, (
321+
TDistributedWriteSessionPtr session,
322+
const TDistributedWriteSessionFinishOptions& options),
323+
(std::move(session), options))
324+
315325
#undef DELEGATE_METHOD
316326

317327
////////////////////////////////////////////////////////////////////////////////

yt/yt/client/api/delegating_transaction.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,15 @@ class TDelegatingTransaction
256256
const std::vector<TSharedRef>& serializedRows,
257257
const TPushQueueProducerOptions& options) override;
258258

259+
// Distributed table client
260+
TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
261+
const NYPath::TRichYPath& path,
262+
const TDistributedWriteSessionStartOptions& options = {}) override;
263+
264+
TFuture<void> FinishDistributedWriteSession(
265+
TDistributedWriteSessionPtr session,
266+
const TDistributedWriteSessionFinishOptions& options = {}) override;
267+
259268
protected:
260269
const ITransactionPtr Underlying_;
261270
};
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#pragma once
2+
3+
#include "table_client.h"
4+
5+
#include <yt/yt/client/table_client/config.h>
6+
7+
namespace NYT::NApi {
8+
9+
////////////////////////////////////////////////////////////////////////////////
10+
11+
struct TDistributedWriteSessionStartOptions
12+
: public TTransactionalOptions
13+
{ };
14+
15+
struct TDistributedWriteSessionFinishOptions
16+
: public TTransactionalOptions
17+
{ };
18+
19+
struct TParticipantTableWriterOptions
20+
: public TTableWriterOptions
21+
{ };
22+
23+
////////////////////////////////////////////////////////////////////////////////
24+
25+
struct IDistributedTableClientBase
26+
{
27+
virtual ~IDistributedTableClientBase() = default;
28+
29+
virtual TFuture<TDistributedWriteSessionPtr> StartDistributedWriteSession(
30+
const NYPath::TRichYPath& path,
31+
const TDistributedWriteSessionStartOptions& options = {}) = 0;
32+
33+
virtual TFuture<void> FinishDistributedWriteSession(
34+
TDistributedWriteSessionPtr session,
35+
const TDistributedWriteSessionFinishOptions& options = {}) = 0;
36+
};
37+
38+
////////////////////////////////////////////////////////////////////////////////
39+
40+
struct IDistributedTableClient
41+
{
42+
virtual ~IDistributedTableClient() = default;
43+
44+
virtual TFuture<ITableWriterPtr> CreateParticipantTableWriter(
45+
const TDistributedWriteCookiePtr& cookie,
46+
const TParticipantTableWriterOptions& options = {}) = 0;
47+
};
48+
49+
////////////////////////////////////////////////////////////////////////////////
50+
51+
} // namespace NYT::NApi
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#include "distributed_table_sessions.h"
2+
3+
namespace NYT::NApi {
4+
5+
////////////////////////////////////////////////////////////////////////////////
6+
7+
void TDistributedWriteCookie::Register(TRegistrar /*registrar*/)
8+
{ }
9+
10+
////////////////////////////////////////////////////////////////////////////////
11+
12+
void TDistributedWriteSession::Register(TRegistrar /*registrar*/)
13+
{ }
14+
15+
////////////////////////////////////////////////////////////////////////////////
16+
17+
} // namespace NYT::NApi
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#pragma once
2+
3+
#include "public.h"
4+
5+
#include <yt/yt/core/ytree/yson_struct.h>
6+
7+
namespace NYT::NApi {
8+
9+
////////////////////////////////////////////////////////////////////////////////
10+
11+
YT_DEFINE_STRONG_TYPEDEF(TDistributedWriteSessionId, TGuid);
12+
13+
////////////////////////////////////////////////////////////////////////////////
14+
15+
class TDistributedWriteCookie
16+
: public NYTree::TYsonStruct
17+
{
18+
public:
19+
REGISTER_YSON_STRUCT(TDistributedWriteCookie);
20+
21+
static void Register(TRegistrar registrar);
22+
};
23+
24+
DEFINE_REFCOUNTED_TYPE(TDistributedWriteCookie);
25+
26+
////////////////////////////////////////////////////////////////////////////////
27+
28+
class TDistributedWriteSession
29+
: public NYTree::TYsonStruct
30+
{
31+
public:
32+
REGISTER_YSON_STRUCT(TDistributedWriteSession);
33+
34+
static void Register(TRegistrar registrar);
35+
};
36+
37+
DEFINE_REFCOUNTED_TYPE(TDistributedWriteSession);
38+
39+
////////////////////////////////////////////////////////////////////////////////
40+
41+
} // namespace NYT::NApi

yt/yt/client/api/public.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ DECLARE_REFCOUNTED_STRUCT(TBackupManifest)
184184

185185
DECLARE_REFCOUNTED_STRUCT(TListOperationsAccessFilter)
186186

187+
DECLARE_REFCOUNTED_CLASS(TDistributedWriteSession)
188+
DECLARE_REFCOUNTED_CLASS(TDistributedWriteCookie)
189+
187190
////////////////////////////////////////////////////////////////////////////////
188191

189192
inline const TString ClusterNamePath("//sys/@cluster_name");

yt/yt/client/api/rpc_proxy/api_service_proxy.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,12 @@ class TApiServiceProxy
204204
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, AlterQuery);
205205
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetQueryTrackerInfo);
206206

207+
// Distributed table client
208+
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, StartDistributedWriteSession);
209+
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, FinishDistributedWriteSession);
210+
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ParticipantWriteTable,
211+
.SetStreamingEnabled(true));
212+
207213
// Misc
208214
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, CheckClusterLiveness);
209215
};

yt/yt/client/api/rpc_proxy/client_base.cpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "table_writer.h"
1313
#include "transaction.h"
1414

15+
#include <yt/yt/client/api/distributed_table_sessions.h>
1516
#include <yt/yt/client/api/file_reader.h>
1617
#include <yt/yt/client/api/file_writer.h>
1718
#include <yt/yt/client/api/journal_reader.h>
@@ -788,6 +789,38 @@ TFuture<ITableWriterPtr> TClientBase::CreateTableWriter(
788789

789790
////////////////////////////////////////////////////////////////////////////////
790791

792+
TFuture<TDistributedWriteSessionPtr> TClientBase::StartDistributedWriteSession(
793+
const NYPath::TRichYPath& path,
794+
const TDistributedWriteSessionStartOptions& options)
795+
{
796+
using TRsp = TIntrusivePtr<NRpc::TTypedClientResponse<NProto::TRspStartDistributedWriteSession>>;
797+
798+
auto proxy = CreateApiServiceProxy();
799+
800+
auto req = proxy.StartDistributedWriteSession();
801+
FillRequest(req.Get(), path, options);
802+
803+
return req->Invoke()
804+
.ApplyUnique(BIND([] (TRsp&& result) -> TDistributedWriteSessionPtr {
805+
return ConvertTo<TDistributedWriteSessionPtr>(TYsonString(result->session()));
806+
}));
807+
}
808+
809+
TFuture<void> TClientBase::FinishDistributedWriteSession(
810+
TDistributedWriteSessionPtr session,
811+
const TDistributedWriteSessionFinishOptions& options)
812+
{
813+
auto proxy = CreateApiServiceProxy();
814+
815+
auto req = proxy.FinishDistributedWriteSession();
816+
817+
FillRequest(req.Get(), std::move(session), options);
818+
819+
return req->Invoke().AsVoid();
820+
}
821+
822+
////////////////////////////////////////////////////////////////////////////////
823+
791824
TFuture<TUnversionedLookupRowsResult> TClientBase::LookupRows(
792825
const TYPath& path,
793826
TNameTablePtr nameTable,

0 commit comments

Comments
 (0)