Skip to content

Commit 97167eb

Browse files
authored
Add OLAP support with topics in BufferWriteActor (#12686)
1 parent 308501b commit 97167eb

File tree

4 files changed

+211
-46
lines changed

4 files changed

+211
-46
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1419,7 +1419,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
14191419
State = EState::WRITING;
14201420
Alloc->Release();
14211421
Counters->BufferActorsCount->Inc();
1422-
TxManager->AddTopicsToShards();
14231422
}
14241423

14251424
void Bootstrap() {

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1378,6 +1378,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
13781378
&& !txCtx->BufferActorId
13791379
&& (txCtx->HasTableWrite || request.TopicOperations.GetSize() != 0)) {
13801380
txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations));
1381+
txCtx->TxManager->AddTopicsToShards();
13811382

13821383
TKqpBufferWriterSettings settings {
13831384
.SessionActorId = SelfId(),
@@ -1388,6 +1389,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
13881389
};
13891390
auto* actor = CreateKqpBufferWriterActor(std::move(settings));
13901391
txCtx->BufferActorId = RegisterWithSameMailbox(actor);
1392+
} else if (Settings.TableService.GetEnableOltpSink() && txCtx->BufferActorId) {
1393+
txCtx->TxManager->SetTopicOperations(std::move(request.TopicOperations));
1394+
txCtx->TxManager->AddTopicsToShards();
13911395
}
13921396

13931397
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,

ydb/core/testlib/test_client.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,18 @@ namespace Tests {
231231
AppConfig->MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink);
232232
return *this;
233233
}
234+
TServerSettings& SetEnableOlapSink(bool withOlapSink) {
235+
AppConfig->MutableTableServiceConfig()->SetEnableOlapSink(withOlapSink);
236+
return *this;
237+
}
238+
TServerSettings& SetEnableHtapTx(bool withHtapTx) {
239+
AppConfig->MutableTableServiceConfig()->SetEnableHtapTx(withHtapTx);
240+
return *this;
241+
}
242+
TServerSettings& SetAllowOlapDataQuery(bool withAllowOlapDataQuery) {
243+
AppConfig->MutableTableServiceConfig()->SetAllowOlapDataQuery(withAllowOlapDataQuery);
244+
return *this;
245+
}
234246

235247

236248
// Add additional grpc services

0 commit comments

Comments
 (0)