Skip to content

Commit 1f29b7c

Browse files
authored
[Stable 24 3 8 analytics] EvWrite & CTAS (ydb-platform#8861)
1 parent 5909a74 commit 1f29b7c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1803
-154
lines changed

.github/config/muted_ya.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,18 @@ ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
1414
ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_ResultCountAll_FilterL
1515
ydb/core/kqp/ut/olap KqpOlapWrite.WriteDeleteCleanGC
1616
ydb/core/kqp/ut/pg KqpPg.CreateIndex
17+
ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssueInteractiveTx+withSink
18+
ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssue+withSink
19+
ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink
20+
ydb/core/kqp/ut/tx KqpSinkTx.InvalidateOnError
1721
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
1822
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
23+
ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries
24+
ydb/core/kqp/ut/service KqpQueryService.TableSink_OltpReplace+HasSecondaryIndex
25+
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Complex
26+
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Simple
27+
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalDataSource
28+
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalTable
1929
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
2030
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
2131
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_GenericQuerys

ydb/core/grpc_services/query/rpc_kqp_tx.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ class TBeginTransactionRPC : public TActorBootstrapped<TBeginTransactionRPC> {
124124
if (kqpResponse.HasTxMeta()) {
125125
beginTxResult->mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
126126
}
127+
*beginTxResult->mutable_issues() = issueMessage;
127128
}
128129

129130
Reply(record.GetYdbStatus(), beginTxResult);
@@ -168,7 +169,7 @@ class TFinishTransactionRPC : public TActorBootstrapped<TFinishTransactionRPC>
168169
private:
169170
virtual std::pair<TString, TString> GetReqData() const = 0;
170171
virtual void Fill(NKikimrKqp::TQueryRequest* req) const = 0;
171-
virtual NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const = 0;
172+
virtual NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const = 0;
172173

173174
void StateWork(TAutoPtr<IEventHandle>& ev) {
174175
try {
@@ -218,15 +219,15 @@ class TFinishTransactionRPC : public TActorBootstrapped<TFinishTransactionRPC>
218219
const auto& record = ev->Get()->Record.GetRef();
219220
FillCommonKqpRespFields(record, Request.get());
220221

222+
NYql::TIssues issues;
221223
if (record.HasResponse()) {
222224
const auto& kqpResponse = record.GetResponse();
223225
const auto& issueMessage = kqpResponse.GetQueryIssues();
224-
NYql::TIssues issues;
225226
NYql::IssuesFromMessage(issueMessage, issues);
226227
Request->RaiseIssues(issues);
227228
}
228229

229-
Reply(record.GetYdbStatus(), CreateResult(record.GetYdbStatus()));
230+
Reply(record.GetYdbStatus(), CreateResult(record.GetYdbStatus(), issues));
230231
}
231232

232233
void InternalError(const TString& message) {
@@ -271,9 +272,10 @@ class TCommitTransactionRPC : public TFinishTransactionRPC {
271272
req->MutableTxControl()->set_commit_tx(true);
272273
}
273274

274-
NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const override {
275+
NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const override {
275276
auto result = TEvCommitTransactionRequest::AllocateResult<Ydb::Query::CommitTransactionResponse>(Request);
276277
result->set_status(status);
278+
NYql::IssuesToMessage(issues, result->mutable_issues());
277279
return result;
278280
}
279281
};
@@ -293,9 +295,10 @@ class TRollbackTransactionRPC : public TFinishTransactionRPC {
293295
req->SetAction(NKikimrKqp::QUERY_ACTION_ROLLBACK_TX);
294296
}
295297

296-
NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status) const override {
298+
NProtoBuf::Message* CreateResult(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) const override {
297299
auto result = TEvRollbackTransactionRequest::AllocateResult<Ydb::Query::RollbackTransactionResponse>(Request);
298300
result->set_status(status);
301+
NYql::IssuesToMessage(issues, result->mutable_issues());
299302
return result;
300303
}
301304
};

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,33 @@ namespace NKqp {
77

88
using namespace NYql;
99

10-
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TMaybe<TKqpTxLock>& invalidatedLock) {
10+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKikimrPathId& pathId) {
1111
TStringBuilder message;
1212
message << "Transaction locks invalidated.";
1313

14-
TMaybe<TString> tableName;
15-
if (invalidatedLock) {
16-
TKikimrPathId id(invalidatedLock->GetSchemeShard(), invalidatedLock->GetPathId());
17-
auto table = txCtx.TableByIdMap.FindPtr(id);
18-
if (table) {
19-
tableName = *table;
14+
if (pathId.OwnerId() != 0) {
15+
auto table = txCtx.TableByIdMap.FindPtr(pathId);
16+
if (!table) {
17+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
2018
}
19+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
20+
} else {
21+
// Olap tables don't return SchemeShard in locks, thus we use tableId here.
22+
for (const auto& [pathId, table] : txCtx.TableByIdMap) {
23+
if (pathId.TableId() == pathId.TableId()) {
24+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table);
25+
}
26+
}
27+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
2128
}
29+
}
2230

23-
if (tableName) {
24-
message << " Table: " << *tableName;
25-
}
26-
27-
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
31+
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpTxLock& invalidatedLock) {
32+
return GetLocksInvalidatedIssue(
33+
txCtx,
34+
TKikimrPathId(
35+
invalidatedLock.GetSchemeShard(),
36+
invalidatedLock.GetPathId()));
2837
}
2938

3039
std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,

ydb/core/kqp/common/kqp_tx.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ class TTransactionsCache {
434434
}
435435
};
436436

437+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId);
437438
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
438439
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);
439440

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
172172
}
173173

