Skip to content

Commit c0d60de

Browse files
remove table only for empty insert table (#8822)
1 parent 6ac4fc5 commit c0d60de

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+769
-492
lines changed

ydb/core/formats/arrow/modifier/schema.h

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,95 @@
11
#pragma once
2+
#include <ydb/library/accessor/accessor.h>
23
#include <ydb/library/actors/core/log.h>
34
#include <ydb/library/conclusion/status.h>
5+
46
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
57
#include <util/generic/hash.h>
68

9+
namespace NKikimr::NArrow {
10+
11+
class TSchemaLite {
12+
private:
13+
YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields);
14+
15+
public:
16+
TSchemaLite() = default;
17+
TSchemaLite(const std::shared_ptr<arrow::Schema>& schema) {
18+
AFL_VERIFY(schema);
19+
Fields = schema->fields();
20+
}
21+
22+
const std::shared_ptr<arrow::Field>& field(const ui32 index) const {
23+
return GetFieldByIndexVerified(index);
24+
}
25+
26+
bool Equals(const TSchemaLite& schema, const bool withMetadata = false) const {
27+
if (Fields.size() != schema.Fields.size()) {
28+
return false;
29+
}
30+
for (ui32 i = 0; i < Fields.size(); ++i) {
31+
if (!Fields[i]->Equals(schema.Fields[i], withMetadata)) {
32+
return false;
33+
}
34+
}
35+
return true;
36+
}
37+
38+
const std::vector<std::shared_ptr<arrow::Field>>& fields() const {
39+
return Fields;
40+
}
41+
42+
int num_fields() const {
43+
return Fields.size();
44+
}
45+
46+
std::vector<std::string> field_names() const {
47+
std::vector<std::string> result;
48+
result.reserve(Fields.size());
49+
for (auto&& f : Fields) {
50+
result.emplace_back(f->name());
51+
}
52+
return result;
53+
}
54+
55+
TString DebugString() const {
56+
TStringBuilder sb;
57+
sb << "[";
58+
for (auto&& f : Fields) {
59+
sb << f->ToString() << ";";
60+
}
61+
sb << "]";
62+
63+
return sb;
64+
}
65+
66+
TString ToString() const {
67+
return DebugString();
68+
}
69+
70+
const std::shared_ptr<arrow::Field>& GetFieldByIndexVerified(const ui32 index) const {
71+
AFL_VERIFY(index < Fields.size());
72+
return Fields[index];
73+
}
74+
75+
const std::shared_ptr<arrow::Field>& GetFieldByIndexOptional(const ui32 index) const {
76+
if (index < Fields.size()) {
77+
return Fields[index];
78+
}
79+
return Default<std::shared_ptr<arrow::Field>>();
80+
}
81+
82+
TSchemaLite(std::vector<std::shared_ptr<arrow::Field>>&& fields)
83+
: Fields(std::move(fields)) {
84+
}
85+
86+
TSchemaLite(const std::vector<std::shared_ptr<arrow::Field>>& fields)
87+
: Fields(fields) {
88+
}
89+
};
90+
91+
} // namespace NKikimr::NArrow
92+
793
namespace NKikimr::NArrow::NModifier {
894
class TSchema {
995
private:
@@ -13,6 +99,7 @@ class TSchema {
1399
bool Finished = false;
14100

15101
void Initialize(const std::vector<std::shared_ptr<arrow::Field>>& fields);
102+
16103
public:
17104
TSchema() = default;
18105
TSchema(const std::shared_ptr<TSchema>& schema);
@@ -75,4 +162,4 @@ class TSchema {
75162
}
76163
};
77164
};
78-
}
165+
} // namespace NKikimr::NArrow::NModifier

ydb/core/formats/arrow/process_columns.cpp

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "process_columns.h"
2+
23
#include "common/adapter.h"
4+
#include "modifier/schema.h"
35
#include "modifier/subset.h"
46

