Skip to content

Commit f1d45a2

Browse files
authored
Fixed StartProducerWithCDSAndPreferUnknownCluster test (#17585)
1 parent f70ce55 commit f1d45a2

File tree

6 files changed

+39
-8
lines changed

6 files changed

+39
-8
lines changed

ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ namespace NGRpcProxy {
1111
static const int CLUSTERS_UPDATER_TIMEOUT_ON_ERROR = 1;
1212

1313

14-
TClustersUpdater::TClustersUpdater(IPQClustersUpdaterCallback* callback)
14+
TClustersUpdater::TClustersUpdater(IPQClustersUpdaterCallback* callback, TStatus::TPtr& status)
1515
: Callback(callback)
16+
, Status(status)
1617
{};
1718

1819
void TClustersUpdater::Bootstrap(const NActors::TActorContext& ctx) {
@@ -34,15 +35,21 @@ void TClustersUpdater::Handle(TEvPQClustersUpdater::TEvUpdateClusters::TPtr&, co
3435
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), req.Release());
3536
}
3637

37-
void TClustersUpdater::Handle(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate::TPtr& ev, const TActorContext&) {
38+
void TClustersUpdater::Handle(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate::TPtr& ev, const TActorContext& ctx) {
39+
TGuard<TSpinLock> guard(Status->Lock);
40+
if (!Status->Running) {
41+
return Die(ctx);
42+
}
3843

3944
Callback->NetClassifierUpdated(ev->Get()->Classifier);
4045
}
4146

42-
43-
44-
4547
void TClustersUpdater::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TActorContext &ctx) {
48+
TGuard<TSpinLock> guard(Status->Lock);
49+
if (!Status->Running) {
50+
return Die(ctx);
51+
}
52+
4653
auto& record = ev->Get()->Record;
4754

4855
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {

ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,19 @@ class IPQClustersUpdaterCallback {
4747

4848
class TClustersUpdater : public NActors::TActorBootstrapped<TClustersUpdater> {
4949
public:
50-
TClustersUpdater(IPQClustersUpdaterCallback* callback);
50+
struct TStatus {
51+
using TPtr = std::shared_ptr<TStatus>;
52+
53+
bool Running = true;
54+
TSpinLock Lock;
55+
56+
void Stop() {
57+
TGuard guard(Lock);
58+
Running = false;
59+
}
60+
};
61+
62+
TClustersUpdater(IPQClustersUpdaterCallback* callback, TStatus::TPtr& status);
5163

5264
void Bootstrap(const NActors::TActorContext& ctx);
5365

@@ -58,6 +70,7 @@ class TClustersUpdater : public NActors::TActorBootstrapped<TClustersUpdater> {
5870
TString LocalCluster;
5971
TVector<TString> Clusters;
6072
bool Enabled = false;
73+
TStatus::TPtr Status;
6174

6275
STFUNC(StateFunc) {
6376
switch (ev->GetTypeRewrite()) {

ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ TPQReadService::TPQReadService(NKikimr::NGRpcService::TGRpcPersQueueService* ser
193193
);
194194

195195
if (NeedDiscoverClusters) {
196-
ActorSystem->Register(new TClustersUpdater(this));
196+
ClustersUpdaterStatus = std::make_shared<TClustersUpdater::TStatus>();
197+
ActorSystem->Register(new TClustersUpdater(this, ClustersUpdaterStatus));
197198
}
198199
}
199200

ydb/services/deprecated/persqueue_v0/grpc_pq_read.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ class TPQReadService : public IPQClustersUpdaterCallback, public std::enable_sha
7979

8080
void StopService() {
8181
AtomicSet(ShuttingDown_, 1);
82+
if (ClustersUpdaterStatus) {
83+
ClustersUpdaterStatus->Stop();
84+
}
8285
}
8386

8487
bool IsShuttingDown() const {
@@ -139,6 +142,8 @@ class TPQReadService : public IPQClustersUpdaterCallback, public std::enable_sha
139142
NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; // Detects client's datacenter by IP. May be null
140143

141144
bool NeedDiscoverClusters;
145+
TClustersUpdater::TStatus::TPtr ClustersUpdaterStatus;
146+
142147
NPersQueue::TConverterFactoryPtr TopicConverterFactory;
143148
std::unique_ptr<NPersQueue::TTopicsListController> TopicsHandler;
144149
};

ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ void TPQWriteServiceImpl::InitClustersUpdater()
151151
TAppData* appData = ActorSystem->AppData<TAppData>();
152152
NeedDiscoverClusters = !appData->PQConfig.GetTopicsAreFirstClassCitizen();
153153
if (NeedDiscoverClusters) {
154-
ActorSystem->Register(new TClustersUpdater(this));
154+
ClustersUpdaterStatus = std::make_shared<TClustersUpdater::TStatus>();
155+
ActorSystem->Register(new TClustersUpdater(this, ClustersUpdaterStatus));
155156
}
156157
}
157158

ydb/services/deprecated/persqueue_v0/grpc_pq_write.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl
6969

7070
void StopService() {
7171
AtomicSet(ShuttingDown_, 1);
72+
if (ClustersUpdaterStatus) {
73+
ClustersUpdaterStatus->Stop();
74+
}
7275
}
7376

7477
bool IsShuttingDown() const {
@@ -116,6 +119,7 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl
116119
TAtomic ShuttingDown_ = 0;
117120

118121
bool NeedDiscoverClusters; // Legacy mode OR account-mode in multi-cluster setup;
122+
TClustersUpdater::TStatus::TPtr ClustersUpdaterStatus;
119123

120124
NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; // Detects client's datacenter by IP. May be null
121125
};

0 commit comments

Comments
 (0)