@@ -129,50 +129,92 @@ void TFederatedWriteSessionImpl::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db)
129
129
CurrentDatabase = db;
130
130
}
131
131
132
- std::shared_ptr<TDbInfo> TFederatedWriteSessionImpl::SelectDatabaseImpl () {
133
- std::vector<std::shared_ptr<TDbInfo>> availableDatabases;
132
+ std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseByHash (
133
+ NTopic::TFederatedWriteSessionSettings const & settings,
134
+ std::vector<std::shared_ptr<TDbInfo>> const & dbInfos
135
+ ) {
134
136
ui64 totalWeight = 0 ;
137
+ std::vector<std::shared_ptr<TDbInfo>> available;
135
138
136
- for (const auto & db : FederationState->DbInfos ) {
137
- if (db->status () != TDbInfo::Status::DatabaseInfo_Status_AVAILABLE) {
138
- continue ;
139
- }
140
-
141
- if (Settings.PreferredDatabase_ && (AsciiEqualsIgnoreCase (db->name (), *Settings.PreferredDatabase_ ) ||
142
- AsciiEqualsIgnoreCase (db->id (), *Settings.PreferredDatabase_ ))) {
143
- return db;
144
- } else if (AsciiEqualsIgnoreCase (FederationState->SelfLocation , db->location ())) {
145
- return db;
146
- } else {
147
- availableDatabases.push_back (db);
139
+ for (const auto & db : dbInfos) {
140
+ if (db->status () == TDbInfo::Status::DatabaseInfo_Status_AVAILABLE) {
141
+ available.push_back (db);
148
142
totalWeight += db->weight ();
149
143
}
150
144
}
151
145
152
- if (availableDatabases.empty () || totalWeight == 0 ) {
153
- // close session, return error
154
- return nullptr ;
146
+ if (available.empty () || totalWeight == 0 ) {
147
+ return {nullptr , EStatus::NOT_FOUND};
155
148
}
156
149
157
- std::sort (availableDatabases.begin (), availableDatabases.end (), [](const std::shared_ptr<TDbInfo>& lhs, const std::shared_ptr<TDbInfo>& rhs){
158
- return lhs->weight () > rhs->weight ()
159
- || lhs->weight () == rhs->weight () && lhs->name () < rhs->name ();
160
- });
150
+ std::sort (available.begin (), available.end (), [](auto const & lhs, auto const & rhs) { return lhs->name () < rhs->name (); });
161
151
162
- ui64 hashValue = THash<std::string>()(Settings .Path_ );
163
- hashValue = CombineHashes (hashValue, THash<std::string>()(Settings .ProducerId_ ));
152
+ ui64 hashValue = THash<std::string>()(settings .Path_ );
153
+ hashValue = CombineHashes (hashValue, THash<std::string>()(settings .ProducerId_ ));
164
154
hashValue %= totalWeight;
165
155
166
156
ui64 borderWeight = 0 ;
167
- for (const auto & db : availableDatabases ) {
157
+ for (auto const & db : available ) {
168
158
borderWeight += db->weight ();
169
159
if (hashValue < borderWeight) {
170
- return db ;
160
+ return {db, EStatus::SUCCESS} ;
171
161
}
172
162
}
173
163
Y_UNREACHABLE ();
174
164
}
175
165
166
+ std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabase (
167
+ NTopic::TFederatedWriteSessionSettings const & settings,
168
+ std::vector<std::shared_ptr<TDbInfo>> const & dbInfos, std::string const & selfLocation
169
+ ) {
170
+ /* Logic of the function should follow this table:
171
+ | PreferredDb | Preferred state | Local state | AllowFallback | Return |
172
+ |-------------+-----------------+-------------+---------------+-------------|
173
+ | set | not found | - | any | NOT_FOUND |
174
+ | set | available | - | any | preferred |
175
+ | set | unavailable | - | false | UNAVAILABLE |
176
+ | set | unavailable | - | true | by hash |
177
+ | unset | - | not found | false | NOT_FOUND |
178
+ | unset | - | not found | true | by hash |
179
+ | unset | - | available | any | local |
180
+ | unset | - | unavailable | false | UNAVAILABLE |
181
+ | unset | - | unavailable | true | by hash |
182
+ */
183
+
184
+ decltype (begin (dbInfos)) it;
185
+ if (settings.PreferredDatabase_ ) {
186
+ it = std::find_if (begin (dbInfos), end (dbInfos), [&preferred = settings.PreferredDatabase_ ](auto const & db) {
187
+ return AsciiEqualsIgnoreCase (*preferred, db->name ()) || AsciiEqualsIgnoreCase (*preferred, db->id ());
188
+ });
189
+ if (it == end (dbInfos)) {
190
+ return {nullptr , EStatus::NOT_FOUND};
191
+ }
192
+ } else {
193
+ it = std::find_if (begin (dbInfos), end (dbInfos), [&selfLocation](auto const & db) {
194
+ return AsciiEqualsIgnoreCase (selfLocation, db->location ());
195
+ });
196
+ if (it == end (dbInfos)) {
197
+ if (!settings.AllowFallback_ ) {
198
+ return {nullptr , EStatus::NOT_FOUND};
199
+ }
200
+ return SelectDatabaseByHash (settings, dbInfos);
201
+ }
202
+ }
203
+
204
+ auto db = *it;
205
+ if (db->status () == TDbInfo::Status::DatabaseInfo_Status_AVAILABLE) {
206
+ return {db, EStatus::SUCCESS};
207
+ }
208
+ if (!settings.AllowFallback_ ) {
209
+ return {nullptr , EStatus::UNAVAILABLE};
210
+ }
211
+ return SelectDatabaseByHash (settings, dbInfos);
212
+ }
213
+
214
+ std::pair<std::shared_ptr<TDbInfo>, EStatus> TFederatedWriteSessionImpl::SelectDatabaseImpl () {
215
+ return SelectDatabase (Settings, FederationState->DbInfos , FederationState->SelfLocation );
216
+ }
217
+
176
218
void TFederatedWriteSessionImpl::OnFederatedStateUpdateImpl () {
177
219
if (!FederationState->Status .IsSuccess ()) {
178
220
CloseImpl (FederationState->Status .GetStatus (), NYql::TIssues (FederationState->Status .GetIssues ()));
@@ -181,13 +223,23 @@ void TFederatedWriteSessionImpl::OnFederatedStateUpdateImpl() {
181
223
182
224
Y_ABORT_UNLESS (!FederationState->DbInfos .empty ());
183
225
184
- auto preferrableDb = SelectDatabaseImpl ();
226
+ auto [ preferrableDb, status] = SelectDatabaseImpl ();
185
227
186
228
if (!preferrableDb) {
187
- CloseImpl (EStatus::UNAVAILABLE,
188
- NYql::TIssues{NYql::TIssue (" Fail to select database: no available database with positive weight" )});
229
+ if (!RetryState) {
230
+ RetryState = Settings.RetryPolicy_ ->CreateRetryState ();
231
+ }
232
+ if (auto delay = RetryState->GetNextRetryDelay (status)) {
233
+ LOG_LAZY (Log, TLOG_NOTICE, GetLogPrefix () << " Retry to update federation state in " << delay);
234
+ ScheduleFederatedStateUpdateImpl (*delay);
235
+ } else {
236
+ std::string message = " Failed to select database: no available database" ;
237
+ LOG_LAZY (Log, TLOG_ERR, GetLogPrefix () << message);
238
+ CloseImpl (status, NYql::TIssues{NYql::TIssue (message)});
239
+ }
189
240
return ;
190
241
}
242
+ RetryState.reset ();
191
243
192
244
if (!DatabasesAreSame (preferrableDb, CurrentDatabase)) {
193
245
LOG_LAZY (Log, TLOG_INFO, GetLogPrefix ()
0 commit comments