57
#include <util/string/join.h>
@@ -8,8 +10,8 @@ namespace NKikimr::NArrow {
810

911
namespace {
1012
template <class TDataContainer, class TStringImpl>
11-
std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(const std::shared_ptr<TDataContainer>& srcBatch,
12-
const std::vector<TStringImpl>& columnNames) {
13+
std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(
14+
const std::shared_ptr<TDataContainer>& srcBatch, const std::vector<TStringImpl>& columnNames) {
1315
std::vector<std::shared_ptr<arrow::Field>> fields;
1416
fields.reserve(columnNames.size());
1517
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
@@ -27,9 +29,9 @@ std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(const std::shared_ptr
2729
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::move(fields), std::move(columns), srcBatch->num_rows());
2830
}
2931

30-
template <class TDataContainer>
31-
TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(const std::shared_ptr<TDataContainer>& srcBatch,
32-
const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
32+
template <class TDataContainer, class TSchemaImpl>
33+
TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(
34+
const std::shared_ptr<TDataContainer>& srcBatch, const std::shared_ptr<TSchemaImpl>& dstSchema, TSchemaSubset* subset) {
3335
AFL_VERIFY(srcBatch);
3436
AFL_VERIFY(dstSchema);
3537
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
@@ -48,16 +50,16 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(const std::shared_
4850
fields.emplace_back(field);
4951
auto srcField = srcBatch->schema()->field(index);
5052
if (field->Equals(srcField)) {
51-
AFL_VERIFY(columns.back()->type()->Equals(field->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
52-
("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString());
53+
AFL_VERIFY(columns.back()->type()->Equals(field->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
54+
"column", field->name())("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString());
5355
} else {
54-
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
55-
("column_type", field->ToString(true))("incoming_type", srcField->ToString(true));
56+
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
57+
"column", field->name())("column_type", field->ToString(true))("incoming_type", srcField->ToString(true));
5658
return TConclusionStatus::Fail("incompatible column types");
5759
}
5860
} else if (!subset) {
59-
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name())
60-
("column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names()));
61+
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name())(
62+
"column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names()));
6163
return TConclusionStatus::Fail("not found column '" + field->name() + "'");
6264
}
6365
++idx;
@@ -76,7 +78,8 @@ std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EExtractProbl
7678
auto result = ExtractColumnsValidateImpl(incoming, columnNames);
7779
switch (policy) {
7880
case TColumnOperator::EExtractProblemsPolicy::Verify:
79-
AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())("required", JoinSeq(",", columnNames));
81+
AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())(
82+
"required", JoinSeq(",", columnNames));
8083
break;
8184
case TColumnOperator::EExtractProblemsPolicy::Null:
8285
if ((ui32)result->num_columns() != columnNames.size()) {
@@ -90,7 +93,8 @@ std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EExtractProbl
9093
}
9194

9295
template <class TDataContainer, class TStringType>
93-
TConclusion<std::shared_ptr<TDataContainer>> ReorderImpl(const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringType>& columnNames) {
96+
TConclusion<std::shared_ptr<TDataContainer>> ReorderImpl(
97+
const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringType>& columnNames) {
9498
AFL_VERIFY(!!incoming);
9599
AFL_VERIFY(columnNames.size());
96100
if ((ui32)incoming->num_columns() < columnNames.size()) {
@@ -107,46 +111,65 @@ TConclusion<std::shared_ptr<TDataContainer>> ReorderImpl(const std::shared_ptr<T
107111
return result;
108112
}
109113

110-
}
114+
} // namespace
111115

112-
std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
116+
std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
117+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
113118
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
114119
}
115120

116-
std::shared_ptr<arrow::Table> TColumnOperator::Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
121+
std::shared_ptr<arrow::Table> TColumnOperator::Extract(
122+
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
117123
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
118124
}
119125

120-
std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
126+
std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
127+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
121128
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
122129
}
123130

124131
std::shared_ptr<arrow::Table> TColumnOperator::Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
125132
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
126133
}
127134

128-
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
135+
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(
136+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
137+
return AdaptColumnsImpl(incoming, dstSchema, subset);
138+
}
139+
140+
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(
141+
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
142+
return AdaptColumnsImpl(incoming, dstSchema, subset);
143+
}
144+
145+
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(
146+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset) {
129147
return AdaptColumnsImpl(incoming, dstSchema, subset);
130148
}
131149

132-
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
150+
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(
151+
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset) {
133152
return AdaptColumnsImpl(incoming, dstSchema, subset);
134153
}
135154

136-
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
155+
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(
156+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
137157
return ReorderImpl(incoming, columnNames);
138158
}
139159

140-
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
160+
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
161+
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
141162
return ReorderImpl(incoming, columnNames);
142163
}
143164

144-
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
165+
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(
166+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
145167
return ReorderImpl(incoming, columnNames);
146168
}
147169

148-
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
170+
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
171+
const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
149172
return ReorderImpl(incoming, columnNames);
150173
}
151174

152-
}
175+
} // namespace NKikimr::NArrow

ydb/core/formats/arrow/process_columns.h

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
namespace NKikimr::NArrow {
77

88
class TSchemaSubset;
9+
class TSchemaLite;
910

1011
class TColumnOperator {
1112
public:
@@ -14,6 +15,7 @@ class TColumnOperator {
1415
Verify,
1516
Skip
1617
};
18+
1719
private:
1820
EExtractProblemsPolicy AbsentColumnPolicy = EExtractProblemsPolicy::Verify;
1921

@@ -33,18 +35,28 @@ class TColumnOperator {
3335
return *this;
3436
}
3537

36-
std::shared_ptr<arrow::RecordBatch> Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
38+
std::shared_ptr<arrow::RecordBatch> Extract(
39+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
3740
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
3841
std::shared_ptr<arrow::RecordBatch> Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
3942
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);
4043

41-
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
42-
TConclusion<std::shared_ptr<arrow::Table>> Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
43-
44-
TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
45-
TConclusion<std::shared_ptr<arrow::Table>> Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
46-
TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
44+
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(
45+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
46+
TConclusion<std::shared_ptr<arrow::Table>> Adapt(
47+
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
48+
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming,
49+
const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset = nullptr);
50+
TConclusion<std::shared_ptr<arrow::Table>> Adapt(
51+
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset = nullptr);
52+
53+
TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(
54+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
55+
TConclusion<std::shared_ptr<arrow::Table>> Reorder(
56+
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
57+
TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(
58+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
4759
TConclusion<std::shared_ptr<arrow::Table>> Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);
4860
};
4961

50-
}
62+
} // namespace NKikimr::NArrow

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,10 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
9292
auto internalOp = Self->GetProgressTxController().GetTxOperatorOptional(txId);
9393
if (!internalOp) {
9494
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "removed tx operator");
95+
return;
9596
}
96-
NActors::TLogContextGuard lGuardTx = NActors::TLogContextBuilder::Build()("int_op_tx", internalOp->GetTxInfo().DebugString());
97+
NActors::TLogContextGuard lGuardTx =
98+
NActors::TLogContextBuilder::Build()("int_op_tx", internalOp->GetTxInfo().DebugString())("int_this", (ui64)internalOp.get());
9799
if (!internalOp->CheckTxInfoForReply(*TxInfo)) {
98100
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "deprecated tx operator");
99101
return;
@@ -143,11 +145,14 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
143145
return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "No primary index for TTL");
144146
}
145147

146-
auto schema = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema()->GetSchema();
147-
auto ttlColumn = schema->GetFieldByName(columnName);
148-
if (!ttlColumn) {
149-
return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong TTL column '" + columnName + "'");
148+
auto schemaSnapshot = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema();
149+
auto schema = schemaSnapshot->GetSchema();
150+
auto index = schemaSnapshot->GetColumnIdOptional(columnName);
151+
if (!index) {
152+
return TTxController::TProposeResult(
153+
NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong TTL column '" + columnName + "'");
150154
}
155+
auto ttlColumn = schemaSnapshot->GetFieldByColumnIdVerified(*index);
151156

152157
const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now();
153158
for (ui64 pathId : ttlBody.GetPathIds()) {

0 commit comments

Comments
 (0)