Skip to content

Commit b3bc0f3

Browse files
authored
Merge to analytics stable s3 block writing support (#19714)
2 parents 8731e32 + cc183d2 commit b3bc0f3

File tree

95 files changed

+3913
-691
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

95 files changed

+3913
-691
lines changed

ydb/core/grpc_services/query/rpc_fetch_script_results.cpp

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,18 +125,14 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
125125
}
126126

127127
bool GetExecutionIdFromRequest() {
128-
try {
129-
TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(GetProtoRequest()->operation_id());
130-
if (!executionId) {
131-
Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid operation id");
132-
return false;
133-
}
134-
ExecutionId = *executionId;
135-
return true;
136-
} catch (const std::exception& ex) {
137-
Reply(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Invalid operation id: " << ex.what());
128+
TString error;
129+
TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(GetProtoRequest()->operation_id(), error);
130+
if (!executionId) {
131+
Reply(Ydb::StatusIds::BAD_REQUEST, error);
138132
return false;
139133
}
134+
ExecutionId = *executionId;
135+
return true;
140136
}
141137

142138
private:

ydb/core/kqp/common/kqp_script_executions.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "kqp_script_executions.h"
22

3+
#include <util/string/builder.h>
4+
35
#include <ydb/public/sdk/cpp/src/library/operation_id/protos/operation_id.pb.h>
46

57
namespace NKikimr::NKqp {
@@ -11,21 +13,29 @@ TString ScriptExecutionOperationFromExecutionId(const TString& executionId) {
1113
return NOperationId::ProtoToString(operationId);
1214
}
1315

14-
TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId) {
16+
TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId, TString& error) try {
1517
NOperationId::TOperationId operation(operationId);
16-
return ScriptExecutionIdFromOperation(operation);
18+
return ScriptExecutionIdFromOperation(operation, error);
19+
} catch (const std::exception& ex) {
20+
error = TStringBuilder() << "Invalid operation id: " << ex.what();
21+
return Nothing();
1722
}
1823

19-
TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId) {
24+
TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId, TString& error) try {
2025
if (operationId.GetKind() != NOperationId::TOperationId::SCRIPT_EXECUTION) {
26+
error = TStringBuilder() << "Invalid operation id, expected SCRIPT_EXECUTION = " << static_cast<int>(NOperationId::TOperationId::SCRIPT_EXECUTION) << " kind, got " << static_cast<int>(operationId.GetKind());
2127
return Nothing();
2228
}
2329

2430
const auto& values = operationId.GetValue("id");
2531
if (values.empty() || !values[0]) {
32+
error = TStringBuilder() << "Invalid operation id, please specify key 'id'";
2633
return Nothing();
2734
}
2835
return TString{*values[0]};
36+
} catch (const std::exception& ex) {
37+
error = TStringBuilder() << "Invalid operation id: " << ex.what();
38+
return Nothing();
2939
}
3040

3141
} // namespace NKikimr::NKqp

ydb/core/kqp/common/kqp_script_executions.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
namespace NKikimr::NKqp {
99

1010
TString ScriptExecutionOperationFromExecutionId(const TString& executionId);
11-
TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId);
12-
TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId);
11+
TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId, TString& error);
12+
TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId, TString& error);
1313

1414
} // namespace NKikimr::NKqp

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
8888
std::atomic<ui64> MinChannelBufferSize = 0;
8989
std::atomic<ui64> MinMemAllocSize = 1_MB;
9090
std::atomic<ui64> MinMemFreeSize = 32_MB;
91+
std::atomic<ui64> ChannelChunkSizeLimit = 48_MB;
9192

9293
public:
9394
TKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
@@ -106,6 +107,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
106107
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
107108
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
108109
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
110+
ChannelChunkSizeLimit.store(config.GetChannelChunkSizeLimit());
109111
MinMemAllocSize.store(config.GetMinMemAllocSize());
110112
MinMemFreeSize.store(config.GetMinMemFreeSize());
111113
}
@@ -142,6 +144,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
142144

