Skip to content

Commit bae758c

Browse files
committed
timestamp push down has been fixed (#18764)
1 parent 23aa5f7 commit bae758c

File tree

4 files changed

+40
-5
lines changed

4 files changed

+40
-5
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@ ydb/core/kqp/ut/cost KqpCost.OlapWriteRow
2121
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
2222
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
2323
ydb/core/kqp/ut/olap KqpOlapJson.BloomIndexesVariants
24-
ydb/core/kqp/ut/olap KqpOlapDelete.DeleteWithDiffrentTypesPKColumns-isStream
2524
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesColumnActualization
2625
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictActualization
2726
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictStatActualization
28-
ydb/core/kqp/ut/olap KqpOlapDelete.DeleteWithDiffrentTypesPKColumns-isStream
2927
ydb/core/kqp/ut/olap KqpOlapJson.CompactionVariants
3028
ydb/core/kqp/ut/olap KqpOlapJson.DuplicationCompactionVariants
3129
ydb/core/kqp/ut/olap KqpOlapJson.SwitchAccessorCompactionVariants

ydb/core/formats/arrow/program/functions.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,15 @@ class TKernelFunction: public IStepFunction {
345345
TAccessorsCollection::TChunksMerger merger;
346346
while (auto args = argumentsReader.ReadNext()) {
347347
try {
348+
for (auto& arg: *args) {
349+
if (arg.kind() == arrow::Datum::ARRAY && arg.descr().type->id() == arrow::Type::TIMESTAMP) {
350+
auto timestamp_type = std::static_pointer_cast<arrow::TimestampType>(arg.descr().type);
351+
arrow::TimeUnit::type unit = timestamp_type->unit();
352+
if (unit == arrow::TimeUnit::MICRO) {
353+
arrow::util::get<std::shared_ptr<arrow::ArrayData>>(arg.value)->type = arrow::uint64();
354+
}
355+
}
356+
}
348357
auto result = Function->Execute(*args, FunctionOptions.get(), GetContext());
349358
if (result.ok()) {
350359
merger.AddChunk(*result);

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12229,7 +12229,7 @@ Y_UNIT_TEST_SUITE(KqpOlapTypes) {
1222912229
testHelper.ReadData("SELECT dec35 FROM `/Root/ColumnTableTest` WHERE id > 5 ORDER BY dec35", "[[\"155555555555555.1\"];[\"255555555555555\"];[\"1255555555555555.1\"];[\"1555555555555555.1\"]]");
1223012230
}
1223112231

12232-
Y_UNIT_TEST(TimestampCmpErr) {
12232+
Y_UNIT_TEST(NegativeTimestampErr) {
1223312233
TKikimrSettings runnerSettings;
1223412234
runnerSettings.WithSampleTables = false;
1223512235

@@ -12250,9 +12250,8 @@ Y_UNIT_TEST_SUITE(KqpOlapTypes) {
1225012250
{
1225112251
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
1225212252
tableInserter.AddRow().Add(1).Add(ts.MicroSeconds()).Add(now.MicroSeconds());
12253-
testHelper.BulkUpsert(testTable, tableInserter);
12253+
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::BAD_REQUEST);
1225412254
}
12255-
testHelper.ReadData("SELECT timestamp < timestamp_max FROM `/Root/ColumnTableTest` WHERE id=1", "[[\%false]]");
1225612255
}
1225712256

1225812257
Y_UNIT_TEST(AttributeNegative) {

ydb/core/tx/tx_proxy/upload_rows_common_impl.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,30 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
615615
ctx);
616616
}
617617

618+
bool IsTimestampColumnsArePositive(const std::shared_ptr<arrow::RecordBatch>& batch, TString& error) {
619+
if (!batch) {
620+
return true;
621+
}
622+
for (int i = 0; i < batch->num_columns(); ++i) {
623+
std::shared_ptr<arrow::Array> column = batch->column(i);
624+
std::shared_ptr<arrow::DataType> type = column->type();
625+
std::string columnName = batch->schema()->field(i)->name();
626+
if (type->id() == arrow::Type::TIMESTAMP) {
627+
auto timestampArray = std::static_pointer_cast<arrow::TimestampArray>(column);
628+
for (int64_t j = 0; j < timestampArray->length(); ++j) {
629+
if (timestampArray->IsValid(j)) {
630+
int64_t timestampValue = timestampArray->Value(j);
631+
if (timestampValue < 0) {
632+
error = TStringBuilder{} << "Negative timestamp value found at column " << columnName << ", row " << j << ", value " << timestampValue;
633+
return false;
634+
}
635+
}
636+
}
637+
}
638+
}
639+
return true;
640+
}
641+
618642
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
619643
const NSchemeCache::TSchemeCacheNavigate& request = *ev->Get()->Request;
620644

@@ -717,6 +741,11 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
717741
}
718742
}
719743

744+
TString error;
745+
if (!IsTimestampColumnsArePositive(Batch, error)) {
746+
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, error, ctx);
747+
}
748+
720749
if (Batch) {
721750
UploadCounters.OnRequest(Batch->num_rows());
722751
}

0 commit comments

Comments
 (0)