Skip to content

Commit 475d5eb

Browse files
authored
Added the transfer data from topics to tables (more cherry picks) (#18421)
2 parents 30b1a69 + 193fb86 commit 475d5eb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+3662
-255
lines changed

ydb/apps/ydbd/main.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "export.h"
22
#include <ydb/core/driver_lib/run/main.h>
33
#include <ydb/core/security/ticket_parser.h>
4+
#include <ydb/core/transfer/transfer_writer.h>
45
#include <ydb/core/tx/schemeshard/schemeshard_operation_factory.h>
56
#include <ydb/core/ymq/actor/auth_multi_factory.h>
67
#include <ydb/core/ymq/base/events_writer.h>
@@ -23,6 +24,7 @@ int main(int argc, char **argv) {
2324
factories->SqsEventsWriterFactory = std::make_shared<TSqsEventsWriterFactory>();
2425
factories->SchemeOperationFactory.reset(NKikimr::NSchemeShard::DefaultOperationFactory());
2526
factories->ConfigSwissKnife = NKikimr::NYamlConfig::CreateDefaultConfigSwissKnife();
27+
factories->TransferWriterFactory = std::make_shared<NKikimr::NReplication::NTransfer::TTransferWriterFactory>();
2628

2729
return ParameterizedMain(argc, argv, std::move(factories));
2830
}

ydb/core/backup/impl/local_partition_reader.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
#include "local_partition_reader.h"
22
#include "logging.h"
33

4-
#include <ydb/library/actors/core/actor.h>
5-
#include <ydb/library/services/services.pb.h>
6-
74
#include <ydb/core/persqueue/events/global.h>
85
#include <ydb/core/protos/grpc_pq_old.pb.h>
96
#include <ydb/core/tx/replication/service/worker.h>
7+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
8+
#include <ydb/library/actors/core/actor.h>
9+
#include <ydb/library/services/services.pb.h>
1010

1111
using namespace NActors;
1212
using namespace NKikimr::NReplication::NService;
@@ -131,11 +131,11 @@ class TLocalPartitionReader
131131
}
132132

133133
auto gotOffset = Offset;
134-
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(readResult.ResultSize()));
134+
TVector<NReplication::TTopicMessage> records(::Reserve(readResult.ResultSize()));
135135

136136
for (auto& result : readResult.GetResult()) {
137137
gotOffset = std::max(gotOffset, result.GetOffset());
138-
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData(), TInstant::MilliSeconds(result.GetCreateTimestampMS()), result.GetSourceId(), result.GetSourceId(), result.GetSeqNo());
138+
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData());
139139
}
140140
SentOffset = gotOffset + 1;
141141

ydb/core/backup/impl/local_partition_reader_ut.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ Y_UNIT_TEST_SUITE(LocalPartitionReader) {
4646
auto data = runtime.GrabEdgeEventRethrow<TEvWorker::TEvData>(handle);
4747
UNIT_ASSERT_VALUES_EQUAL(data->Source, PARTITION_STR);
4848
UNIT_ASSERT_VALUES_EQUAL(data->Records.size(), 2);
49-
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Offset, INITIAL_OFFSET + dataPatternCookie * 2);
50-
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Data, Sprintf("1-%d", dataPatternCookie));
51-
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Offset, INITIAL_OFFSET + dataPatternCookie * 2 + 1);
52-
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Data, Sprintf("2-%d", dataPatternCookie));
49+
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].GetOffset(), INITIAL_OFFSET + dataPatternCookie * 2);
50+
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].GetData(), Sprintf("1-%d", dataPatternCookie));
51+
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].GetOffset(), INITIAL_OFFSET + dataPatternCookie * 2 + 1);
52+
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].GetData(), Sprintf("2-%d", dataPatternCookie));
5353
}
5454

5555
TEvPersQueue::TEvResponse* GenerateData(ui32 dataPatternCookie) {

ydb/core/driver_lib/run/run.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,9 +1126,10 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
11261126
AppData->IoContextFactory = ModuleFactories ? ModuleFactories->IoContextFactory.get() : nullptr;
11271127
AppData->SchemeOperationFactory = ModuleFactories ? ModuleFactories->SchemeOperationFactory.get() : nullptr;
11281128
AppData->ConfigSwissKnife = ModuleFactories ? ModuleFactories->ConfigSwissKnife.get() : nullptr;
1129-
if (ModuleFactories) {
1130-
AppData->TransferWriterFactory = ModuleFactories->TransferWriterFactory;
1131-
}
1129+
1130+
AppData->TransferWriterFactory = ModuleFactories
1131+
? ModuleFactories->TransferWriterFactory
1132+
: nullptr;
11321133

11331134
AppData->SqsAuthFactory = ModuleFactories
11341135
? ModuleFactories->SqsAuthFactory.get()

ydb/core/driver_lib/run/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ PEERDIR(
102102
ydb/core/tablet_flat
103103
ydb/core/test_tablet
104104
ydb/core/tracing
105+
ydb/core/transfer
105106
ydb/core/tx
106107
ydb/core/tx/columnshard
107108
ydb/core/tx/conveyor/service

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8126,8 +8126,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
81268126
)", kikimr.GetEndpoint().c_str());
81278127

81288128
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
8129-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
8130-
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "The transfer destination path '/Root/table_not_exists' not found");
8129+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
8130+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "Path does not exist");
81318131
}
81328132

81338133
// positive
@@ -8400,8 +8400,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
84008400
)", kikimr.GetEndpoint().c_str());
84018401

84028402
const auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
8403-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
8404-
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "The transfer destination path '/Root/table_not_exists' not found");
8403+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
8404+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "Path does not exist");
84058405
}
84068406

84078407
// positive
@@ -8507,7 +8507,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
85078507

85088508
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
85098509
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
8510-
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "Check failed: path: '/Root/transfer', error: path hasn't been resolved, nearest resolved path: '/Root'");
8510+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "path hasn't been resolved");
85118511
}
85128512

85138513
{
@@ -8865,7 +8865,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
88658865

88668866
const auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
88678867
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
8868-
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "Check failed: path: '/Root/transfer', error: path hasn't been resolved, nearest resolved path: '/Root'");
8868+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToOneLineString(), "path hasn't been resolved");
88698869
}
88708870

88718871
{

ydb/core/persqueue/pq_impl.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,8 +1802,6 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&
18021802
{
18031803
PQ_LOG_T("Handle TEvPersQueue::TEvStatus");
18041804

1805-
ReadBalancerActorId = ev->Sender;
1806-
18071805
if (!ConfigInited || !AllOriginalPartitionsInited()) {
18081806
PQ_LOG_D("Postpone the request." <<
18091807
" ConfigInited " << static_cast<int>(ConfigInited) <<

ydb/core/persqueue/read_balancer.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,8 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA
534534
} else {
535535
TActorId pipeClient = GetPipeClient(tabletId, ctx);
536536

537+
NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace));
538+
537539
auto it = AggregatedStats.Cookies.find(tabletId);
538540
if (!pipeReconnected || it != AggregatedStats.Cookies.end()) {
539541
ui64 cookie;
@@ -548,8 +550,6 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA
548550
TStringBuilder() << "Send TEvPersQueue::TEvStatus TabletId: " << tabletId << " Cookie: " << cookie);
549551
NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus("", true), cookie);
550552
}
551-
552-
NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace));
553553
}
554554
}
555555

0 commit comments

Comments
 (0)