Skip to content

Commit 9e1fcd3

Browse files
authored
Merge pull request #11430 from uzhastik/24_3_merge_16
24 3 merge 16
2 parents 346eb69 + ac138d9 commit 9e1fcd3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+989
-97
lines changed

ydb/core/blobstorage/nodewarden/node_warden_impl.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ void TNodeWarden::RemoveDrivesWithBadSerialsAndReport(TVector<NPDisk::TDriveData
9090
}
9191

9292
TVector<NPDisk::TDriveData> TNodeWarden::ListLocalDrives() {
93+
if (!AppData()->FeatureFlags.GetEnableDriveSerialsDiscovery()) {
94+
return {};
95+
}
96+
9397
TStringStream details;
9498
TVector<NPDisk::TDriveData> drives = ListDevicesWithPartlabel(details);
9599

ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ namespace NKikimr {
174174
TEvResumeForce *ResumeForceToken = nullptr;
175175
TInstant ReplicationEndTime;
176176
bool UnrecoveredNonphantomBlobs = false;
177+
bool RequestedReplicationToken = false;
178+
bool HoldingReplicationToken = false;
177179

178180
TWatchdogTimer<TEvReplCheckProgress> ReplProgressWatchdog;
179181

@@ -287,6 +289,12 @@ namespace NKikimr {
287289
case Plan:
288290
// this is a first quantum of replication, so we have to register it in the broker
289291
State = AwaitToken;
292+
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
293+
if (RequestedReplicationToken) {
294+
STLOG(PRI_CRIT, BS_REPL, BSVR38, ReplCtx->VCtx->VDiskLogPrefix << "excessive replication token requested");
295+
break;
296+
}
297+
RequestedReplicationToken = true;
290298
if (!Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(ReplCtx->VDiskCfg->BaseInfo.PDiskId))) {
291299
HandleReplToken();
292300
}
@@ -303,6 +311,10 @@ namespace NKikimr {
303311
}
304312

305313
void HandleReplToken() {
314+
Y_ABORT_UNLESS(RequestedReplicationToken);
315+
RequestedReplicationToken = false;
316+
HoldingReplicationToken = true;
317+
306318
// switch to replication state
307319
Transition(AwaitToken, Replication);
308320
if (!ResumeIfReady()) {
@@ -408,6 +420,9 @@ namespace NKikimr {
408420
if (State == WaitQueues || State == Replication) {
409421
// release token as we have finished replicating
410422
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
423+
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
424+
Y_DEBUG_ABORT_UNLESS(HoldingReplicationToken);
425+
HoldingReplicationToken = false;
411426
}
412427
ResetReplProgressTimer(true);
413428

@@ -635,7 +650,15 @@ namespace NKikimr {
635650

636651
// return replication token if we have one
637652
if (State == AwaitToken || State == WaitQueues || State == Replication) {
638-
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
653+
Y_DEBUG_ABORT_UNLESS(RequestedReplicationToken || HoldingReplicationToken);
654+
if (RequestedReplicationToken || HoldingReplicationToken) {
655+
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
656+
}
657+
} else {
658+
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken && !HoldingReplicationToken);
659+
if (RequestedReplicationToken || HoldingReplicationToken) {
660+
STLOG(PRI_CRIT, BS_REPL, BSVR37, ReplCtx->VCtx->VDiskLogPrefix << "stuck replication token");
661+
}
639662
}
640663

641664
if (ReplJobActorId) {

ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ namespace NKikimr {
129129
ui64 NextReceiveCookie;
130130
TResultQueue ResultQueue;
131131
std::shared_ptr<TMessageRelevanceTracker> Tracker = std::make_shared<TMessageRelevanceTracker>();
132+
bool Terminated = false;
132133

133134
TQueue<std::unique_ptr<TEvBlobStorage::TEvVGet>> SchedulerRequestQ;
134135
THashMap<ui64, TReplMemTokenId> RequestTokens;
@@ -227,9 +228,7 @@ namespace NKikimr {
227228
PrefetchDataSize = 0;
228229
RequestFromVDiskProxyPending = false;
229230
if (Finished) {
230-
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
231-
RequestTokens.clear();
232-
return PassAway(); // TODO(alexvru): check correctness of invocations
231+
return PassAway();
233232
}
234233
}
235234
// send request(s) if prefetch queue is not full
@@ -297,6 +296,9 @@ namespace NKikimr {
297296
if (msg->Record.GetCookie() == NextReceiveCookie) {
298297
ui64 cookie = NextReceiveCookie;
299298
ProcessResult(msg);
299+
if (Terminated) {
300+
return;
301+
}
300302
ReleaseMemToken(cookie);
301303
while (!ResultQueue.empty()) {
302304
const TQueueItem& top = ResultQueue.top();
@@ -305,6 +307,9 @@ namespace NKikimr {
305307
}
306308
ui64 cookie = NextReceiveCookie;
307309
ProcessResult(top.get());
310+
if (Terminated) {
311+
return;
312+
}
308313
ReleaseMemToken(cookie);
309314
ResultQueue.pop();
310315
}
@@ -314,6 +319,7 @@ namespace NKikimr {
314319
}
315320

316321
void ReleaseMemToken(ui64 cookie) {
322+
Y_ABORT_UNLESS(!Terminated);
317323
if (RequestTokens) {
318324
auto it = RequestTokens.find(cookie);
319325
Y_ABORT_UNLESS(it != RequestTokens.end());
@@ -428,6 +434,13 @@ namespace NKikimr {
428434
}
429435
}
430436

437+
void PassAway() override {
438+
Y_ABORT_UNLESS(!Terminated);
439+
Terminated = true;
440+
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
441+
TActorBootstrapped::PassAway();
442+
}
443+
431444
STRICT_STFUNC(StateFunc,
432445
hFunc(TEvReplProxyNext, Handle)
433446
hFunc(TEvReplMemToken, Handle)
@@ -446,8 +459,7 @@ namespace NKikimr {
446459
TTrackableVector<TVDiskProxy::TScheduledBlob>&& ids,
447460
const TVDiskID& vdiskId,
448461
const TActorId& serviceId)
449-
: TActorBootstrapped<TVDiskProxyActor>()
450-
, ReplCtx(std::move(replCtx))
462+
: ReplCtx(std::move(replCtx))
451463
, GType(ReplCtx->VCtx->Top->GType)
452464
, Ids(std::move(ids))
453465
, VDiskId(vdiskId)

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
136136
{
137137
Y_UNUSED(config);
138138

139-
if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE)
139+
if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE &&
140+
*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO)
140141
return false;
141142

142143
if (txCtx.GetSnapshot().IsValid())

ydb/core/kqp/common/kqp_tx.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
212212
break;
213213

214214
case Ydb::Table::TransactionSettings::kSnapshotReadOnly:
215-
// TODO: (KIKIMR-3374) Use separate isolation mode to avoid optimistic locks.
216-
EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
215+
EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO;
217216
Readonly = true;
218217
break;
219218

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,8 +1039,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
10391039
std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
10401040
// Special case for infinity
10411041
if (lhs.Ranges->GetRightBorder().first->GetCells().empty() || rhs.Ranges->GetRightBorder().first->GetCells().empty()) {
1042-
YQL_ENSURE(!lhs.Ranges->GetRightBorder().first->GetCells().empty() || !rhs.Ranges->GetRightBorder().first->GetCells().empty());
1043-
return rhs.Ranges->GetRightBorder().first->GetCells().empty();
1042+
return !lhs.Ranges->GetRightBorder().first->GetCells().empty();
10441043
}
10451044
return CompareTypedCellVectors(
10461045
lhs.Ranges->GetRightBorder().first->GetCells().data(),

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
187187
request.SetTxId(TxId);
188188
if (LockTxId) {
189189
request.SetLockTxId(*LockTxId);
190+
request.SetLockNodeId(LockNodeId);
190191
}
191-
request.SetLockNodeId(LockNodeId);
192192
ActorIdToProto(ExecuterId, request.MutableExecuterActorId());
193193

194194
if (Deadline) {

ydb/core/kqp/gateway/behaviour/view/manager.cpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ void FillCreateViewProposal(NKikimrSchemeOp::TModifyScheme& modifyScheme,
6060
const auto pathPair = SplitPathByDb(settings.GetObjectId(), context.GetDatabase());
6161
modifyScheme.SetWorkingDir(pathPair.first);
6262
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateView);
63+
modifyScheme.SetFailedOnAlreadyExists(!settings.GetExistingOk());
6364

6465
auto& viewDesc = *modifyScheme.MutableCreateView();
6566
viewDesc.SetName(pathPair.second);
@@ -77,16 +78,20 @@ void FillDropViewProposal(NKikimrSchemeOp::TModifyScheme& modifyScheme,
7778
const auto pathPair = SplitPathByObjectId(settings.GetObjectId());
7879
modifyScheme.SetWorkingDir(pathPair.first);
7980
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropView);
81+
modifyScheme.SetSuccessOnNotExist(settings.GetMissingOk());
8082

8183
auto& drop = *modifyScheme.MutableDrop();
8284
drop.SetName(pathPair.second);
8385
}
8486

8587
NThreading::TFuture<TYqlConclusionStatus> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request,
8688
TActorSystem* actorSystem,
87-
bool failOnAlreadyExists) {
89+
bool failedOnAlreadyExists,
90+
bool successOnNotExist) {
8891
const auto promiseScheme = NThreading::NewPromise<NKqp::TSchemeOpRequestHandler::TResult>();
89-
IActor* const requestHandler = new TSchemeOpRequestHandler(request, promiseScheme, failOnAlreadyExists);
92+
IActor* const requestHandler = new TSchemeOpRequestHandler(
93+
request, promiseScheme, failedOnAlreadyExists, successOnNotExist
94+
);
9095
actorSystem->Register(requestHandler);
9196
return promiseScheme.GetFuture().Apply([](const NThreading::TFuture<NKqp::TSchemeOpRequestHandler::TResult>& opResult) {
9297
if (opResult.HasValue()) {
@@ -109,7 +114,12 @@ NThreading::TFuture<TYqlConclusionStatus> CreateView(const NYql::TCreateObjectSe
109114
auto& schemeTx = *proposal->Record.MutableTransaction()->MutableModifyScheme();
110115
FillCreateViewProposal(schemeTx, settings, context.GetExternalData());
111116

112-
return SendSchemeRequest(proposal.Release(), context.GetExternalData().GetActorSystem(), true);
117+
return SendSchemeRequest(
118+
proposal.Release(),
119+
context.GetExternalData().GetActorSystem(),
120+
schemeTx.GetFailedOnAlreadyExists(),
121+
schemeTx.GetSuccessOnNotExist()
122+
);
113123
}
114124

115125
NThreading::TFuture<TYqlConclusionStatus> DropView(const NYql::TDropObjectSettings& settings,
@@ -122,7 +132,12 @@ NThreading::TFuture<TYqlConclusionStatus> DropView(const NYql::TDropObjectSettin
122132
auto& schemeTx = *proposal->Record.MutableTransaction()->MutableModifyScheme();
123133
FillDropViewProposal(schemeTx, settings);
124134

125-
return SendSchemeRequest(proposal.Release(), context.GetExternalData().GetActorSystem(), false);
135+
return SendSchemeRequest(
136+
proposal.Release(),
137+
context.GetExternalData().GetActorSystem(),
138+
schemeTx.GetFailedOnAlreadyExists(),
139+
schemeTx.GetSuccessOnNotExist()
140+
);
126141
}
127142

128143
void PrepareCreateView(NKqpProto::TKqpSchemeOperation& schemeOperation,
@@ -214,10 +229,10 @@ NThreading::TFuture<TYqlConclusionStatus> TViewManager::ExecutePrepared(const NK
214229
switch (schemeOperation.GetOperationCase()) {
215230
case NKqpProto::TKqpSchemeOperation::kCreateView:
216231
schemeTx.CopyFrom(schemeOperation.GetCreateView());
217-
return SendSchemeRequest(proposal.Release(), context.GetActorSystem(), true);
232+
break;
218233
case NKqpProto::TKqpSchemeOperation::kDropView:
219234
schemeTx.CopyFrom(schemeOperation.GetDropView());
220-
return SendSchemeRequest(proposal.Release(), context.GetActorSystem(), false);
235+
break;
221236
default:
222237
return NThreading::MakeFuture(TYqlConclusionStatus::Fail(
223238
TStringBuilder()
@@ -226,6 +241,12 @@ NThreading::TFuture<TYqlConclusionStatus> TViewManager::ExecutePrepared(const NK
226241
)
227242
);
228243
}
244+
return SendSchemeRequest(
245+
proposal.Release(),
246+
context.GetActorSystem(),
247+
schemeTx.GetFailedOnAlreadyExists(),
248+
schemeTx.GetSuccessOnNotExist()
249+
);
229250
}
230251

231252
}

ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,18 @@ TMaybeNode<TCoLambda> ExtractTopSortKeySelector(TExprBase node, const NYql::TPar
193193
return {};
194194
}
195195

196+
bool IsIdLambda(TExprBase body) {
197+
if (auto cond = body.Maybe<TCoConditionalValueBase>()) {
198+
if (auto boolLit = cond.Cast().Predicate().Maybe<TCoBool>()) {
199+
return boolLit.Literal().Cast().Value() == "true" && cond.Value().Maybe<TCoArgument>();
200+
}
201+
}
202+
if (body.Maybe<TCoArgument>()) {
203+
return true;
204+
}
205+
return false;
206+
}
207+
196208
} // namespace
197209

198210
TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
@@ -305,7 +317,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
305317
const NYql::TKikimrTableDescription & tableDesc) -> TIndexComparisonKey
306318
{
307319
return std::make_tuple(
308-
keySelector.IsValid() && IsSortKeyPrimary(keySelector.Cast(), tableDesc),
320+
keySelector.IsValid() && IsSortKeyPrimary(keySelector.Cast(), tableDesc) && IsIdLambda(TCoLambda(buildResult.PrunedLambda).Body()),
309321
buildResult.PointPrefixLen >= descriptionKeyColumns,
310322
buildResult.PointPrefixLen >= descriptionKeyColumns ? 0 : buildResult.PointPrefixLen,
311323
buildResult.UsedPrefixLen >= descriptionKeyColumns,

0 commit comments

Comments
 (0)