143145
memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), MinChannelBufferSize.load());
144146
memoryLimits.OutputChunkMaxSize = args.OutputChunkMaxSize;
147+
memoryLimits.ChunkSizeLimit = ChannelChunkSizeLimit.load();
145148
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info")
146149
("ch_size", estimation.ChannelBufferMemoryLimit)
147150
("ch_count", estimation.ChannelBuffersCount)

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1098,6 +1098,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
10981098
structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(SecureParams).ToJson();
10991099
}
11001100

1101+
ui64 selfNodeIdx = 0;
1102+
for (size_t i = 0; i < resourceSnapshot.size(); ++i) {
1103+
if (resourceSnapshot[i].GetNodeId() == SelfId().NodeId()) {
1104+
selfNodeIdx = i;
1105+
break;
1106+
}
1107+
}
1108+
11011109
TVector<ui64> tasksIds;
11021110

11031111
// generate all tasks
@@ -1117,7 +1125,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
11171125
if (resourceSnapshot.empty()) {
11181126
task.Meta.Type = TTaskMeta::TTaskType::Compute;
11191127
} else {
1120-
task.Meta.NodeId = resourceSnapshot[i % resourceSnapshot.size()].GetNodeId();
1128+
task.Meta.NodeId = resourceSnapshot[(selfNodeIdx + i) % resourceSnapshot.size()].GetNodeId();
11211129
task.Meta.Type = TTaskMeta::TTaskType::Scan;
11221130
}
11231131

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -990,9 +990,10 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
990990
{}
991991

