Skip to content

Commit e6f602e

Browse files
committed
YQ-3560 RowDispatcher: local mode to use in dqrun (ydb-platform#10072)
1 parent 3b10297 commit e6f602e

File tree

10 files changed

+28
-20
lines changed

10 files changed

+28
-20
lines changed

ydb/core/fq/libs/config/protos/row_dispatcher.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import "ydb/core/fq/libs/config/protos/storage.proto";
1111
message TRowDispatcherCoordinatorConfig {
1212
TYdbStorageConfig Database = 1;
1313
string CoordinationNodePath = 2;
14+
bool LocalMode = 3; // Use only local row_dispatcher.
1415
}
1516
message TRowDispatcherConfig {
1617
bool Enabled = 1;
@@ -19,5 +20,4 @@ message TRowDispatcherConfig {
1920
uint64 MaxSessionUsedMemory = 4;
2021
bool WithoutConsumer = 5;
2122
TRowDispatcherCoordinatorConfig Coordinator = 6;
22-
2323
}

ydb/core/fq/libs/init/init.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,6 @@ void Init(
192192
if (protoConfig.GetRowDispatcher().GetEnabled()) {
193193
auto rowDispatcher = NFq::NewRowDispatcherService(
194194
protoConfig.GetRowDispatcher(),
195-
protoConfig.GetCommon(),
196195
NKikimr::CreateYdbCredentialsProviderFactory,
197196
yqSharedResources,
198197
credentialsFactory,

ydb/core/fq/libs/row_dispatcher/leader_election.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,10 @@ void TLeaderElection::Bootstrap() {
222222
Become(&TLeaderElection::StateFunc);
223223
LogPrefix = "TLeaderElection " + SelfId().ToString() + " ";
224224
LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped, local coordinator id " << CoordinatorId.ToString());
225+
if (Config.GetLocalMode()) {
226+
TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(CoordinatorId));
227+
return;
228+
}
225229
ProcessState();
226230
}
227231

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
108108

109109

110110
NConfig::TRowDispatcherConfig Config;
111-
NConfig::TCommonConfig CommonConfig;
112111
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
113112
TYqSharedResources::TPtr YqSharedResources;
114113
TMaybe<TActorId> CoordinatorActorId;
@@ -171,7 +170,6 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
171170
public:
172171
explicit TRowDispatcher(
173172
const NConfig::TRowDispatcherConfig& config,
174-
const NConfig::TCommonConfig& commonConfig,
175173
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
176174
const TYqSharedResources::TPtr& yqSharedResources,
177175
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
@@ -234,15 +232,13 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
234232

235233
TRowDispatcher::TRowDispatcher(
236234
const NConfig::TRowDispatcherConfig& config,
237-
const NConfig::TCommonConfig& commonConfig,
238235
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
239236
const TYqSharedResources::TPtr& yqSharedResources,
240237
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
241238
const TString& tenant,
242239
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
243240
const ::NMonitoring::TDynamicCounterPtr& counters)
244241
: Config(config)
245-
, CommonConfig(commonConfig)
246242
, CredentialsProviderFactory(credentialsProviderFactory)
247243
, YqSharedResources(yqSharedResources)
248244
, CredentialsFactory(credentialsFactory)
@@ -586,7 +582,6 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintState::TPtr&) {
586582

587583
std::unique_ptr<NActors::IActor> NewRowDispatcher(
588584
const NConfig::TRowDispatcherConfig& config,
589-
const NConfig::TCommonConfig& commonConfig,
590585
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
591586
const TYqSharedResources::TPtr& yqSharedResources,
592587
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
@@ -596,7 +591,6 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
596591
{
597592
return std::unique_ptr<NActors::IActor>(new TRowDispatcher(
598593
config,
599-
commonConfig,
600594
credentialsProviderFactory,
601595
yqSharedResources,
602596
credentialsFactory,

ydb/core/fq/libs/row_dispatcher/row_dispatcher.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ namespace NFq {
1616

1717
std::unique_ptr<NActors::IActor> NewRowDispatcher(
1818
const NConfig::TRowDispatcherConfig& config,
19-
const NConfig::TCommonConfig& commonConfig,
2019
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
2120
const TYqSharedResources::TPtr& yqSharedResources,
2221
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,

ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ using namespace NActors;
1111

1212
std::unique_ptr<NActors::IActor> NewRowDispatcherService(
1313
const NConfig::TRowDispatcherConfig& config,
14-
const NConfig::TCommonConfig& commonConfig,
1514
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
1615
const TYqSharedResources::TPtr& yqSharedResources,
1716
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
@@ -20,7 +19,6 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
2019
{
2120
return NewRowDispatcher(
2221
config,
23-
commonConfig,
2422
credentialsProviderFactory,
2523
yqSharedResources,
2624
credentialsFactory,

ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ namespace NFq {
1616

1717
std::unique_ptr<NActors::IActor> NewRowDispatcherService(
1818
const NConfig::TRowDispatcherConfig& config,
19-
const NConfig::TCommonConfig& commonConfig,
2019
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
2120
const TYqSharedResources::TPtr& yqSharedResources,
2221
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,

ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,18 @@ class TFixture : public NUnitTest::TBaseFixture {
2323
Runtime.Initialize(app->Unwrap());
2424
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG);
2525
auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory;
26-
auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));
26+
YqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));
2727

2828
RowDispatcher = Runtime.AllocateEdgeActor();
2929
Coordinator1 = Runtime.AllocateEdgeActor();
3030
Coordinator2 = Runtime.AllocateEdgeActor();
3131
Coordinator3 = Runtime.AllocateEdgeActor();
32+
}
3233

34+
void Init(bool localMode = false) {
3335
NConfig::TRowDispatcherCoordinatorConfig config;
3436
config.SetCoordinationNodePath("row_dispatcher");
37+
config.SetLocalMode(localMode);
3538
auto& database = *config.MutableDatabase();
3639
database.SetEndpoint(GetEnv("YDB_ENDPOINT"));
3740
database.SetDatabase(GetEnv("YDB_DATABASE"));
@@ -42,7 +45,7 @@ class TFixture : public NUnitTest::TBaseFixture {
4245
Coordinator1,
4346
config,
4447
NKikimr::CreateYdbCredentialsProviderFactory,
45-
yqSharedResources,
48+
YqSharedResources,
4649
"/tenant",
4750
MakeIntrusive<NMonitoring::TDynamicCounters>()
4851
).release());
@@ -52,7 +55,7 @@ class TFixture : public NUnitTest::TBaseFixture {
5255
Coordinator2,
5356
config,
5457
NKikimr::CreateYdbCredentialsProviderFactory,
55-
yqSharedResources,
58+
YqSharedResources,
5659
"/tenant",
5760
MakeIntrusive<NMonitoring::TDynamicCounters>()
5861
).release());
@@ -62,7 +65,7 @@ class TFixture : public NUnitTest::TBaseFixture {
6265
Coordinator3,
6366
config,
6467
NKikimr::CreateYdbCredentialsProviderFactory,
65-
yqSharedResources,
68+
YqSharedResources,
6669
"/tenant",
6770
MakeIntrusive<NMonitoring::TDynamicCounters>()
6871
).release());
@@ -95,10 +98,12 @@ class TFixture : public NUnitTest::TBaseFixture {
9598
NActors::TActorId Coordinator2;
9699
NActors::TActorId Coordinator3;
97100
NActors::TActorId LeaderDetector;
101+
TYqSharedResources::TPtr YqSharedResources;
98102
};
99103

100104
Y_UNIT_TEST_SUITE(LeaderElectionTests) {
101105
Y_UNIT_TEST_F(Test1, TFixture) {
106+
Init();
102107

103108
auto coordinatorId1 = ExpectCoordinatorChanged();
104109
auto coordinatorId2 = ExpectCoordinatorChanged();
@@ -134,7 +139,15 @@ Y_UNIT_TEST_SUITE(LeaderElectionTests) {
134139
auto coordinatorId6 = ExpectCoordinatorChanged();
135140
UNIT_ASSERT(coordinatorId6 != coordinatorId4);
136141
}
137-
}
138142

143+
Y_UNIT_TEST_F(TestLocalMode, TFixture) {
144+
Init(true);
145+
auto coordinatorId1 = ExpectCoordinatorChanged();
146+
auto coordinatorId2 = ExpectCoordinatorChanged();
147+
auto coordinatorId3 = ExpectCoordinatorChanged();
148+
TSet<NActors::TActorId> set {coordinatorId1, coordinatorId2, coordinatorId3};
149+
UNIT_ASSERT(set.size() == 3);
150+
}
139151
}
140152

153+
}

ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ class TFixture : public NUnitTest::TBaseFixture {
6161
database.SetDatabase("YDB_DATABASE");
6262
database.SetToken("");
6363

64-
NConfig::TCommonConfig commonConfig;
6564
auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory;
6665
auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));
6766

@@ -74,7 +73,6 @@ class TFixture : public NUnitTest::TBaseFixture {
7473

7574
RowDispatcher = Runtime.Register(NewRowDispatcher(
7675
config,
77-
commonConfig,
7876
NKikimr::CreateYdbCredentialsProviderFactory,
7977
yqSharedResources,
8078
credentialsFactory,

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class TBlockingEQueue {
7878

7979
class TFileTopicReadSession : public NYdb::NTopic::IReadSession {
8080

81-
constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(100);
81+
constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
8282

8383
public:
8484
TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = ""):
@@ -182,10 +182,14 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(100);
182182
TString rawMsg;
183183
TVector<TMessage> msgs;
184184
size_t size = 0;
185+
ui64 maxBatchRowSize = 100;
185186

186187
while (size_t read = fi.ReadLine(rawMsg)) {
187188
msgs.emplace_back(MakeNextMessage(rawMsg));
188189
MsgOffset_++;
190+
if (!maxBatchRowSize--) {
191+
break;
192+
}
189193
size += rawMsg.size();
190194
}
191195
if (!msgs.empty()) {

0 commit comments

Comments
 (0)