1
1
#include " kqp_balance_transaction.h"
2
- #include " ydb/core/kqp/common/simple/services.h"
3
2
#include " kafka_consumer_groups_metadata_initializers.h"
4
3
#include " kafka_consumer_members_metadata_initializers.h"
5
4
6
5
7
- namespace NKikimr ::NGRpcProxy::V1 {
6
+ namespace NKafka {
8
7
9
8
TKqpTxHelper::TKqpTxHelper (TString database)
10
9
: DataBase(database)
11
10
{}
12
11
13
- void TKqpTxHelper::SendCreateSessionRequest (const TActorContext& ctx) {
12
+ void TKqpTxHelper::SendCreateSessionRequest (const NActors:: TActorContext& ctx) {
14
13
auto ev = MakeCreateSessionRequest ();
15
- ctx.Send (NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), ev.Release (), 0 , 0 );
14
+ ctx.Send (NKikimr:: NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), ev.Release (), 0 , 0 );
16
15
}
17
16
18
17
void TKqpTxHelper::BeginTransaction (ui64 cookie, const NActors::TActorContext& ctx) {
19
- auto begin = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
18
+ auto begin = MakeHolder<NKikimr:: NKqp::TEvKqp::TEvQueryRequest>();
20
19
21
20
begin->Record .MutableRequest ()->SetAction (NKikimrKqp::QUERY_ACTION_BEGIN_TX);
22
21
begin->Record .MutableRequest ()->MutableTxControl ()->mutable_begin_tx ()->mutable_serializable_read_write ();
23
22
begin->Record .MutableRequest ()->SetSessionId (KqpSessionId);
24
23
begin->Record .MutableRequest ()->SetDatabase (DataBase);
25
24
26
- ctx.Send (NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), begin.Release (), 0 , cookie);
25
+ ctx.Send (NKikimr:: NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), begin.Release (), 0 , cookie);
27
26
}
28
27
29
- bool TKqpTxHelper::HandleCreateSessionResponse (NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext&) {
28
+ bool TKqpTxHelper::HandleCreateSessionResponse (NKikimr:: NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors:: TActorContext&) {
30
29
const auto & record = ev->Get ()->Record ;
31
30
32
31
if (record.GetYdbStatus () != Ydb::StatusIds::SUCCESS) {
@@ -39,32 +38,32 @@ bool TKqpTxHelper::HandleCreateSessionResponse(NKqp::TEvKqp::TEvCreateSessionRes
39
38
return true ;
40
39
}
41
40
42
- void TKqpTxHelper::CloseKqpSession (const TActorContext& ctx) {
41
+ void TKqpTxHelper::CloseKqpSession (const NActors:: TActorContext& ctx) {
43
42
if (KqpSessionId) {
44
43
auto ev = MakeCloseSessionRequest ();
45
- ctx.Send (NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), ev.Release (), 0 , 0 );
44
+ ctx.Send (NKikimr:: NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), ev.Release (), 0 , 0 );
46
45
KqpSessionId = " " ;
47
46
}
48
47
}
49
48
50
- THolder<NKqp::TEvKqp::TEvCreateSessionRequest> TKqpTxHelper::MakeCreateSessionRequest () {
51
- auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
49
+ THolder<NKikimr:: NKqp::TEvKqp::TEvCreateSessionRequest> TKqpTxHelper::MakeCreateSessionRequest () {
50
+ auto ev = MakeHolder<NKikimr:: NKqp::TEvKqp::TEvCreateSessionRequest>();
52
51
ev->Record .MutableRequest ()->SetDatabase (DataBase);
53
52
return ev;
54
53
}
55
54
56
- THolder<NKqp::TEvKqp::TEvCloseSessionRequest> TKqpTxHelper::MakeCloseSessionRequest () {
57
- auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
55
+ THolder<NKikimr:: NKqp::TEvKqp::TEvCloseSessionRequest> TKqpTxHelper::MakeCloseSessionRequest () {
56
+ auto ev = MakeHolder<NKikimr:: NKqp::TEvKqp::TEvCloseSessionRequest>();
58
57
ev->Record .MutableRequest ()->SetSessionId (KqpSessionId);
59
58
return ev;
60
59
}
61
60
62
- void TKqpTxHelper::SendRequest (THolder<NKqp::TEvKqp::TEvQueryRequest> request, ui64 cookie, const NActors::TActorContext& ctx) {
63
- ctx.Send (NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), request.Release (), 0 , cookie);
61
+ void TKqpTxHelper::SendRequest (THolder<NKikimr:: NKqp::TEvKqp::TEvQueryRequest> request, ui64 cookie, const NActors::TActorContext& ctx) {
62
+ ctx.Send (NKikimr:: NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), request.Release (), 0 , cookie);
64
63
}
65
64
66
65
void TKqpTxHelper::SendYqlRequest (const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const NActors::TActorContext& ctx, bool commit) {
67
- auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
66
+ auto ev = MakeHolder<NKikimr:: NKqp::TEvKqp::TEvQueryRequest>();
68
67
69
68
ev->Record .MutableRequest ()->SetAction (NKikimrKqp::QUERY_ACTION_EXECUTE);
70
69
ev->Record .MutableRequest ()->SetType (NKikimrKqp::QUERY_TYPE_SQL_DML);
@@ -81,30 +80,30 @@ void TKqpTxHelper::SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlPa
81
80
ev->Record .MutableRequest ()->MutableQueryCachePolicy ()->set_keep_in_cache (true );
82
81
83
82
ev->Record .MutableRequest ()->MutableYdbParameters ()->swap (*(NYdb::TProtoAccessor::GetProtoMapPtr (sqlParams)));
84
- ctx.Send (NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), ev.Release (), 0 , cookie);
83
+ ctx.Send (NKikimr:: NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), ev.Release (), 0 , cookie);
85
84
}
86
85
87
86
void TKqpTxHelper::CommitTx (ui64 cookie, const NActors::TActorContext& ctx) {
88
- auto commit = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
87
+ auto commit = MakeHolder<NKikimr:: NKqp::TEvKqp::TEvQueryRequest>();
89
88
90
89
commit->Record .MutableRequest ()->SetAction (NKikimrKqp::QUERY_ACTION_COMMIT_TX);
91
90
commit->Record .MutableRequest ()->MutableTxControl ()->set_tx_id (TxId);
92
91
commit->Record .MutableRequest ()->MutableTxControl ()->set_commit_tx (true );
93
92
commit->Record .MutableRequest ()->SetSessionId (KqpSessionId);
94
93
commit->Record .MutableRequest ()->SetDatabase (DataBase);
95
94
96
- ctx.Send (NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), commit.Release (), 0 , cookie);
95
+ ctx.Send (NKikimr:: NKqp::MakeKqpProxyID (ctx.SelfID .NodeId ()), commit.Release (), 0 , cookie);
97
96
}
98
97
99
- void TKqpTxHelper::SendInitTablesRequest (const TActorContext& ctx) {
98
+ void TKqpTxHelper::SendInitTablesRequest (const NActors:: TActorContext& ctx) {
100
99
ctx.Send (
101
- NMetadata::NProvider::MakeServiceId (ctx.SelfID .NodeId ()),
102
- new NMetadata::NProvider::TEvPrepareManager (NGRpcProxy::V1::TKafkaConsumerMembersMetaInitManager::GetInstant ())
100
+ NKikimr:: NMetadata::NProvider::MakeServiceId (ctx.SelfID .NodeId ()),
101
+ new NKikimr:: NMetadata::NProvider::TEvPrepareManager (NKikimr:: NGRpcProxy::V1::TKafkaConsumerMembersMetaInitManager::GetInstant ())
103
102
);
104
103
105
104
ctx.Send (
106
- NMetadata::NProvider::MakeServiceId (ctx.SelfID .NodeId ()),
107
- new NMetadata::NProvider::TEvPrepareManager (NGRpcProxy::V1::TKafkaConsumerGroupsMetaInitManager::GetInstant ())
105
+ NKikimr:: NMetadata::NProvider::MakeServiceId (ctx.SelfID .NodeId ()),
106
+ new NKikimr:: NMetadata::NProvider::TEvPrepareManager (NKikimr:: NGRpcProxy::V1::TKafkaConsumerGroupsMetaInitManager::GetInstant ())
108
107
);
109
108
}
110
109
0 commit comments