174174
void StartSplitting(const TActorContext &ctx) {
175-
YQL_ENSURE(PerStatementResult);
176-
177175
const auto prepareSettings = PrepareCompilationSettings(ctx);
178176
auto result = KqpHost->SplitQuery(QueryRef, prepareSettings);
179177

@@ -280,7 +278,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
280278
IKqpHost::TPrepareSettings prepareSettings;
281279
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
282280
prepareSettings.IsInternalCall = QueryId.Settings.IsInternalCall;
283-
prepareSettings.PerStatementResult = PerStatementResult;
284281

285282
switch (QueryId.Settings.Syntax) {
286283
case Ydb::Query::Syntax::SYNTAX_YQL_V1:

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,18 +131,18 @@ namespace NKikimr::NKqp {
131131
using namespace NYql::NDq;
132132
using namespace NYql::NDqProto;
133133

134-
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
134+
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
135135
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
136136
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
137137
TIntrusivePtr<NActors::TProtoArenaHolder> arena) {
138-
return new NScanPrivate::TKqpScanComputeActor(executerId, txId, task, std::move(asyncIoFactory),
138+
return new NScanPrivate::TKqpScanComputeActor(executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory),
139139
settings, memoryLimits, std::move(traceId), std::move(arena));
140140
}
141141

142142
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
143143
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
144-
const ui64 txId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
145-
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, meta, shardsScanningPolicy, counters, std::move(traceId));
144+
const ui64 txId, ui64 lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
145+
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, meta, shardsScanningPolicy, counters, std::move(traceId));
146146
}
147147

148148
}

ydb/core/kqp/compute_actor/kqp_compute_actor.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
5050
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
5151
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
5252

53-
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
53+
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
5454
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
5555
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
5656
TIntrusivePtr<NActors::TProtoArenaHolder> arena);
5757

5858
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
5959
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
60-
const ui64 txId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
60+
const ui64 txId, ui64 lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
6161

6262
NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
6363
TIntrusivePtr<TKqpCounters> counters,

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
211211
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
212212
YQL_ENSURE(args.ComputesByStages);
213213
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
214-
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.Task,
214+
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.LockTxId, args.LockNodeId, args.Task,
215215
AsyncIoFactory, runtimeSettings, memoryLimits,
216216
std::move(args.TraceId), std::move(args.Arena));
217217
TActorId result = TlsActivationContext->Register(computeActor);

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ struct IKqpNodeComputeActorFactory {
106106
struct TCreateArgs {
107107
const NActors::TActorId& ExecuterId;
108108
const ui64 TxId;
109+
const ui64 LockTxId;
110+
const ui32 LockNodeId;
109111
NYql::NDqProto::TDqTask* Task;
110112
TIntrusivePtr<NRm::TTxState> TxInfo;
111113
const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings;

ydb/core/kqp/compute_actor/kqp_compute_events.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@
1111

1212
namespace NKikimr::NKqp {
1313

14+
struct TLocksInfo {
15+
TVector<NKikimrDataEvents::TLock> Locks;
16+
TVector<NKikimrDataEvents::TLock> BrokenLocks;
17+
};
18+
1419
struct TEvKqpCompute {
1520
struct TEvRemoteScanData : public TEventPB<TEvRemoteScanData, NKikimrKqp::TEvRemoteScanData,
1621
TKqpComputeEvents::EvRemoteScanData> {};
@@ -54,6 +59,7 @@ struct TEvKqpCompute {
5459
bool PageFault = false; // page fault was the reason for sending this message
5560
mutable THolder<TEvRemoteScanData> Remote;
5661
std::shared_ptr<IShardScanStats> StatsOnFinished;
62+
TLocksInfo LocksInfo;
5763

5864
template <class T>
5965
const T& GetStatsAs() const {

0 commit comments

Comments
 (0)