Skip to content

Commit b8d3bc8

Browse files
qyryqGazizonoki
authored andcommitted
Moved "ydb_federated_topic: use settings from DbDriverState_ in OnDescribePartition" commit from ydb repo
1 parent e02d4ba commit b8d3bc8

File tree

5 files changed

+81
-5
lines changed

5 files changed

+81
-5
lines changed

src/client/federated_topic/impl/federated_read_session.cpp

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,19 @@ void TFederatedReadSessionImpl::Start() {
9898
}
9999

100100
void TFederatedReadSessionImpl::OpenSubSessionsImpl(const std::vector<std::shared_ptr<TDbInfo>>& dbInfos) {
101-
for (const auto& db : dbInfos) {
101+
{
102+
TStringBuilder log(GetLogPrefix());
103+
log << "Open read subsessions to databases: ";
104+
bool first = true;
105+
for (const auto& db : dbInfos) {
106+
if (first) first = false; else log << ", ";
107+
log << "{ name: " << db->name()
108+
<< ", endpoint: " << db->endpoint()
109+
<< ", path: " << db->path() << " }";
110+
}
111+
LOG_LAZY(Log, TLOG_INFO, log);
112+
}
113+
for (const auto& db : dbInfos) {
102114
NTopic::TTopicClientSettings settings = SubClientSettings;
103115
settings
104116
.Database(db->path())
@@ -113,7 +125,7 @@ void TFederatedReadSessionImpl::OpenSubSessionsImpl(const std::vector<std::share
113125

114126
void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() {
115127
if (!FederationState->Status.IsSuccess()) {
116-
LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "Federated state update failed.");
128+
LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "Federated state update failed. FederationState: " << *FederationState);
117129
CloseImpl();
118130
return;
119131
}
@@ -151,6 +163,27 @@ void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() {
151163
// TODO: investigate here, why empty list?
152164
// Reason (and returned status) could be BAD_REQUEST or UNAVAILABLE.
153165
LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "No available databases to read.");
166+
auto issues = FederationState->Status.GetIssues();
167+
TStringBuilder issue;
168+
issue << "Requested databases {";
169+
bool first = true;
170+
for (auto const& dbFromSettings : Settings.GetDatabasesToReadFrom()) {
171+
if (first) first = false; else issue << ",";
172+
issue << " " << dbFromSettings;
173+
}
174+
issue << " } not found. Available databases {";
175+
first = true;
176+
for (auto const& db : FederationState->DbInfos) {
177+
if (db->status() == Ydb::FederationDiscovery::DatabaseInfo_Status_AVAILABLE) {
178+
if (first) first = false; else issue << ",";
179+
issue << " { name: " << db->name()
180+
<< ", endpoint: " << db->endpoint()
181+
<< ", path: " << db->path() << " }";
182+
}
183+
}
184+
issue << " }";
185+
issues.AddIssue(issue);
186+
FederationState->Status = TStatus(EStatus::BAD_REQUEST, std::move(issues));
154187
CloseImpl();
155188
return;
156189
}

src/client/federated_topic/impl/federated_write_session.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
#include <src/client/persqueue_core/impl/log_lazy.h>
44
#include <src/client/topic/impl/topic_impl.h>
55

6+
#define INCLUDE_YDB_INTERNAL_H
7+
#include <src/client/impl/ydb_internal/logger/log.h>
8+
#undef INCLUDE_YDB_INTERNAL_H
9+
610
#include <ydb-cpp-sdk/library/threading/future/future.h>
711

812
#include <algorithm>
@@ -32,11 +36,16 @@ TFederatedWriteSession::TFederatedWriteSession(const TFederatedWriteSessionSetti
3236
, Observer(std::move(observer))
3337
, AsyncInit(Observer->WaitForFirstState())
3438
, FederationState(nullptr)
39+
, Log(Connections->GetLog())
3540
, ClientEventsQueue(std::make_shared<NTopic::TWriteSessionEventsQueue>(Settings))
3641
, BufferFreeSpace(Settings.MaxMemoryUsage_)
3742
{
3843
}
3944

45+
TStringBuilder TFederatedWriteSession::GetLogPrefix() const {
46+
return TStringBuilder() << GetDatabaseLogPrefix(SubClientSetttings.Database_.value_or("")) << "[" << SessionId << "] ";
47+
}
48+
4049
void TFederatedWriteSession::Start() {
4150
// TODO validate settings?
4251
Settings.EventHandlers_.HandlersExecutor_->Start();
@@ -165,6 +174,10 @@ void TFederatedWriteSession::OnFederatedStateUpdateImpl() {
165174
}
166175

167176
if (!DatabasesAreSame(preferrableDb, CurrentDatabase)) {
177+
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix()
178+
<< "Start federated write session to database '" << preferrableDb->name()
179+
<< "' (previous was " << (CurrentDatabase ? CurrentDatabase->name() : "<empty>") << ")"
180+
<< " FederationState: " << *FederationState);
168181
OpenSubSessionImpl(preferrableDb);
169182
}
170183

src/client/federated_topic/impl/federated_write_session.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ class TFederatedWriteSession : public NTopic::IWriteSession,
9090

9191
void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
9292

93+
TStringBuilder GetLogPrefix() const;
94+
9395
private:
9496
// For subsession creation
9597
const NTopic::TFederatedWriteSessionSettings Settings;
@@ -102,6 +104,8 @@ class TFederatedWriteSession : public NTopic::IWriteSession,
102104
std::shared_ptr<TFederatedDbState> FederationState;
103105
NYdbGrpc::IQueueClientContextPtr UpdateStateDelayContext;
104106

107+
TLog Log;
108+
105109
std::shared_ptr<TDbInfo> CurrentDatabase;
106110

107111
std::string SessionId;

src/client/federated_topic/impl/federation_observer.cpp

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <src/client/persqueue_core/impl/log_lazy.h>
12
#include <src/api/grpc/ydb_federation_discovery_v1.grpc.pb.h>
23

34
#include <src/client/federated_topic/impl/federation_observer.h>
@@ -131,14 +132,16 @@ void TFederatedDbObserverImpl::OnFederationDiscovery(TStatus&& status, Ydb::Fede
131132
// 1) The request was meant for a non-federated topic: fall back to single db mode.
132133
// 2) The database path in the request is simply wrong: the client should get the BAD_REQUEST status.
133134
if (status.GetStatus() == EStatus::CLIENT_CALL_UNIMPLEMENTED || status.GetStatus() == EStatus::BAD_REQUEST) {
134-
// fall back to single db mode
135+
LOG_LAZY(DbDriverState_->Log, TLOG_INFO, TStringBuilder()
136+
<< "OnFederationDiscovery fall back to single mode, database=" << DbDriverState_->Database);
135137
FederatedDbState->Status = TPlainStatus{}; // SUCCESS
138+
FederatedDbState->ControlPlaneEndpoint = DbDriverState_->DiscoveryEndpoint;
136139
auto dbState = Connections_->GetDriverState(std::nullopt, std::nullopt, std::nullopt, std::nullopt, std::nullopt);
137140
FederatedDbState->ControlPlaneEndpoint = dbState->DiscoveryEndpoint;
138141
// FederatedDbState->SelfLocation = ???;
139142
auto db = std::make_shared<Ydb::FederationDiscovery::DatabaseInfo>();
140-
db->set_path(dbState->Database);
141-
db->set_endpoint(dbState->DiscoveryEndpoint);
143+
db->set_path(DbDriverState_->Database);
144+
db->set_endpoint(DbDriverState_->DiscoveryEndpoint);
142145
db->set_status(Ydb::FederationDiscovery::DatabaseInfo_Status_AVAILABLE);
143146
db->set_weight(100);
144147
FederatedDbState->DbInfos.emplace_back(std::move(db));
@@ -170,4 +173,25 @@ void TFederatedDbObserverImpl::OnFederationDiscovery(TStatus&& status, Ydb::Fede
170173
}
171174
}
172175

176+
IOutputStream& operator<<(IOutputStream& out, TFederatedDbState const& state) {
177+
out << "{ Status: " << state.Status.GetStatus();
178+
if (auto const& issues = state.Status.GetIssues(); !issues.Empty()) {
179+
out << ", Issues: { " << issues.ToOneLineString() << " }";
180+
}
181+
if (!state.DbInfos.empty()) {
182+
out << ", DbInfos: { ";
183+
bool first = true;
184+
for (auto const& info : state.DbInfos) {
185+
if (first) {
186+
first = false;
187+
} else {
188+
out << ", ";
189+
}
190+
out << "{ " << info->ShortDebugString() << " }";
191+
}
192+
out << " }";
193+
}
194+
return out << " }";
195+
}
196+
173197
} // namespace NYdb::NFederatedTopic

src/client/federated_topic/impl/federation_observer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ struct TFederatedDbState {
5151
}
5252
return nullptr;
5353
}
54+
55+
friend IOutputStream& operator<<(IOutputStream& out, TFederatedDbState const& state);
5456
};
5557

5658

0 commit comments

Comments
 (0)