Skip to content

Commit 99b3eab

Browse files
robot-pigletblinkov
authored andcommitted
Intermediate changes
commit_hash:e2f7171fc4070059f585443adfecfb9c7ca93e17
1 parent 3cfdc0d commit 99b3eab

24 files changed

+649
-301
lines changed

yql/essentials/utils/runnable.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#include <util/generic/ptr.h>
2+
3+
namespace NYql {
4+
5+
class IRunnable: public TThrRefBase {
6+
public:
7+
using TPtr = THolder<IRunnable>;
8+
9+
virtual ~IRunnable() = default;
10+
11+
virtual void Start() = 0;
12+
13+
virtual void Stop() = 0;
14+
};
15+
16+
} // namespace NYql

yql/essentials/utils/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ SRCS(
2929
resetable_setting.h
3030
retry.cpp
3131
retry.h
32+
runnable.h
3233
sort.cpp
3334
sort.h
3435
swap_bytes.cpp
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
yql_yt_coordinator_client.cpp
5+
)
6+
7+
PEERDIR(
8+
library/cpp/http/simple
9+
library/cpp/threading/future
10+
yt/yql/providers/yt/fmr/coordinator/interface
11+
yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers
12+
yt/yql/providers/yt/fmr/proto
13+
yql/essentials/utils
14+
)
15+
16+
YQL_LAST_ABI_VERSION()
17+
18+
END()
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#include <library/cpp/http/simple/http_client.h>
2+
3+
#include <yt/yql/providers/yt/fmr/proto/coordinator.pb.h>
4+
#include <yt/yql/providers/yt/fmr/coordinator/interface/proto_helpers/yql_yt_coordinator_proto_helpers.h>
5+
6+
#include <yql/essentials/utils/yql_panic.h>
7+
8+
#include "yql_yt_coordinator_client.h"
9+
10+
namespace NYql::NFmr {
11+
12+
namespace {
13+
14+
class TFmrCoordinatorClient: public IFmrCoordinator {
15+
public:
16+
TFmrCoordinatorClient(const TFmrCoordinatorClientSettings& settings): Host_(settings.Host), Port_(settings.Port)
17+
{
18+
Headers_["Content-Type"] = "application/x-protobuf";
19+
}
20+
21+
NThreading::TFuture<TStartOperationResponse> StartOperation(const TStartOperationRequest& startOperationRequest) override {
22+
NProto::TStartOperationRequest protoStartOperationRequest = StartOperationRequestToProto(startOperationRequest);
23+
TString startOperationRequestUrl = "/operation";
24+
auto httpClient = TKeepAliveHttpClient(Host_, Port_);
25+
TStringStream outputStream;
26+
27+
httpClient.DoPost(startOperationRequestUrl, protoStartOperationRequest.SerializeAsString(), &outputStream, Headers_);
28+
TString serializedResponse = outputStream.ReadAll();
29+
NProto::TStartOperationResponse protoStartOperationResponse;
30+
YQL_ENSURE(protoStartOperationResponse.ParseFromString(serializedResponse));
31+
return NThreading::MakeFuture(StartOperationResponseFromProto(protoStartOperationResponse));
32+
}
33+
34+
NThreading::TFuture<TGetOperationResponse> GetOperation(const TGetOperationRequest& getOperationRequest) override {
35+
TString getOperationRequestUrl = "/operation/" + getOperationRequest.OperationId;
36+
auto httpClient = TKeepAliveHttpClient(Host_, Port_);
37+
TStringStream outputStream;
38+
39+
httpClient.DoGet(getOperationRequestUrl, &outputStream, Headers_);
40+
TString serializedResponse = outputStream.ReadAll();
41+
NProto::TGetOperationResponse protoGetOperationResponse;
42+
YQL_ENSURE(protoGetOperationResponse.ParseFromString(serializedResponse));
43+
return NThreading::MakeFuture(GetOperationResponseFromProto(protoGetOperationResponse));
44+
}
45+
46+
NThreading::TFuture<TDeleteOperationResponse> DeleteOperation(const TDeleteOperationRequest& deleteOperationRequest) override {
47+
TString deleteOperationRequestUrl = "/operation/" + deleteOperationRequest.OperationId;
48+
auto httpClient = TKeepAliveHttpClient(Host_, Port_);
49+
TStringStream outputStream;
50+
51+
httpClient.DoRequest("DELETE", deleteOperationRequestUrl, "", &outputStream, Headers_);
52+
TString serializedResponse = outputStream.ReadAll();
53+
NProto::TDeleteOperationResponse protoDeleteOperationResponse;
54+
YQL_ENSURE(protoDeleteOperationResponse.ParseFromString(serializedResponse));
55+
return NThreading::MakeFuture(DeleteOperationResponseFromProto(protoDeleteOperationResponse));
56+
}
57+
58+
NThreading::TFuture<THeartbeatResponse> SendHeartbeatResponse(const THeartbeatRequest& heartbeatRequest) override {
59+
NProto::THeartbeatRequest protoSendHeartbeatRequest = HeartbeatRequestToProto(heartbeatRequest);
60+
TString sendHearbeatRequestUrl = "/worker_heartbeat";
61+
auto httpClient = TKeepAliveHttpClient(Host_, Port_);
62+
TStringStream outputStream;
63+
64+
httpClient.DoPost(sendHearbeatRequestUrl, protoSendHeartbeatRequest.SerializeAsString(), &outputStream, Headers_);
65+
TString serializedResponse = outputStream.ReadAll();
66+
NProto::THeartbeatResponse protoHeartbeatResponse;
67+
YQL_ENSURE(protoHeartbeatResponse.ParseFromString(serializedResponse));
68+
return NThreading::MakeFuture(HeartbeatResponseFromProto(protoHeartbeatResponse));
69+
}
70+
71+
private:
72+
TString Host_;
73+
ui16 Port_;
74+
TSimpleHttpClient::THeaders Headers_;
75+
};
76+
77+
} // namespace
78+
79+
IFmrCoordinator::TPtr MakeFmrCoordinatorClient(const TFmrCoordinatorClientSettings& settings) {
80+
return MakeIntrusive<TFmrCoordinatorClient>(settings);
81+
}
82+
83+
} // namespace NYql::NFmr
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
2+
3+
namespace NYql::NFmr {
4+
5+
struct TFmrCoordinatorClientSettings {
6+
ui16 Port;
7+
TString Host = "localhost";
8+
};
9+
10+
IFmrCoordinator::TPtr MakeFmrCoordinatorClient(const TFmrCoordinatorClientSettings& settings);
11+
12+
} // namespace NYql::NFmr

