Skip to content

Commit fd35bc3

Browse files
committed
fix
1 parent 5e51b67 commit fd35bc3

10 files changed

+74
-73
lines changed

ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#include "kafka_balancer_actor.h"
2-
#include "kqp_balance_transaction.h"
32

43
namespace NKafka {
54

ydb/core/kafka_proxy/actors/kafka_balancer_actor.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#pragma once
22

33
#include "actors.h"
4-
#include "kafka_consumer_groups_metadata_initializers.h"
5-
#include "kafka_consumer_members_metadata_initializers.h"
6-
#include "kqp_balance_transaction.h"
4+
#include "../kafka_consumer_groups_metadata_initializers.h"
5+
#include "../kafka_consumer_members_metadata_initializers.h"
6+
#include "../kqp_balance_transaction.h"
77

88
#include <ydb/core/base/tablet_pipe.h>
99
#include <ydb/core/kafka_proxy/kafka_events.h>

ydb/core/kafka_proxy/actors/kqp_balance_transaction.h

Lines changed: 0 additions & 43 deletions
This file was deleted.

ydb/core/kafka_proxy/actors/kafka_consumer_groups_metadata_initializers.cpp renamed to ydb/core/kafka_proxy/kafka_consumer_groups_metadata_initializers.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#include "kafka_consumer_groups_metadata_initializers.h"
2-
#include "kafka_balancer_actor.h"
2+
#include "actors/kafka_balancer_actor.h"
33

44
namespace NKikimr::NGRpcProxy::V1 {
55

ydb/core/kafka_proxy/actors/kafka_consumer_members_metadata_initializers.cpp renamed to ydb/core/kafka_proxy/kafka_consumer_members_metadata_initializers.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#include "kafka_consumer_members_metadata_initializers.h"
2-
#include "kafka_balancer_actor.h"
2+
#include "actors/kafka_balancer_actor.h"
33

44
namespace NKikimr::NGRpcProxy::V1 {
55

ydb/core/kafka_proxy/actors/kqp_balance_transaction.cpp renamed to ydb/core/kafka_proxy/kqp_balance_transaction.cpp

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,23 @@ TKqpTxHelper::TKqpTxHelper(TString database)
99
: DataBase(database)
1010
{}
1111

12-
void TKqpTxHelper::SendCreateSessionRequest(const NActors::TActorContext& ctx) {
12+
void TKqpTxHelper::SendCreateSessionRequest(const TActorContext& ctx) {
1313
auto ev = MakeCreateSessionRequest();
14-
ctx.Send(NKikimr::NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0);
14+
ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0);
1515
}
1616

17-
void TKqpTxHelper::BeginTransaction(ui64 cookie, const NActors::TActorContext& ctx) {
18-
auto begin = MakeHolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest>();
17+
void TKqpTxHelper::BeginTransaction(ui64 cookie, const TActorContext& ctx) {
18+
auto begin = MakeHolder<TEvKqp::TEvQueryRequest>();
1919

2020
begin->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_BEGIN_TX);
2121
begin->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
2222
begin->Record.MutableRequest()->SetSessionId(KqpSessionId);
2323
begin->Record.MutableRequest()->SetDatabase(DataBase);
2424

25-
ctx.Send(NKikimr::NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), begin.Release(), 0, cookie);
25+
ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), begin.Release(), 0, cookie);
2626
}
2727

28-
bool TKqpTxHelper::HandleCreateSessionResponse(NKikimr::NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext&) {
28+
bool TKqpTxHelper::HandleCreateSessionResponse(TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext&) {
2929
const auto& record = ev->Get()->Record;
3030

3131
if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
@@ -38,32 +38,32 @@ bool TKqpTxHelper::HandleCreateSessionResponse(NKikimr::NKqp::TEvKqp::TEvCreateS
3838
return true;
3939
}
4040

41-
void TKqpTxHelper::CloseKqpSession(const NActors::TActorContext& ctx) {
41+
void TKqpTxHelper::CloseKqpSession(const TActorContext& ctx) {
4242
if (KqpSessionId) {
4343
auto ev = MakeCloseSessionRequest();
44-
ctx.Send(NKikimr::NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0);
44+
ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0);
4545
KqpSessionId = "";
4646
}
4747
}
4848

49-
THolder<NKikimr::NKqp::TEvKqp::TEvCreateSessionRequest> TKqpTxHelper::MakeCreateSessionRequest() {
50-
auto ev = MakeHolder<NKikimr::NKqp::TEvKqp::TEvCreateSessionRequest>();
49+
THolder<TEvKqp::TEvCreateSessionRequest> TKqpTxHelper::MakeCreateSessionRequest() {
50+
auto ev = MakeHolder<TEvKqp::TEvCreateSessionRequest>();
5151
ev->Record.MutableRequest()->SetDatabase(DataBase);
5252
return ev;
5353
}
5454

55-
THolder<NKikimr::NKqp::TEvKqp::TEvCloseSessionRequest> TKqpTxHelper::MakeCloseSessionRequest() {
56-
auto ev = MakeHolder<NKikimr::NKqp::TEvKqp::TEvCloseSessionRequest>();
55+
THolder<TEvKqp::TEvCloseSessionRequest> TKqpTxHelper::MakeCloseSessionRequest() {
56+
auto ev = MakeHolder<TEvKqp::TEvCloseSessionRequest>();
5757
ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
5858
return ev;
5959
}
6060

61-
void TKqpTxHelper::SendRequest(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request, ui64 cookie, const NActors::TActorContext& ctx) {
62-
ctx.Send(NKikimr::NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), request.Release(), 0, cookie);
61+
void TKqpTxHelper::SendRequest(THolder<TEvKqp::TEvQueryRequest> request, ui64 cookie, const TActorContext& ctx) {
62+
ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), request.Release(), 0, cookie);
6363
}
6464

65-
void TKqpTxHelper::SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const NActors::TActorContext& ctx, bool commit) {
66-
auto ev = MakeHolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest>();
65+
void TKqpTxHelper::SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const TActorContext& ctx, bool commit) {
66+
auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();
6767

6868
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
6969
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
@@ -80,22 +80,22 @@ void TKqpTxHelper::SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlPa
8080
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
8181

8282
ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(sqlParams)));
83-
ctx.Send(NKikimr::NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, cookie);
83+
ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, cookie);
8484
}
8585

