Skip to content

Commit 7aa367a

Browse files
authored
Fix empty upsert with sink (#17482)
1 parent ef35ffe commit 7aa367a

File tree

2 files changed

+66
-3
lines changed

2 files changed

+66
-3
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1747,7 +1747,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
17471747

17481748
void Handle(TEvBufferWrite::TPtr& ev) {
17491749
Counters->ForwardActorWritesLatencyHistogram->Collect((TInstant::Now() - ev->Get()->SendTime).MicroSeconds());
1750-
17511750
TWriteToken token;
17521751
if (!ev->Get()->Token) {
17531752
AFL_ENSURE(ev->Get()->Settings);
@@ -2967,6 +2966,12 @@ class TKqpForwardWriteActor : public TActorBootstrapped<TKqpForwardWriteActor>,
29672966

29682967
void Handle(TEvBufferWriteResult::TPtr& result) {
29692968
CA_LOG_D("TKqpForwardWriteActor recieve EvBufferWriteResult from " << BufferActorId);
2969+
2970+
WriteToken = result->Get()->Token;
2971+
OnFlushed();
2972+
}
2973+
2974+
void OnFlushed() {
29702975
InFlight = false;
29712976

29722977
EgressStats.Bytes += DataSize;
@@ -2975,8 +2980,6 @@ class TKqpForwardWriteActor : public TActorBootstrapped<TKqpForwardWriteActor>,
29752980
EgressStats.Resume();
29762981

29772982
Counters->ForwardActorWritesSizeHistogram->Collect(DataSize);
2978-
2979-
WriteToken = result->Get()->Token;
29802983
DataSize = 0;
29812984

29822985
if (Closed) {
@@ -3032,6 +3035,12 @@ class TKqpForwardWriteActor : public TActorBootstrapped<TKqpForwardWriteActor>,
30323035

30333036
ev->SendTime = TInstant::Now();
30343037

3038+
if (ev->Data->IsEmpty() && ev->Close && WriteToken.IsEmpty()) {
3039+
// Nothing was written
3040+
OnFlushed();
3041+
return;
3042+
}
3043+
30353044
CA_LOG_D("Send data=" << DataSize << ", closed=" << Closed << ", bufferActorId=" << BufferActorId);
30363045
AFL_ENSURE(Send(BufferActorId, ev.release()));
30373046
}

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2580,6 +2580,60 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
25802580
}
25812581
}
25822582

2583+
Y_UNIT_TEST_TWIN(UpdateThenDelete, UseSink) {
2584+
NKikimrConfig::TAppConfig appConfig;
2585+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
2586+
auto settings = TKikimrSettings()
2587+
.SetAppConfig(appConfig)
2588+
.SetWithSampleTables(true);
2589+
2590+
TKikimrRunner kikimr(settings);
2591+
auto client = kikimr.GetTableClient();
2592+
2593+
{
2594+
const TString query = R"(
2595+
DECLARE $data AS List<Struct<
2596+
Key: String,
2597+
Value: String
2598+
>>;
2599+
2600+
UPSERT INTO KeyValue2 SELECT * FROM AS_TABLE($data);
2601+
2602+
DELETE FROM KeyValue2 ON SELECT * FROM KeyValue2 AS a LEFT ONLY JOIN AS_TABLE($data) AS b USING (Key);
2603+
)";
2604+
2605+
TTypeBuilder builder;
2606+
builder
2607+
.BeginStruct()
2608+
.AddMember("Key", TTypeBuilder().Primitive(NYdb::EPrimitiveType::String).Build())
2609+
.AddMember("Value", TTypeBuilder().Primitive(NYdb::EPrimitiveType::String).Build())
2610+
.EndStruct();
2611+
2612+
auto params = client.GetParamsBuilder()
2613+
.AddParam("$data")
2614+
.EmptyList(builder.Build())
2615+
.Build()
2616+
.Build();
2617+
2618+
auto session = client.CreateSession().GetValueSync().GetSession();
2619+
auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx(), std::move(params)).ExtractValueSync();
2620+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2621+
2622+
}
2623+
{
2624+
const TString query = R"(
2625+
SELECT * FROM KeyValue2;
2626+
)";
2627+
2628+
auto session = client.CreateSession().GetValueSync().GetSession();
2629+
auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2630+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2631+
2632+
Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl;
2633+
UNIT_ASSERT_VALUES_EQUAL(0, result.GetResultSet(0).RowsCount());
2634+
}
2635+
}
2636+
25832637
}
25842638

25852639
} // namespace NKqp

0 commit comments

Comments
 (0)