992992
void Bootstrap() {
993-
TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(Request->Get()->OperationId);
993+
TString error;
994+
TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(Request->Get()->OperationId, error);
994995
if (!executionId) {
995-
Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect operation id");
996+
Reply(Ydb::StatusIds::BAD_REQUEST, error);
996997
return;
997998
}
998999
ExecutionId = *executionId;
@@ -1258,9 +1259,10 @@ class TGetScriptExecutionOperationActor : public TCheckLeaseStatusActorBase {
12581259
{}
12591260

12601261
void OnBootstrap() override {
1261-
TMaybe<TString> executionId = ScriptExecutionIdFromOperation(Request->Get()->OperationId);
1262+
TString error;
1263+
TMaybe<TString> executionId = ScriptExecutionIdFromOperation(Request->Get()->OperationId, error);
12621264
if (!executionId) {
1263-
Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect operation id");
1265+
Reply(Ydb::StatusIds::BAD_REQUEST, error);
12641266
return;
12651267
}
12661268
ExecutionId = *executionId;
@@ -1600,9 +1602,10 @@ class TCancelScriptExecutionOperationActor : public NActors::TActorBootstrapped<
16001602
{}
16011603

16021604
void Bootstrap() {
1603-
const TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(Request->Get()->OperationId);
1605+
TString error;
1606+
const TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(Request->Get()->OperationId, error);
16041607
if (!executionId) {
1605-
return Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect operation id");
1608+
return Reply(Ydb::StatusIds::BAD_REQUEST, error);
16061609
}
16071610
ExecutionId = *executionId;
16081611

@@ -1616,14 +1619,15 @@ class TCancelScriptExecutionOperationActor : public NActors::TActorBootstrapped<
16161619
hFunc(TEvKqp::TEvCancelScriptExecutionResponse, Handle);
16171620
hFunc(NActors::TEvents::TEvUndelivered, Handle);
16181621
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
1622+
IgnoreFunc(NActors::TEvInterconnect::TEvNodeConnected);
16191623
)
16201624

16211625
void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) {
16221626
if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) {
16231627
KQP_PROXY_LOG_D("[TCancelScriptExecutionOperationActor] ExecutionId: " << ExecutionId << ", check lease success");
16241628
RunScriptActor = ev->Get()->RunScriptActorId;
16251629
if (ev->Get()->OperationStatus) {
1626-
Reply(Ydb::StatusIds::PRECONDITION_FAILED); // Already finished.
1630+
Reply(Ydb::StatusIds::PRECONDITION_FAILED, "Script execution operation is already finished");
16271631
} else {
16281632
if (CancelSent) { // We have not found the actor, but after it status of the operation is not defined, something strage happened.
16291633
Reply(Ydb::StatusIds::INTERNAL_ERROR, "Failed to cancel script execution operation");

ydb/core/kqp/ut/federated_query/common/common.cpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "common.h"
22

33
#include <library/cpp/testing/unittest/registar.h>
4+
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
45

56
namespace NKikimr::NKqp::NFederatedQueryTest {
67
TString GetSymbolsString(char start, char end, const TString& skip) {
@@ -26,13 +27,34 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
2627
}
2728
}
2829

30+
void WaitResourcesPublish(ui32 nodeId, ui32 expectedNodeCount) {
31+
std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> resourceManager;
32+
while (true) {
33+
if (!resourceManager) {
34+
resourceManager = NKikimr::NKqp::TryGetKqpResourceManager(nodeId);
35+
}
36+
if (resourceManager && resourceManager->GetClusterResources().size() == expectedNodeCount) {
37+
return;
38+
}
39+
Sleep(TDuration::MilliSeconds(10));
40+
}
41+
}
42+
43+
void WaitResourcesPublish(const TKikimrRunner& kikimrRunner) {
44+
const auto& testServer = kikimrRunner.GetTestServer();
45+
const auto nodeCount = testServer.StaticNodes();
46+
for (ui32 nodeId = 0; nodeId < nodeCount; ++nodeId) {
47+
WaitResourcesPublish(testServer.GetRuntime()->GetNodeId(nodeId), nodeCount);
48+
}
49+
}
50+
2951
std::shared_ptr<TKikimrRunner> MakeKikimrRunner(
3052
bool initializeHttpGateway,
3153
NYql::NConnector::IClient::TPtr connectorClient,
3254
NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver,
3355
std::optional<NKikimrConfig::TAppConfig> appConfig,
3456
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
35-
const TString& domainRoot)
57+
const TKikimrRunnerOptions& optionst)
3658
{
3759
NKikimrConfig::TFeatureFlags featureFlags;
3860
featureFlags.SetEnableExternalDataSources(true);
@@ -69,7 +91,8 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
6991
.SetKqpSettings({})
7092
.SetS3ActorsFactory(std::move(s3ActorsFactory))
7193
.SetWithSampleTables(false)
72-
.SetDomainRoot(domainRoot);
94+
.SetDomainRoot(optionst.DomainRoot)
95+
.SetNodeCount(optionst.NodeCount);
7396

7497
settings = settings.SetAppConfig(appConfig.value());
7598

ydb/core/kqp/ut/federated_query/common/common.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,19 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
1414
const NYdb::TOperation::TOperationId& operationId,
1515
const NYdb::TDriver& ydbDriver);
1616

17+
void WaitResourcesPublish(ui32 nodeId, ui32 expectedNodeCount);
18+
void WaitResourcesPublish(const TKikimrRunner& kikimrRunner);
19+
20+
struct TKikimrRunnerOptions {
21+
TString DomainRoot = "Root";
22+
ui32 NodeCount = 1;
23+
};
24+
1725
std::shared_ptr<TKikimrRunner> MakeKikimrRunner(
1826
bool initializeHttpGateway = false,
1927
NYql::NConnector::IClient::TPtr connectorClient = nullptr,
2028
NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver = nullptr,
2129
std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt,
2230
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory = nullptr,
23-
const TString& domainRoot = "Root");
31+
const TKikimrRunnerOptions& options = {});
2432
} // namespace NKikimr::NKqp::NFederatedQueryTest

ydb/core/kqp/ut/federated_query/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SRCS(
77
STYLE_CPP()
88

99
PEERDIR(
10+
ydb/core/kqp/rm_service
1011
ydb/core/kqp/ut/common
1112
ydb/library/yql/providers/s3/actors_factory
1213
ydb/public/sdk/cpp/src/client/operation

ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.h>
77
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
88
#include <ydb/public/api/protos/ydb_query.pb.h>
9+
#include <ydb/public/api/grpc/ydb_operation_v1.grpc.pb.h>
910
#include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h>
1011
#include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h>
1112
#include <ydb-cpp-sdk/client/operation/operation.h>
@@ -489,27 +490,60 @@ namespace NKikimr::NKqp {
489490

490491
// Create trash query
491492
NYdbGrpc::TGRpcClientLow clientLow;
493+
const auto channel = grpc::CreateChannel("localhost:" + ToString(kikimr->GetTestServer().GetGRpcServer().GetPort()), grpc::InsecureChannelCredentials());
494+
const auto queryServiceStub = Ydb::Query::V1::QueryService::NewStub(channel);
495+
const auto operationServiceStub = Ydb::Operation::V1::OperationService::NewStub(channel);
492496

493-
std::shared_ptr<grpc::Channel> channel;
494-
channel = grpc::CreateChannel("localhost:" + ToString(kikimr->GetTestServer().GetGRpcServer().GetPort()), grpc::InsecureChannelCredentials());
495497
{
496-
std::unique_ptr<Ydb::Query::V1::QueryService::Stub> stub;
497-
stub = Ydb::Query::V1::QueryService::NewStub(channel);
498498
grpc::ClientContext context;
499499
Ydb::Query::FetchScriptResultsRequest request;
500500
request.set_operation_id(operationId);
501501
request.set_fetch_token(fetchToken);
502502
Ydb::Query::FetchScriptResultsResponse response;
503-
grpc::Status st = stub->FetchScriptResults(&context, request, &response);
503+
grpc::Status st = queryServiceStub->FetchScriptResults(&context, request, &response);
504504
UNIT_ASSERT(st.ok());
505-
UNIT_ASSERT_EQUAL_C(response.status(), Ydb::StatusIds::BAD_REQUEST, response);
505+
UNIT_ASSERT_VALUES_EQUAL_C(response.status(), Ydb::StatusIds::BAD_REQUEST, response);
506+
}
507+
508+
{
509+
grpc::ClientContext context;
510+
Ydb::Operations::ForgetOperationRequest request;
511+
request.set_id(operationId);
512+
Ydb::Operations::ForgetOperationResponse response;
513+
grpc::Status st = operationServiceStub->ForgetOperation(&context, request, &response);
514+
UNIT_ASSERT(st.ok());
515+
UNIT_ASSERT_VALUES_EQUAL_C(response.status(), Ydb::StatusIds::BAD_REQUEST, response);
516+
}
517+
518+
{
519+
grpc::ClientContext context;
520+
Ydb::Operations::GetOperationRequest request;
521+
request.set_id(operationId);
522+
Ydb::Operations::GetOperationResponse response;
523+
grpc::Status st = operationServiceStub->GetOperation(&context, request, &response);
524+
UNIT_ASSERT(st.ok());
525+
UNIT_ASSERT_VALUES_EQUAL_C(response.operation().status(), Ydb::StatusIds::BAD_REQUEST, response);
526+
}
527+
528+
{
529+
grpc::ClientContext context;
530+
Ydb::Operations::CancelOperationRequest request;
531+
request.set_id(operationId);
532+
Ydb::Operations::CancelOperationResponse response;
533+
grpc::Status st = operationServiceStub->CancelOperation(&context, request, &response);
534+
UNIT_ASSERT(st.ok());
535+
UNIT_ASSERT_VALUES_EQUAL_C(response.status(), Ydb::StatusIds::BAD_REQUEST, response);
506536
}
507537
}
508538

509-
Y_UNIT_TEST(TestFailsOnIncorrectScriptExecutionOperationId) {
539+
Y_UNIT_TEST(TestFailsOnIncorrectScriptExecutionOperationId1) {
510540
TestFailsOnIncorrectScriptExecutionOperation("trash", "");
511541
}
512542

543+
Y_UNIT_TEST(TestFailsOnIncorrectScriptExecutionOperationId2) {
544+
TestFailsOnIncorrectScriptExecutionOperation("ydb://scriptexec/9?fd=b214872a-d040e60d-62a1b34-a9be3c3d", "trash");
545+
}
546+
513547
Y_UNIT_TEST(TestFailsOnIncorrectScriptExecutionFetchToken) {
514548
TestFailsOnIncorrectScriptExecutionOperation("", "trash");
515549
}

0 commit comments

Comments
 (0)