Skip to content

Commit e3fc279

Browse files
authored
Fix limit tests for EvWrite (#15214)
1 parent 33c7ead commit e3fc279

File tree

2 files changed

+35
-13
lines changed

2 files changed

+35
-13
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
298298
STATEFN(FinalizeState) {
299299
try {
300300
switch(ev->GetTypeRewrite()) {
301-
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
301+
hFunc(TEvKqp::TEvAbortExecution, HandleFinalize);
302302
hFunc(TEvKqpBuffer::TEvResult, HandleFinalize);
303303
hFunc(TEvents::TEvUndelivered, HandleFinalize);
304304

@@ -325,6 +325,14 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
325325
ReportEventElapsedTime();
326326
}
327327

328+
void HandleFinalize(TEvKqp::TEvAbortExecution::TPtr& ev) {
329+
if (IsCancelAfterAllowed(ev)) {
330+
TBase::HandleAbortExecution(ev);
331+
} else {
332+
LOG_D("Got TEvAbortExecution from : " << ev->Sender << " but cancelation is not alowed");
333+
}
334+
}
335+
328336
void HandleFinalize(TEvKqpBuffer::TEvResult::TPtr& ev) {
329337
if (ev->Get()->Stats) {
330338
if (Stats) {
@@ -1712,7 +1720,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
17121720

17131721
void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap)
17141722
{
1715-
YQL_ENSURE(!TxManager);
1723+
YQL_ENSURE(ReadOnlyTx || !TxManager);
17161724
TShardState shardState;
17171725
shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
17181726
shardState.DatashardState.ConstructInPlace();

ydb/core/kqp/ut/query/kqp_limits_ut.cpp

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -431,8 +431,9 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
431431
UNIT_ASSERT_C(getOutOfSpace, "Successfully inserted " << rowsPerBatch << " x " << batchCount << " lines, each of size " << dataTextSize << "bytes");
432432
}
433433

434-
Y_UNIT_TEST(TooBigQuery) {
434+
Y_UNIT_TEST_TWIN(TooBigQuery, useSink) {
435435
auto app = NKikimrConfig::TAppConfig();
436+
app.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
436437
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1'000'000'000);
437438
app.MutableTableServiceConfig()->SetCompileTimeoutMs(TDuration::Minutes(5).MilliSeconds());
438439

@@ -468,8 +469,12 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
468469
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
469470
result.GetIssues().PrintTo(Cerr);
470471
//UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
471-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
472-
UNIT_ASSERT(HasIssue(result.GetIssues(), NKikimrIssues::TIssuesIds::SHARD_PROGRAM_SIZE_EXCEEDED));
472+
if (useSink) {
473+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
474+
} else {
475+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
476+
UNIT_ASSERT(HasIssue(result.GetIssues(), NKikimrIssues::TIssuesIds::SHARD_PROGRAM_SIZE_EXCEEDED));
477+
}
473478
}
474479

475480
Y_UNIT_TEST(BigParameter) {
@@ -522,8 +527,10 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
522527
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
523528
}
524529

525-
Y_UNIT_TEST(TooBigKey) {
526-
TKikimrRunner kikimr;
530+
Y_UNIT_TEST_TWIN(TooBigKey, useSink) {
531+
NKikimrConfig::TAppConfig appConfig;
532+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
533+
TKikimrRunner kikimr(appConfig);
527534
auto db = kikimr.GetTableClient();
528535
auto session = db.CreateSession().GetValueSync().GetSession();
529536

@@ -543,10 +550,15 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
543550

544551
result.GetIssues().PrintTo(Cerr);
545552
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
546-
UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
547-
[] (const auto& issue) {
548-
return issue.GetMessage().contains("exceeds limit");
549-
}));
553+
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
554+
[&](const auto& issue) {
555+
if (useSink) {
556+
return issue.GetMessage().contains("Row key size of")
557+
&& issue.GetMessage().contains("bytes is larger than the allowed threshold");
558+
} else {
559+
return issue.GetMessage().contains("exceeds limit");
560+
}
561+
}), result.GetIssues().ToString());
550562
}
551563

552564
Y_UNIT_TEST(TooBigColumn) {
@@ -700,8 +712,10 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
700712
}
701713
}
702714

703-
Y_UNIT_TEST(CancelAfterRwTx) {
704-
TKikimrRunner kikimr;
715+
Y_UNIT_TEST_TWIN(CancelAfterRwTx, useSink) {
716+
NKikimrConfig::TAppConfig appConfig;
717+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
718+
TKikimrRunner kikimr(appConfig);
705719
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
706720

707721
{

0 commit comments

Comments
 (0)