86-
void TKqpTxHelper::CommitTx(ui64 cookie, const NActors::TActorContext& ctx) {
87-
auto commit = MakeHolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest>();
86+
void TKqpTxHelper::CommitTx(ui64 cookie, const TActorContext& ctx) {
87+
auto commit = MakeHolder<TEvKqp::TEvQueryRequest>();
8888

8989
commit->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_COMMIT_TX);
9090
commit->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId);
9191
commit->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
9292
commit->Record.MutableRequest()->SetSessionId(KqpSessionId);
9393
commit->Record.MutableRequest()->SetDatabase(DataBase);
9494

95-
ctx.Send(NKikimr::NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), commit.Release(), 0, cookie);
95+
ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), commit.Release(), 0, cookie);
9696
}
9797

98-
void TKqpTxHelper::SendInitTablesRequest(const NActors::TActorContext& ctx) {
98+
void TKqpTxHelper::SendInitTablesRequest(const TActorContext& ctx) {
9999
ctx.Send(
100100
NKikimr::NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()),
101101
new NKikimr::NMetadata::NProvider::TEvPrepareManager(NKikimr::NGRpcProxy::V1::TKafkaConsumerMembersMetaInitManager::GetInstant())
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#pragma once
2+
3+
#include <ydb/core/client/server/msgbus_server_pq_metacache.h>
4+
#include <ydb/core/grpc_services/rpc_deferrable.h>
5+
#include <ydb/core/kqp/common/events/events.h>
6+
#include "ydb/core/kqp/common/simple/services.h"
7+
#include <ydb/core/persqueue/events/global.h>
8+
9+
namespace NKafka {
10+
11+
using namespace NKikimr;
12+
using namespace NKikimr::NKqp;
13+
using namespace NActors;
14+
15+
class TKqpTxHelper {
16+
public:
17+
TKqpTxHelper(TString database);
18+
void SendCreateSessionRequest(const TActorContext& ctx);
19+
void BeginTransaction(ui64 cookie, const TActorContext& ctx);
20+
bool HandleCreateSessionResponse(TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx);
21+
void CloseKqpSession(const TActorContext& ctx);
22+
void SendRequest(THolder<TEvKqp::TEvQueryRequest> request, ui64 cookie, const TActorContext& ctx);
23+
void CommitTx(ui64 cookie, const TActorContext& ctx);
24+
void SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const TActorContext& ctx, bool commit = false);
25+
void SendInitTablesRequest(const TActorContext& ctx);
26+
void SetTxId(const TString& txId);
27+
void ResetTxId();
28+
29+
public:
30+
TString DataBase;
31+
TString TxId;
32+
33+
private:
34+
THolder<TEvKqp::TEvCreateSessionRequest> MakeCreateSessionRequest();
35+
THolder<TEvKqp::TEvCloseSessionRequest> MakeCloseSessionRequest();
36+
37+
38+
private:
39+
TString Consumer;
40+
TString Path;
41+
42+
TString KqpSessionId;
43+
};
44+
45+
} // namespace NKikimr::NGRpcProxy::V1

ydb/core/kafka_proxy/ya.make

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ SRCS(
1818
actors/kafka_create_topics_actor.cpp
1919
actors/kafka_create_partitions_actor.cpp
2020
actors/kafka_alter_configs_actor.cpp
21-
actors/kafka_consumer_groups_metadata_initializers.cpp
22-
actors/kafka_consumer_members_metadata_initializers.cpp
23-
actors/kqp_balance_transaction.cpp
2421
actors/kafka_balance_actor_sql.cpp
2522
actors/kafka_balancer_actor.cpp
2623
kafka_connection.cpp
@@ -38,6 +35,9 @@ SRCS(
3835
kafka_records.cpp
3936
kafka_consumer_protocol.cpp
4037
kafka_metrics.cpp
38+
kqp_balance_transaction.cpp
39+
kafka_consumer_groups_metadata_initializers.cpp
40+
kafka_consumer_members_metadata_initializers.cpp
4141
)
4242

4343
GENERATE_ENUM_SERIALIZATION(kafka.h)

0 commit comments

Comments
 (0)