Skip to content

Commit a196e19

Browse files
CS improvements (#7013)
Co-authored-by: ivanmorozov333 <ivanmorozov@ydb.tech>
1 parent 90223b3 commit a196e19

File tree

222 files changed

+2386
-2815
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

222 files changed

+2386
-2815
lines changed

.github/config/muted_ya.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ ydb/core/external_sources *
88
ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
99
ydb/core/keyvalue/ut_trace TKeyValueTracingTest.*
1010
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
11-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
1211
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
1312
ydb/core/kqp/ut/olap KqpOlap.ScanQueryOltpAndOlap
1413
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
1514
ydb/core/kqp/ut/olap KqpOlap.YqlScriptOltpAndOlap
1615
ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_ResultCountAll_FilterL
16+
ydb/core/kqp/ut/olap KqpOlapWrite.WriteDeleteCleanGC
1717
ydb/core/kqp/ut/pg KqpPg.CreateIndex
1818
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
1919
ydb/core/kqp/ut/query KqpQuery.QueryTimeout

ydb/core/formats/arrow/arrow_helpers.cpp

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -892,24 +892,33 @@ std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_p
892892
}
893893

894894
std::vector<std::shared_ptr<arrow::RecordBatch>> SliceToRecordBatches(const std::shared_ptr<arrow::Table>& t) {
895-
std::set<ui32> splitPositions;
896-
const ui32 numRows = t->num_rows();
897-
for (auto&& i : t->columns()) {
898-
ui32 pos = 0;
899-
for (auto&& arr : i->chunks()) {
900-
splitPositions.emplace(pos);
901-
pos += arr->length();
895+
if (!t->num_rows()) {
896+
return {};
897+
}
898+
std::vector<ui32> positions;
899+
{
900+
for (auto&& i : t->columns()) {
901+
ui32 pos = 0;
902+
for (auto&& arr : i->chunks()) {
903+
positions.emplace_back(pos);
904+
pos += arr->length();
905+
}
906+
AFL_VERIFY(pos == t->num_rows());
902907
}
903-
AFL_VERIFY(pos == t->num_rows());
908+
positions.emplace_back(t->num_rows());
904909
}
910+
std::sort(positions.begin(), positions.end());
911+
positions.erase(std::unique(positions.begin(), positions.end()), positions.end());
912+
905913
std::vector<std::vector<std::shared_ptr<arrow::Array>>> slicedData;
906-
slicedData.resize(splitPositions.size());
907-
std::vector<ui32> positions(splitPositions.begin(), splitPositions.end());
908-
for (auto&& i : t->columns()) {
909-
for (ui32 idx = 0; idx < positions.size(); ++idx) {
910-
auto slice = i->Slice(positions[idx], ((idx + 1 == positions.size()) ? numRows : positions[idx + 1]) - positions[idx]);
911-
AFL_VERIFY(slice->num_chunks() == 1);
912-
slicedData[idx].emplace_back(slice->chunks().front());
914+
slicedData.resize(positions.size() - 1);
915+
{
916+
for (auto&& i : t->columns()) {
917+
for (ui32 idx = 0; idx + 1 < positions.size(); ++idx) {
918+
auto slice = i->Slice(positions[idx], positions[idx + 1] - positions[idx]);
919+
AFL_VERIFY(slice->num_chunks() == 1);
920+
slicedData[idx].emplace_back(slice->chunks().front());
921+
}
913922
}
914923
}
915924
std::vector<std::shared_ptr<arrow::RecordBatch>> result;

ydb/core/formats/arrow/program.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -946,18 +946,18 @@ arrow::Result<std::shared_ptr<NArrow::TColumnFilter>> TProgramStep::BuildFilter(
946946
}
947947

948948
const std::set<ui32>& TProgramStep::GetFilterOriginalColumnIds() const {
949-
AFL_VERIFY(IsFilterOnly());
949+
// AFL_VERIFY(IsFilterOnly());
950950
return FilterOriginalColumnIds;
951951
}
952952

953953
std::set<std::string> TProgram::GetEarlyFilterColumns() const {
954954
std::set<std::string> result;
955955
for (ui32 i = 0; i < Steps.size(); ++i) {
956+
auto stepFields = Steps[i]->GetColumnsInUsage(true);
957+
result.insert(stepFields.begin(), stepFields.end());
956958
if (!Steps[i]->IsFilterOnly()) {
957959
break;
958960
}
959-
auto stepFields = Steps[i]->GetColumnsInUsage();
960-
result.insert(stepFields.begin(), stepFields.end());
961961
}
962962
return result;
963963
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#include "serialization.h"
2+
#include <ydb/core/formats/arrow/switch/switch_type.h>
3+
#include <ydb/library/actors/core/log.h>
4+
5+
namespace NKikimr::NArrow::NScalar {
6+
7+
TConclusion<TString> TSerializer::SerializePayloadToString(const std::shared_ptr<arrow::Scalar>& scalar) {
8+
TString resultString;
9+
const bool resultFlag = NArrow::SwitchType(scalar->type->id(), [&](const auto& type) {
10+
using TWrap = std::decay_t<decltype(type)>;
11+
if constexpr (arrow::has_c_type<typename TWrap::T>()) {
12+
using CType = typename TWrap::T::c_type;
13+
using ScalarType = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
14+
const ScalarType* scalarTyped = static_cast<const ScalarType*>(scalar.get());
15+
resultString = TString(sizeof(CType), '\0');
16+
memcpy(&resultString[0], scalarTyped->data(), sizeof(CType));
17+
return true;
18+
}
19+
return false;
20+
});
21+
if (!resultFlag) {
22+
return TConclusionStatus::Fail("incorrect scalar type for payload serialization: " + scalar->type->ToString());
23+
}
24+
return resultString;
25+
}
26+
27+
TConclusion<std::shared_ptr<arrow::Scalar>> TSerializer::DeserializeFromStringWithPayload(const TString& data, const std::shared_ptr<arrow::DataType>& dataType) {
28+
AFL_VERIFY(dataType);
29+
std::shared_ptr<arrow::Scalar> result;
30+
const bool resultFlag = NArrow::SwitchType(dataType->id(), [&](const auto& type) {
31+
using TWrap = std::decay_t<decltype(type)>;
32+
if constexpr (arrow::has_c_type<typename TWrap::T>()) {
33+
using CType = typename TWrap::T::c_type;
34+
AFL_VERIFY(data.size() == sizeof(CType));
35+
using ScalarType = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
36+
result = std::make_shared<ScalarType>(*(CType*)&data[0], dataType);
37+
return true;
38+
}
39+
return false;
40+
});
41+
if (!resultFlag) {
42+
return TConclusionStatus::Fail("incorrect scalar type for payload deserialization: " + dataType->ToString());
43+
}
44+
return result;
45+
}
46+
47+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#pragma once
2+
#include <ydb/library/conclusion/result.h>
3+
4+
#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h>
5+
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
6+
7+
#include <util/generic/string.h>
8+
9+
namespace NKikimr::NArrow::NScalar {
10+
class TSerializer {
11+
public:
12+
static TConclusion<TString> SerializePayloadToString(const std::shared_ptr<arrow::Scalar>& scalar);
13+
static TConclusion<std::shared_ptr<arrow::Scalar>> DeserializeFromStringWithPayload(const TString& data, const std::shared_ptr<arrow::DataType>& dataType);
14+
};
15+
}

ydb/core/formats/arrow/scalar/ya.make

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
LIBRARY()
2+
3+
PEERDIR(
4+
contrib/libs/apache/arrow
5+
ydb/library/conclusion
6+
ydb/core/formats/arrow/switch
7+
ydb/library/actors/core
8+
)
9+
10+
SRCS(
11+
serialization.cpp
12+
)
13+
14+
END()

ydb/core/formats/arrow/size_calcer.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,11 +242,12 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
242242
}
243243

244244
NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context) {
245-
std::optional<TFirstLastSpecialKeys> specialKeys;
245+
std::optional<TString> specialKeys;
246246
if (context.GetFieldsForSpecialKeys().size()) {
247-
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys());
247+
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys()).SerializeToString();
248248
}
249-
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch), specialKeys);
249+
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
250+
NArrow::GetBatchDataSize(batch), specialKeys);
250251
}
251252