yt/yql/providers/yt/fmr/coordinator/impl/ut/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ SRCS(
55
)
66

77
PEERDIR(
8-
library/cpp/yt/assert
98
yt/yql/providers/yt/fmr/coordinator/impl
109
yt/yql/providers/yt/fmr/job_factory/impl
1110
yt/yql/providers/yt/fmr/worker/impl

yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#pragma once
2+
13
#include <library/cpp/random_provider/random_provider.h>
24
#include <util/system/mutex.h>
35
#include <util/system/guard.h>
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
yql_yt_coordinator_proto_helpers.cpp
5+
)
6+
7+
PEERDIR(
8+
yt/yql/providers/yt/fmr/coordinator/interface
9+
yt/yql/providers/yt/fmr/proto
10+
yt/yql/providers/yt/fmr/request_options/proto_helpers
11+
)
12+
13+
YQL_LAST_ABI_VERSION()
14+
15+
END()
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#include "yql_yt_coordinator_proto_helpers.h"
2+
3+
namespace NYql::NFmr {
4+
5+
NProto::THeartbeatRequest HeartbeatRequestToProto(const THeartbeatRequest& heartbeatRequest) {
6+
NProto::THeartbeatRequest protoHeartbeatRequest;
7+
protoHeartbeatRequest.SetWorkerId(heartbeatRequest.WorkerId);
8+
protoHeartbeatRequest.SetVolatileId(heartbeatRequest.VolatileId);
9+
std::vector<NProto::TTaskState> taskStates;
10+
for (size_t i = 0; i < heartbeatRequest.TaskStates.size(); ++i) {
11+
auto protoTaskState = TaskStateToProto(*heartbeatRequest.TaskStates[i]);
12+
protoHeartbeatRequest.AddTaskStates();
13+
protoHeartbeatRequest.MutableTaskStates(i)->Swap(&protoTaskState);
14+
}
15+
return protoHeartbeatRequest;
16+
}
17+
18+
THeartbeatRequest HeartbeatRequestFromProto(const NProto::THeartbeatRequest protoHeartbeatRequest) {
19+
THeartbeatRequest heartbeatRequest;
20+
heartbeatRequest.WorkerId = protoHeartbeatRequest.GetWorkerId();
21+
heartbeatRequest.VolatileId = protoHeartbeatRequest.GetVolatileId();
22+
std::vector<TTaskState::TPtr> taskStates;
23+
for (size_t i = 0; i < protoHeartbeatRequest.TaskStatesSize(); ++i) {
24+
TTaskState curTaskState = TaskStateFromProto(protoHeartbeatRequest.GetTaskStates(i));
25+
taskStates.emplace_back(TIntrusivePtr<TTaskState>(new TTaskState(curTaskState)));
26+
}
27+
heartbeatRequest.TaskStates = taskStates;
28+
return heartbeatRequest;
29+
}
30+
31+
NProto::THeartbeatResponse HeartbeatResponseToProto(const THeartbeatResponse& heartbeatResponse) {
32+
NProto::THeartbeatResponse protoHeartbeatResponse;
33+
for (size_t i = 0; i < heartbeatResponse.TasksToRun.size(); ++i) {
34+
auto protoTask = TaskToProto(*heartbeatResponse.TasksToRun[i]);
35+
auto * taskToRun = protoHeartbeatResponse.AddTasksToRun();
36+
taskToRun->Swap(&protoTask);
37+
}
38+
for (auto& id: heartbeatResponse.TaskToDeleteIds) {
39+
protoHeartbeatResponse.AddTaskToDeleteIds(id);
40+
}
41+
return protoHeartbeatResponse;
42+
}
43+
44+
THeartbeatResponse HeartbeatResponseFromProto(const NProto::THeartbeatResponse& protoHeartbeatResponse) {
45+
THeartbeatResponse heartbeatResponse;
46+
std::vector<TTask::TPtr> tasksToRun;
47+
std::unordered_set<TString> taskToDeleteIds;
48+
for (size_t i = 0; i < protoHeartbeatResponse.TasksToRunSize(); ++i) {
49+
TTask curTask = TaskFromProto(protoHeartbeatResponse.GetTasksToRun(i));
50+
tasksToRun.emplace_back(TIntrusivePtr<TTask>(new TTask(curTask)));
51+
}
52+
for (size_t i = 0; i < protoHeartbeatResponse.TaskToDeleteIdsSize(); ++i) {
53+
taskToDeleteIds.emplace(protoHeartbeatResponse.GetTaskToDeleteIds(i));
54+
}
55+
56+
heartbeatResponse.TasksToRun = tasksToRun;
57+
heartbeatResponse.TaskToDeleteIds = taskToDeleteIds;
58+
return heartbeatResponse;
59+
}
60+
61+
NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperationRequest& startOperationRequest) {
62+
NProto::TStartOperationRequest protoStartOperationRequest;
63+
protoStartOperationRequest.SetTaskType(static_cast<NProto::ETaskType>(startOperationRequest.TaskType));
64+
auto protoTaskParams = TaskParamsToProto(startOperationRequest.TaskParams);
65+
protoStartOperationRequest.MutableTaskParams()->Swap(&protoTaskParams);
66+
protoStartOperationRequest.SetSessionId(startOperationRequest.SessionId);
67+
if (startOperationRequest.IdempotencyKey) {
68+
protoStartOperationRequest.SetIdempotencyKey(*startOperationRequest.IdempotencyKey);
69+
}
70+
protoStartOperationRequest.SetNumRetries(startOperationRequest.NumRetries);
71+
return protoStartOperationRequest;
72+
}
73+
74+
TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperationRequest& protoStartOperationRequest) {
75+
TStartOperationRequest startOperationRequest;
76+
startOperationRequest.TaskType = static_cast<ETaskType>(protoStartOperationRequest.GetTaskType());
77+
startOperationRequest.TaskParams = TaskParamsFromProto(protoStartOperationRequest.GetTaskParams());
78+
startOperationRequest.SessionId = protoStartOperationRequest.GetSessionId();
79+
if (protoStartOperationRequest.HasIdempotencyKey()) {
80+
startOperationRequest.IdempotencyKey = protoStartOperationRequest.GetIdempotencyKey();
81+
}
82+
startOperationRequest.NumRetries = protoStartOperationRequest.GetNumRetries();
83+
return startOperationRequest;
84+
}
85+
86+
NProto::TStartOperationResponse StartOperationResponseToProto(const TStartOperationResponse& startOperationResponse) {
87+
NProto::TStartOperationResponse protoStartOperationResponse;
88+
protoStartOperationResponse.SetOperationId(startOperationResponse.OperationId);
89+
protoStartOperationResponse.SetStatus(static_cast<NProto::EOperationStatus>(startOperationResponse.Status));
90+
return protoStartOperationResponse;
91+
}
92+
93+
TStartOperationResponse StartOperationResponseFromProto(const NProto::TStartOperationResponse& protoStartOperationResponse) {
94+
return TStartOperationResponse{
95+
.Status = static_cast<EOperationStatus>(protoStartOperationResponse.GetStatus()),
96+
.OperationId = protoStartOperationResponse.GetOperationId()
97+
};
98+
}
99+
100+
NProto::TGetOperationResponse GetOperationResponseToProto(const TGetOperationResponse& getOperationResponse) {
101+
NProto::TGetOperationResponse protoGetOperationResponse;
102+
protoGetOperationResponse.SetStatus(static_cast<NProto::EOperationStatus>(getOperationResponse.Status));
103+
for (auto& errorMessage: getOperationResponse.ErrorMessages) {
104+
auto* curError = protoGetOperationResponse.AddErrorMessages();
105+
auto protoError = FmrErrorToProto(errorMessage);
106+
curError->Swap(&protoError);
107+
}
108+
return protoGetOperationResponse;
109+
}
110+
111+
TGetOperationResponse GetOperationResponseFromProto(const NProto::TGetOperationResponse protoGetOperationReponse) {
112+
TGetOperationResponse getOperationResponse;
113+
getOperationResponse.Status = static_cast<EOperationStatus>(protoGetOperationReponse.GetStatus());
114+
std::vector<TFmrError> errorMessages;
115+
for (size_t i = 0; i < protoGetOperationReponse.ErrorMessagesSize(); ++i) {
116+
TFmrError errorMessage = FmrErrorFromProto(protoGetOperationReponse.GetErrorMessages(i));
117+
errorMessages.emplace_back(errorMessage);
118+
}
119+
getOperationResponse.ErrorMessages = errorMessages;
120+
return getOperationResponse;
121+
}
122+
123+
NProto::TDeleteOperationResponse DeleteOperationResponseToProto(const TDeleteOperationResponse& deleteOperationResponse) {
124+
NProto::TDeleteOperationResponse protoDeleteOperationResponse;
125+
protoDeleteOperationResponse.SetStatus(static_cast<NProto::EOperationStatus>(deleteOperationResponse.Status));
126+
return protoDeleteOperationResponse;
127+
}
128+
129+
TDeleteOperationResponse DeleteOperationResponseFromProto(const NProto::TDeleteOperationResponse& protoDeleteOperationResponse) {
130+
return TDeleteOperationResponse{.Status = static_cast<EOperationStatus>(protoDeleteOperationResponse.GetStatus())};
131+
}
132+
133+
} // namespace NYql::NFmr
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#pragma once
2+
3+
#include <yt/yql/providers/yt/fmr/coordinator/interface/yql_yt_coordinator.h>
4+
#include <yt/yql/providers/yt/fmr/proto/coordinator.pb.h>
5+
#include <yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.h>
6+
7+
namespace NYql::NFmr {
8+
9+
NProto::THeartbeatRequest HeartbeatRequestToProto(const THeartbeatRequest& heartbeatRequest);
10+
11+
THeartbeatRequest HeartbeatRequestFromProto(const NProto::THeartbeatRequest protoHeartbeatRequest);
12+
13+
NProto::THeartbeatResponse HeartbeatResponseToProto(const THeartbeatResponse& heartbeatResponse);
14+
15+
THeartbeatResponse HeartbeatResponseFromProto(const NProto::THeartbeatResponse& protoHeartbeatResponse);
16+
17+
NProto::TStartOperationRequest StartOperationRequestToProto(const TStartOperationRequest& startOperationRequest);
18+
19+
TStartOperationRequest StartOperationRequestFromProto(const NProto::TStartOperationRequest& protoStartOperationRequest);
20+
21+
NProto::TStartOperationResponse StartOperationResponseToProto(const TStartOperationResponse& startOperationResponse);
22+
23+
TStartOperationResponse StartOperationResponseFromProto(const NProto::TStartOperationResponse& protoStartOperationResponse);
24+
25+
NProto::TGetOperationResponse GetOperationResponseToProto(const TGetOperationResponse& getOperationResponse);
26+
27+
TGetOperationResponse GetOperationResponseFromProto(const NProto::TGetOperationResponse protoGetOperationReponse);
28+
29+
NProto::TDeleteOperationResponse DeleteOperationResponseToProto(const TDeleteOperationResponse& deleteOperationResponse);
30+
31+
TDeleteOperationResponse DeleteOperationResponseFromProto(const NProto::TDeleteOperationResponse& protoDeleteOperationResponse);
32+
33+
} // namespace NYql::NFmr

0 commit comments

Comments
 (0)