Skip to content

Commit d9d39c9

Browse files
Fix optional columns (#7875)
Co-authored-by: ivanmorozov333 <ivanmorozov@ydb.tech>
1 parent 5117256 commit d9d39c9

File tree

13 files changed

+104
-35
lines changed

13 files changed

+104
-35
lines changed

ydb/core/formats/arrow/common/container.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <ydb/library/actors/core/log.h>
33
#include <ydb/core/formats/arrow/arrow_helpers.h>
44
#include <ydb/core/formats/arrow/simple_arrays_cache.h>
5+
#include <ydb/library/accessor/validator.h>
56

67
namespace NKikimr::NArrow {
78

@@ -93,10 +94,10 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Schema>& schem
9394
Initialize();
9495
}
9596

96-
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Table>& table) {
97+
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Table>& table)
98+
: RecordsCount(TValidator::CheckNotNull(table)->num_rows())
99+
, Schema(std::make_shared<NModifier::TSchema>(TValidator::CheckNotNull(table)->schema())) {
97100
AFL_VERIFY(table);
98-
Schema = std::make_shared<NModifier::TSchema>(table->schema());
99-
RecordsCount = table->num_rows();
100101
for (auto&& i : table->columns()) {
101102
if (i->num_chunks() == 1) {
102103
Columns.emplace_back(std::make_shared<NAccessor::TTrivialArray>(i->chunk(0)));
@@ -107,10 +108,10 @@ TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::Table>& table)
107108
Initialize();
108109
}
109110

110-
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::RecordBatch>& table) {
111+
TGeneralContainer::TGeneralContainer(const std::shared_ptr<arrow::RecordBatch>& table)
112+
: RecordsCount(TValidator::CheckNotNull(table)->num_rows())
113+
, Schema(std::make_shared<NModifier::TSchema>(TValidator::CheckNotNull(table)->schema())) {
111114
AFL_VERIFY(table);
112-
Schema = std::make_shared<NModifier::TSchema>(table->schema());
113-
RecordsCount = table->num_rows();
114115
for (auto&& i : table->columns()) {
115116
Columns.emplace_back(std::make_shared<NAccessor::TTrivialArray>(i));
116117
}

ydb/core/formats/arrow/common/container.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ class IFieldsConstructor {
2424

2525
class TGeneralContainer {
2626
private:
27-
YDB_READONLY_DEF(std::optional<ui64>, RecordsCount);
27+
std::optional<ui64> RecordsCount;
2828
YDB_READONLY_DEF(std::shared_ptr<NModifier::TSchema>, Schema);
2929
std::vector<std::shared_ptr<NAccessor::IChunkedArray>> Columns;
3030
void Initialize();
3131
public:
3232
TGeneralContainer(const ui32 recordsCount);
3333

34-
ui32 GetRecordsCountVerified() const {
34+
ui32 GetRecordsCount() const {
3535
AFL_VERIFY(RecordsCount);
3636
return *RecordsCount;
3737
}

ydb/core/formats/arrow/modifier/subset.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ class TSchemaSubset {
1414
TSchemaSubset() = default;
1515
TSchemaSubset(const std::set<ui32>& fieldsIdx, const ui32 fieldsCount);
1616

17+
static TSchemaSubset AllFieldsAccepted() {
18+
TSchemaSubset result;
19+
result.Exclude = true;
20+
return result;
21+
}
22+
1723
template <class T>
1824
std::vector<T> Apply(const std::vector<T>& fullSchema) const {
1925
if (FieldIdx.empty()) {

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2624,6 +2624,43 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
26242624

26252625
}
26262626

2627+
Y_UNIT_TEST(NormalizeAbsentColumn) {
2628+
auto settings = TKikimrSettings().SetWithSampleTables(false);
2629+
TKikimrRunner kikimr(settings);
2630+
TLocalHelper testHelper(kikimr);
2631+
2632+
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
2633+
csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
2634+
csController->SetLagForCompactionBeforeTierings(TDuration::Seconds(1));
2635+
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
2636+
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
2637+
2638+
testHelper.CreateTestOlapTable();
2639+
auto tableClient = kikimr.GetTableClient();
2640+
2641+
Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD, NKikimrServices::TX_COLUMNSHARD_SCAN }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();
2642+
2643+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
2644+
2645+
{
2646+
auto alterQuery = TStringBuilder() << "ALTER TABLESTORE `/Root/olapStore` ADD COLUMN new_column1 Uint64;";
2647+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
2648+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
2649+
}
2650+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 1000);
2651+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 1000);
2652+
2653+
{
2654+
auto alterQuery = TStringBuilder() << "ALTER TABLESTORE `/Root/olapStore` ADD COLUMN new_column2 Uint64;";
2655+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
2656+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
2657+
}
2658+
2659+
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
2660+
csController->WaitIndexation(TDuration::Seconds(5));
2661+
2662+
}
2663+
26272664
Y_UNIT_TEST(MultiInsertWithSinks) {
26282665
NKikimrConfig::TAppConfig appConfig;
26292666
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
33
#include <ydb/core/kqp/ut/common/columnshard.h>
44
#include <ydb/core/testlib/common_helper.h>
5+
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
6+
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
57
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
68
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
79
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
@@ -3039,6 +3041,11 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
30393041

30403042
auto session = Kikimr->GetTableClient().CreateSession().GetValueSync().GetSession();
30413043

3044+
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
3045+
csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
3046+
csController->SetLagForCompactionBeforeTierings(TDuration::Seconds(1));
3047+
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
3048+
30423049
const TString query = Sprintf(R"(
30433050
CREATE TABLE `/Root/DataShard` (
30443051
Col1 Uint64 NOT NULL,
@@ -3054,6 +3061,8 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
30543061
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
30553062
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
30563063
DoExecute();
3064+
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
3065+
csController->WaitIndexation(TDuration::Seconds(5));
30573066
}
30583067

30593068
};

ydb/core/kqp/ut/service/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ PEERDIR(
2424
library/cpp/threading/local_executor
2525
ydb/core/kqp
2626
ydb/core/kqp/ut/common
27+
ydb/core/tx/columnshard/hooks/testing
2728
ydb/library/yql/sql/pg
2829
ydb/library/yql/parser/pg_wrapper
2930
ydb/public/lib/ut_helpers

ydb/core/protos/feature_flags.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,4 +146,6 @@ message TFeatureFlags {
146146
optional bool EnableSingleCompositeActionGroup = 131 [default = false];
147147
optional bool EnableResourcePoolsOnServerless = 132 [default = false];
148148
optional bool EnableChangefeedsOnIndexTables = 134 [default = false];
149+
optional bool EnableResourcePoolsCounters = 135 [default = false];
150+
optional bool EnableOptionalColumnsInColumnShard = 136 [default = false];
149151
}

ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,14 @@
22

33
namespace NKikimr::NOlap::NCompaction {
44

5+
void IColumnMerger::Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) {
6+
AFL_VERIFY(!Started);
7+
Started = true;
8+
// for (auto&& i : input) {
9+
// AFL_VERIFY(i->GetDataType()->id() == Context.GetResultField()->type()->id())("input", i->GetDataType()->ToString())(
10+
// "result", Context.GetResultField()->ToString());
11+
// }
12+
return DoStart(input);
13+
}
14+
515
}

ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,7 @@ class IColumnMerger {
2121

2222
virtual ~IColumnMerger() = default;
2323

24-
void Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) {
25-
AFL_VERIFY(!Started);
26-
Started = true;
27-
return DoStart(input);
28-
}
24+
void Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input);
2925

3026
std::vector<TColumnPortionResult> Execute(
3127
const NCompaction::TColumnMergeContext& context, const std::shared_ptr<arrow::RecordBatch>& remap) {

ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
3030

3131
ui32 idx = 0;
3232
for (auto&& batch : Batches) {
33+
AFL_VERIFY(batch->GetColumnsCount() == resultFiltered->GetColumnsCount())("data", batch->GetColumnsCount())(
34+
"schema", resultFiltered->GetColumnsCount());
3335
{
3436
NArrow::NConstruction::IArrayBuilder::TPtr column =
3537
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(
@@ -54,7 +56,6 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
5456
NActors::TLogContextGuard logGuard(
5557
NActors::TLogContextBuilder::Build()("field_name", resultFiltered->GetIndexInfo().GetColumnName(columnId)));
5658
auto columnInfo = stats->GetColumnInfo(columnId);
57-
auto resultField = resultFiltered->GetIndexInfo().GetColumnFieldVerified(columnId);
5859
std::shared_ptr<IColumnMerger> merger = std::make_shared<TPlainMerger>();
5960
// resultFiltered->BuildColumnMergerVerified(columnId);
6061

0 commit comments

Comments
 (0)