Skip to content

Commit 2df27f3

Browse files
committed
[ydb-sdk] add federated topic client GetAllClusters method (#16070)
1 parent 5430875 commit 2df27f3

File tree

5 files changed

+96
-0
lines changed

5 files changed

+96
-0
lines changed

include/ydb-cpp-sdk/client/federated_topic/federated_topic.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ using TDbInfo = Ydb::FederationDiscovery::DatabaseInfo;
1515

1616
using TSessionClosedEvent = NTopic::TSessionClosedEvent;
1717

18+
using TAsyncDescribeTopicResult = NTopic::TAsyncDescribeTopicResult;
19+
1820
//! Federated partition session.
1921
struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFederatedPartitionSession> {
2022
using TPtr = TIntrusivePtr<TFederatedPartitionSession>;
@@ -516,6 +518,32 @@ class TFederatedTopicClient {
516518
// std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TFederatedWriteSessionSettings& settings);
517519
std::shared_ptr<NTopic::IWriteSession> CreateWriteSession(const TFederatedWriteSessionSettings& settings);
518520

521+
struct TClusterInfo {
522+
enum class EStatus : int {
523+
STATUS_UNSPECIFIED,
524+
AVAILABLE,
525+
READ_ONLY,
526+
UNAVAILABLE,
527+
};
528+
std::string Name;
529+
std::string Endpoint;
530+
std::string Path;
531+
EStatus Status;
532+
// TODO: Id, Weight, ...?
533+
//! Replaces Endpoint and Database for federated clusters
534+
void AdjustTopicClientSettings(NTopic::TTopicClientSettings& settings) const;
535+
//! Prepend Database for federated clusters
536+
void AdjustTopicPath(std::string& path) const;
537+
//! Usable for at least read operations
538+
bool IsAvailableForRead() const;
539+
bool IsAvailableForWrite() const;
540+
};
541+
542+
//! Discover all clusters for federated topic.
543+
// Will return single cluster with empty name for non-federated clusters.
544+
// May return empty list if FederatedTopicClient was destroyed when future fired.
545+
NThreading::TFuture<std::vector<TClusterInfo>> GetAllClusterInfo();
546+
519547
protected:
520548
void OverrideCodec(NTopic::ECodec codecId, std::unique_ptr<NTopic::ICodec>&& codecImpl);
521549

src/client/federated_topic/impl/federated_topic.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,33 @@ void TFederatedTopicClient::OverrideCodec(NTopic::ECodec codecId, std::unique_pt
8686
return Impl_->OverrideCodec(codecId, std::move(codecImpl));
8787
}
8888

89+
NThreading::TFuture<std::vector<TFederatedTopicClient::TClusterInfo>> TFederatedTopicClient::GetAllClusterInfo() {
90+
return Impl_->GetAllClusterInfo();
91+
}
92+
93+
void TFederatedTopicClient::TClusterInfo::AdjustTopicClientSettings(NTopic::TTopicClientSettings& settings) const {
94+
if (Name.empty()) {
95+
return;
96+
}
97+
settings.DiscoveryEndpoint(Endpoint);
98+
settings.Database(Path);
99+
}
100+
101+
void TFederatedTopicClient::TClusterInfo::AdjustTopicPath(std::string& path) const {
102+
if (Name.empty()) {
103+
return;
104+
}
105+
if (path.empty() || path[0] != '/') {
106+
path = Path + '/' + path;
107+
}
108+
}
109+
110+
bool TFederatedTopicClient::TClusterInfo::IsAvailableForRead() const {
111+
return Status == TClusterInfo::EStatus::AVAILABLE || Status == TClusterInfo::EStatus::READ_ONLY;
112+
}
113+
114+
bool TFederatedTopicClient::TClusterInfo::IsAvailableForWrite() const {
115+
return Status == TClusterInfo::EStatus::AVAILABLE;
116+
}
117+
89118
} // namespace NYdb::NFederatedTopic

src/client/federated_topic/impl/federated_topic_impl.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,40 @@ void TFederatedTopicClient::TImpl::InitObserver() {
4949
}
5050
}
5151

52+
NThreading::TFuture<std::vector<TFederatedTopicClient::TClusterInfo>> TFederatedTopicClient::TImpl::GetAllClusterInfo() {
53+
InitObserver();
54+
return Observer->WaitForFirstState().Apply(
55+
[weakObserver = std::weak_ptr(Observer)] (const auto& ) {
56+
std::vector<TClusterInfo> result;
57+
auto observer = weakObserver.lock();
58+
if (!observer) {
59+
return result;
60+
}
61+
auto state = observer->GetState();
62+
result.reserve(state->DbInfos.size());
63+
for (const auto& db: state->DbInfos) {
64+
auto& dbinfo = result.emplace_back();
65+
switch (db->status()) {
66+
#define TRANSLATE_STATUS(NAME) \
67+
case TDbInfo::Status::DatabaseInfo_Status_##NAME: \
68+
dbinfo.Status = TClusterInfo::EStatus::NAME; \
69+
break
70+
TRANSLATE_STATUS(STATUS_UNSPECIFIED);
71+
TRANSLATE_STATUS(AVAILABLE);
72+
TRANSLATE_STATUS(READ_ONLY);
73+
TRANSLATE_STATUS(UNAVAILABLE);
74+
default:
75+
Y_ENSURE(false /* impossible status */);
76+
}
77+
#undef TRANSLATE_STATUS
78+
dbinfo.Name = db->name();
79+
dbinfo.Endpoint = db->endpoint();
80+
dbinfo.Path = db->path();
81+
}
82+
return result;
83+
});
84+
}
85+
5286
auto TFederatedTopicClient::TImpl::GetSubsessionHandlersExecutor() -> NTopic::IExecutor::TPtr {
5387
with_lock (Lock) {
5488
if (!SubsessionHandlersExecutor) {

src/client/federated_topic/impl/federated_topic_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ class TFederatedTopicClient::TImpl {
6565
std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TFederatedWriteSessionSettings& settings);
6666
std::shared_ptr<NTopic::IWriteSession> CreateWriteSession(const TFederatedWriteSessionSettings& settings);
6767

68+
NThreading::TFuture<std::vector<TFederatedTopicClient::TClusterInfo>> GetAllClusterInfo();
69+
6870
std::shared_ptr<TFederatedDbObserver> GetObserver() {
6971
std::lock_guard guard(Lock);
7072
return Observer;

src/client/federated_topic/impl/federation_observer.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ IOutputStream& operator<<(IOutputStream& out, TFederatedDbState const& state) {
197197
}
198198
out << " ]";
199199
}
200+
if (!state.ControlPlaneEndpoint.empty()) {
201+
out << " ControlPlaneEndpoint: " << state.ControlPlaneEndpoint;
202+
}
200203
return out << " }";
201204
}
202205

0 commit comments

Comments
 (0)