252253
TConclusionStatus TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR) {

ydb/core/formats/arrow/size_calcer.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,13 @@ class TSerializedBatch {
7474
YDB_READONLY_DEF(TString, Data);
7575
YDB_READONLY(ui32, RowsCount, 0);
7676
YDB_READONLY(ui32, RawBytes, 0);
77-
std::optional<TFirstLastSpecialKeys> SpecialKeys;
77+
std::optional<TString> SpecialKeys;
7878
public:
7979
size_t GetSize() const {
8080
return Data.size();
8181
}
8282

83-
const TFirstLastSpecialKeys& GetSpecialKeysSafe() const {
83+
const TString& GetSpecialKeysSafe() const {
8484
AFL_VERIFY(SpecialKeys);
8585
return *SpecialKeys;
8686
}
@@ -95,7 +95,7 @@ class TSerializedBatch {
9595
static TConclusionStatus BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR);
9696
static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context);
9797

98-
TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TFirstLastSpecialKeys>& specialKeys)
98+
TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TString>& specialKeys)
9999
: SchemaData(schemaData)
100100
, Data(data)
101101
, RowsCount(rowsCount)

ydb/core/formats/arrow/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ PEERDIR(
1212
ydb/core/formats/arrow/dictionary
1313
ydb/core/formats/arrow/transformer
1414
ydb/core/formats/arrow/reader
15+
ydb/core/formats/arrow/scalar
1516
ydb/core/formats/arrow/hash
1617
ydb/library/actors/core
1718
ydb/library/arrow_kernels

ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_sharding.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "alter_sharding.h"
2+
#include <ydb/library/actors/core/log.h>
23
#include <util/string/type.h>
34

45
namespace NKikimr::NKqp {
@@ -26,4 +27,8 @@ void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TModifyScheme&
2627
scheme.MutableAlterColumnTable()->MutableReshardColumnTable()->SetIncrease(*Increase);
2728
}
2829

30+
void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const {
31+
AFL_VERIFY(false);
32+
}
33+
2934
}

0 commit comments

Comments
 (0)