Skip to content

Commit 4a0b2a7

Browse files
authored
Add direct conversion from Ydb::Value tuple to Cell in rpc_read_table (#11615)
1 parent 2f2d32d commit 4a0b2a7

File tree

13 files changed

+883
-310
lines changed

13 files changed

+883
-310
lines changed

ydb/core/grpc_services/rpc_object_storage.cpp

Lines changed: 14 additions & 202 deletions
Original file line numberDiff line numberDiff line change
@@ -22,197 +22,6 @@ namespace NGRpcService {
2222

2323
using TEvObjectStorageListingRequest = TGrpcRequestOperationCall<Ydb::ObjectStorage::ListingRequest, Ydb::ObjectStorage::ListingResponse>;
2424

25-
#define CHECK_OR_RETURN_ERROR(cond, descr) \
26-
if (!(cond)) { \
27-
errStr = descr; \
28-
return false; \
29-
}
30-
31-
bool CellFromTuple(NScheme::TTypeInfo type,
32-
const Ydb::Value& tupleValue,
33-
ui32 position,
34-
bool allowCastFromString,
35-
TVector<TCell>& cells,
36-
TString& errStr,
37-
TVector<TString>& memoryOwner) {
38-
auto value_case = tupleValue.value_case();
39-
40-
CHECK_OR_RETURN_ERROR(value_case != Ydb::Value::VALUE_NOT_SET,
41-
Sprintf("Data must be present at position %" PRIu32, position));
42-
43-
CHECK_OR_RETURN_ERROR(tupleValue.itemsSize() == 0 &&
44-
tupleValue.pairsSize() == 0,
45-
Sprintf("Simple type is expected in tuple at position %" PRIu32, position));
46-
47-
TCell c;
48-
auto typeId = type.GetTypeId();
49-
switch (typeId) {
50-
51-
#define CASE_SIMPLE_TYPE(name, type, protoField) \
52-
case NScheme::NTypeIds::name: \
53-
{ \
54-
bool valuePresent = tupleValue.Has##protoField##_value(); \
55-
if (valuePresent) { \
56-
type val = tupleValue.Get##protoField##_value(); \
57-
c = TCell((const char*)&val, sizeof(val)); \
58-
} else if (allowCastFromString && tupleValue.Hastext_value()) { \
59-
const auto slot = NUdf::GetDataSlot(typeId); \
60-
const auto out = NMiniKQL::ValueFromString(slot, tupleValue.Gettext_value()); \
61-
CHECK_OR_RETURN_ERROR(out, Sprintf("Cannot parse value of type " #name " from text '%s' in tuple at position %" PRIu32, tupleValue.Gettext_value().data(), position)); \
62-
const auto val = out.Get<type>(); \
63-
c = TCell((const char*)&val, sizeof(val)); \
64-
} else { \
65-
CHECK_OR_RETURN_ERROR(false, Sprintf("Value of type " #name " expected in tuple at position %" PRIu32, position)); \
66-
} \
67-
Y_ABORT_UNLESS(c.IsInline()); \
68-
break; \
69-
}
70-
71-
CASE_SIMPLE_TYPE(Bool, bool, bool);
72-
CASE_SIMPLE_TYPE(Int8, i8, int32);
73-
CASE_SIMPLE_TYPE(Uint8, ui8, uint32);
74-
CASE_SIMPLE_TYPE(Int16, i16, int32);
75-
CASE_SIMPLE_TYPE(Uint16, ui16, uint32);
76-
CASE_SIMPLE_TYPE(Int32, i32, int32);
77-
CASE_SIMPLE_TYPE(Uint32, ui32, uint32);
78-
CASE_SIMPLE_TYPE(Int64, i64, int64);
79-
CASE_SIMPLE_TYPE(Uint64, ui64, uint64);
80-
CASE_SIMPLE_TYPE(Float, float, float);
81-
CASE_SIMPLE_TYPE(Double, double, double);
82-
CASE_SIMPLE_TYPE(Date, ui16, uint32);
83-
CASE_SIMPLE_TYPE(Datetime, ui32, uint32);
84-
CASE_SIMPLE_TYPE(Timestamp, ui64, uint64);
85-
CASE_SIMPLE_TYPE(Interval, i64, int64);
86-
87-
88-
#undef CASE_SIMPLE_TYPE
89-
90-
case NScheme::NTypeIds::Yson:
91-
case NScheme::NTypeIds::Json:
92-
case NScheme::NTypeIds::Utf8:
93-
{
94-
c = TCell(tupleValue.Gettext_value().data(), tupleValue.Gettext_value().size());
95-
break;
96-
}
97-
case NScheme::NTypeIds::JsonDocument:
98-
case NScheme::NTypeIds::DyNumber:
99-
{
100-
c = TCell(tupleValue.Getbytes_value().data(), tupleValue.Getbytes_value().size());
101-
break;
102-
}
103-
case NScheme::NTypeIds::String:
104-
{
105-
if (tupleValue.Hasbytes_value()) {
106-
c = TCell(tupleValue.Getbytes_value().data(), tupleValue.Getbytes_value().size());
107-
} else if (allowCastFromString && tupleValue.Hastext_value()) {
108-
c = TCell(tupleValue.Gettext_value().data(), tupleValue.Gettext_value().size());
109-
} else {
110-
CHECK_OR_RETURN_ERROR(false, Sprintf("Cannot parse value of type String in tuple at position %" PRIu32, position));
111-
}
112-
break;
113-
}
114-
case NScheme::NTypeIds::Pg:
115-
{
116-
if (tupleValue.Hasbytes_value()) {
117-
c = TCell(tupleValue.Getbytes_value().data(), tupleValue.Getbytes_value().size());
118-
} else if (tupleValue.Hastext_value()) {
119-
auto typeDesc = type.GetPgTypeDesc();
120-
auto convert = NPg::PgNativeBinaryFromNativeText(tupleValue.Gettext_value(), NPg::PgTypeIdFromTypeDesc(typeDesc));
121-
if (convert.Error) {
122-
CHECK_OR_RETURN_ERROR(false, Sprintf("Cannot parse value of type Pg: %s in tuple at position %" PRIu32, convert.Error->data(), position));
123-
} else {
124-
auto &data = memoryOwner.emplace_back(convert.Str);
125-
c = TCell(data);
126-
}
127-
} else {
128-
CHECK_OR_RETURN_ERROR(false, Sprintf("Cannot parse value of type Pg in tuple at position %" PRIu32, position));
129-
}
130-
break;
131-
}
132-
case NScheme::NTypeIds::Uuid:
133-
{
134-
if (tupleValue.Haslow_128()) {
135-
auto &data = memoryOwner.emplace_back();
136-
data.resize(NUuid::UUID_LEN);
137-
NUuid::UuidHalfsToBytes(data.Detach(), data.size(), tupleValue.Gethigh_128(), tupleValue.Getlow_128());
138-
c = TCell(data);
139-
} else if (tupleValue.Hasbytes_value()) {
140-
Y_ABORT_UNLESS(tupleValue.Getbytes_value().size() == NUuid::UUID_LEN);
141-
c = TCell(tupleValue.Getbytes_value().data(), tupleValue.Getbytes_value().size());
142-
} else {
143-
CHECK_OR_RETURN_ERROR(false, Sprintf("Cannot parse value of type Uuid in tuple at position %" PRIu32, position));
144-
}
145-
break;
146-
}
147-
case NScheme::NTypeIds::Decimal:
148-
{
149-
if (tupleValue.Haslow_128()) {
150-
NYql::NDecimal::TInt128 int128 = NYql::NDecimal::FromHalfs(tupleValue.Getlow_128(), tupleValue.Gethigh_128());
151-
auto &data = memoryOwner.emplace_back();
152-
data.resize(sizeof(NYql::NDecimal::TInt128));
153-
std::memcpy(data.Detach(), &int128, sizeof(NYql::NDecimal::TInt128));
154-
c = TCell(data);
155-
} else {
156-
CHECK_OR_RETURN_ERROR(false, Sprintf("Cannot parse value of type Decimal in tuple at position %" PRIu32, position));
157-
}
158-
break;
159-
}
160-
default:
161-
CHECK_OR_RETURN_ERROR(false, Sprintf("Unsupported typeId %" PRIu16 " at index %" PRIu32, typeId, position));
162-
break;
163-
}
164-
165-
CHECK_OR_RETURN_ERROR(!c.IsNull(), Sprintf("Invalid non-NULL value at index %" PRIu32, position));
166-
cells.push_back(c);
167-
168-
return true;
169-
}
170-
171-
// NOTE: TCell's can reference memory from tupleValue
172-
bool CellsFromTuple(const Ydb::Type* tupleType,
173-
const Ydb::Value& tupleValue,
174-
const TConstArrayRef<NScheme::TTypeInfo>& types,
175-
bool allowCastFromString,
176-
TVector<TCell>& key,
177-
TString& errStr,
178-
TVector<TString>& memoryOwner) {
179-
if (tupleType) {
180-
Ydb::Type::TypeCase typeCase = tupleType->type_case();
181-
CHECK_OR_RETURN_ERROR(typeCase == Ydb::Type::kTupleType ||
182-
(typeCase == Ydb::Type::TYPE_NOT_SET && tupleType->tuple_type().elementsSize() == 0), "Must be a tuple");
183-
CHECK_OR_RETURN_ERROR(tupleType->tuple_type().elementsSize() <= types.size(),
184-
"Tuple size " + ToString(tupleType->tuple_type().elementsSize()) + " is greater that expected size " + ToString(types.size()));
185-
186-
for (size_t i = 0; i < tupleType->tuple_type().elementsSize(); ++i) {
187-
const auto& ti = tupleType->tuple_type().Getelements(i);
188-
CHECK_OR_RETURN_ERROR(ti.type_case() == Ydb::Type::kTypeId, "Element at index " + ToString(i) + " in not a TypeId");
189-
const auto& typeId = ti.Gettype_id();
190-
CHECK_OR_RETURN_ERROR(typeId == types[i].GetTypeId() ||
191-
allowCastFromString && (typeId == NScheme::NTypeIds::Utf8),
192-
"Element at index " + ToString(i) + " has type " + Type_PrimitiveTypeId_Name(typeId) + " but expected type is " + ToString(types[i].GetTypeId()));
193-
}
194-
195-
CHECK_OR_RETURN_ERROR(tupleType->Gettuple_type().elementsSize() == tupleValue.itemsSize(),
196-
Sprintf("Tuple value length %" PRISZT " doesn't match the length in type %" PRISZT, tupleValue.itemsSize(), tupleType->Gettuple_type().elementsSize()));
197-
} else {
198-
CHECK_OR_RETURN_ERROR(types.size() >= tupleValue.itemsSize(),
199-
Sprintf("Tuple length %" PRISZT " is greater than key column count %" PRISZT, tupleValue.itemsSize(), types.size()));
200-
}
201-
202-
for (ui32 i = 0; i < tupleValue.itemsSize(); ++i) {
203-
auto& v = tupleValue.Getitems(i);
204-
205-
bool parsed = CellFromTuple(types[i], v, i, allowCastFromString, key, errStr, memoryOwner);
206-
207-
if (!parsed) {
208-
return false;
209-
}
210-
}
211-
212-
return true;
213-
}
214-
#undef CHECK_OR_RETURN_ERROR
215-
21625
struct TFilter {
21726
TVector<ui32> ColumnIds;
21827
TSerializedCellVec FilterValues;
@@ -240,6 +49,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
24049
bool Finished;
24150
TAutoPtr<NSchemeCache::TSchemeCacheNavigate> ResolveNamesResult;
24251
TVector<NScheme::TTypeInfo> KeyColumnTypes;
52+
TVector<TConversionTypeInfo> KeyColumnTypeInfos;
24353
TSysTables::TTableColumnInfo PathColumnInfo;
24454
TVector<TSysTables::TTableColumnInfo> CommonPrefixesColumns;
24555
TVector<TSysTables::TTableColumnInfo> ContentsColumns;
@@ -409,15 +219,17 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
409219
KeyColumnTypes[keyOrder] = ci.second.PType;
410220
keyColumnIds.resize(Max<size_t>(keyColumnIds.size(), keyOrder + 1));
411221
keyColumnIds[keyOrder] = ci.second.Id;
222+
KeyColumnTypeInfos.resize(Max<size_t>(keyColumnIds.size(), keyOrder + 1));
223+
KeyColumnTypeInfos[keyOrder] = {ci.second.PType, ci.second.PTypeMod, ci.second.IsNotNullColumn};
412224
}
413225
}
414226

415227
TString errStr;
416228
TVector<TCell> prefixCells;
417-
TVector<TString> prefixMemoryOwner;
418-
TConstArrayRef<NScheme::TTypeInfo> prefixTypes(KeyColumnTypes.data(), KeyColumnTypes.size() - 1); // -1 for path column
229+
TMemoryPool prefixMemoryOwner(256);
230+
TConstArrayRef<TConversionTypeInfo> prefixTypes(KeyColumnTypeInfos.data(), KeyColumnTypeInfos.size() - 1); // -1 for path column
419231
bool prefixParsedOk = CellsFromTuple(&Request->Getkey_prefix().Gettype(), Request->Getkey_prefix().Getvalue(),
420-
prefixTypes, true, prefixCells, errStr, prefixMemoryOwner);
232+
prefixTypes, true, false, prefixCells, errStr, prefixMemoryOwner);
421233

422234
if (!prefixParsedOk) {
423235
ReplyWithError(Ydb::StatusIds::BAD_REQUEST, "Invalid KeyPrefix: " + errStr, ctx);
@@ -440,10 +252,10 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
440252
CommonPrefixesColumns.push_back(PathColumnInfo);
441253

442254
TVector<TCell> suffixCells;
443-
TVector<TString> suffixMemoryOwner;
444-
TConstArrayRef<NScheme::TTypeInfo> suffixTypes(KeyColumnTypes.data() + pathColPos, KeyColumnTypes.size() - pathColPos); // starts at path column
255+
TMemoryPool suffixMemoryOwner(256);
256+
TConstArrayRef<TConversionTypeInfo> suffixTypes(KeyColumnTypeInfos.data() + pathColPos, KeyColumnTypeInfos.size() - pathColPos); // starts at path column
445257
bool suffixParsedOk = CellsFromTuple(&Request->Getstart_after_key_suffix().Gettype(), Request->Getstart_after_key_suffix().Getvalue(),
446-
suffixTypes, true, suffixCells, errStr, suffixMemoryOwner);
258+
suffixTypes, true, false, suffixCells, errStr, suffixMemoryOwner);
447259
if (!suffixParsedOk) {
448260
ReplyWithError(Ydb::StatusIds::BAD_REQUEST,
449261
"Invalid StartAfterKeySuffix: " + errStr, ctx);
@@ -506,7 +318,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
506318
return false;
507319
}
508320

509-
TVector<NScheme::TTypeInfo> types;
321+
TVector<TConversionTypeInfo> types;
510322

511323
for (int i = 0; i < columnNames.items_size(); i++) {
512324
const auto& colNameValue = columnNames.get_idx_items(i);
@@ -523,7 +335,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
523335
const auto& columnInfo = entry.Columns[colIdIt->second];
524336
const auto& type = columnInfo.PType;
525337

526-
types.push_back(type);
338+
types.emplace_back(type, columnInfo.PTypeMod, columnInfo.IsNotNullColumn);
527339

528340
const auto [it, inserted] = columnToRequestIndex.try_emplace(colName, columnToRequestIndex.size());
529341

@@ -552,14 +364,14 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
552364
Filter.MatchTypes.push_back(dsMatchType);
553365
}
554366

555-
TConstArrayRef<NScheme::TTypeInfo> typesRef(types.data(), types.size());
367+
TConstArrayRef<TConversionTypeInfo> typesRef(types.data(), types.size());
556368

557369
TVector<TCell> cells;
558-
TVector<TString> owner;
370+
TMemoryPool owner(256);
559371

560372
TString err;
561373

562-
bool filterParsedOk = CellsFromTuple(&filterType, columnValues, typesRef, true, cells, err, owner);
374+
bool filterParsedOk = CellsFromTuple(&filterType, columnValues, typesRef, true, false, cells, err, owner);
563375

564376
if (!filterParsedOk) {
565377
ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Sprintf("Invalid filter: '%s'", err.data()), ctx);

ydb/core/grpc_services/rpc_read_table.cpp

Lines changed: 1 addition & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -69,49 +69,6 @@ static void NullSerializeReadTableResponse(const google::protobuf::RepeatedPtrFi
6969
Y_PROTOBUF_SUPPRESS_NODISCARD readTableResponse.SerializeToString(output);
7070
}
7171

72-
static NKikimrMiniKQL::TParams ConvertKey(const Ydb::TypedValue& key) {
73-
NKikimrMiniKQL::TParams protobuf;
74-
ConvertYdbTypeToMiniKQLType(key.type(), *protobuf.MutableType());
75-
ConvertYdbValueToMiniKQLValue(key.type(), key.value(), *protobuf.MutableValue());
76-
return protobuf;
77-
}
78-
79-
template<class TGetOutput>
80-
static void ConvertKeyRange(const Ydb::Table::KeyRange& keyRange, const TGetOutput& getOutput) {
81-
switch (keyRange.from_bound_case()) {
82-
case Ydb::Table::KeyRange::kGreaterOrEqual: {
83-
auto* output = getOutput();
84-
output->SetFromInclusive(true);
85-
output->MutableFrom()->CopyFrom(ConvertKey(keyRange.greater_or_equal()));
86-
break;
87-
}
88-
case Ydb::Table::KeyRange::kGreater: {
89-
auto* output = getOutput();
90-
output->SetFromInclusive(false);
91-
output->MutableFrom()->CopyFrom(ConvertKey(keyRange.greater()));
92-
break;
93-
}
94-
default:
95-
break;
96-
}
97-
switch (keyRange.to_bound_case()) {
98-
case Ydb::Table::KeyRange::kLessOrEqual: {
99-
auto* output = getOutput();
100-
output->SetToInclusive(true);
101-
output->MutableTo()->CopyFrom(ConvertKey(keyRange.less_or_equal()));
102-
break;
103-
}
104-
case Ydb::Table::KeyRange::kLess: {
105-
auto* output = getOutput();
106-
output->SetToInclusive(false);
107-
output->MutableTo()->CopyFrom(ConvertKey(keyRange.less()));
108-
break;
109-
}
110-
default:
111-
break;
112-
}
113-
}
114-
11572
class TReadTableRPC : public TActorBootstrapped<TReadTableRPC> {
11673
enum EWakeupTag : ui64 {
11774
RlSendAllowed = 1,
@@ -555,15 +512,7 @@ class TReadTableRPC : public TActorBootstrapped<TReadTableRPC> {
555512
settings.Columns.push_back(col);
556513
}
557514

558-
try {
559-
ConvertKeyRange(req->key_range(), [&]{ return &settings.KeyRange; });
560-
} catch (const std::exception& ex) {
561-
const NYql::TIssue& issue = NYql::ExceptionToIssue(ex);
562-
google::protobuf::RepeatedPtrField<TYdbIssueMessageType> message;
563-
auto item = message.Add();
564-
NYql::IssueToMessage(issue, item);
565-
return ReplyFinishStream(StatusIds::BAD_REQUEST, message, ctx);
566-
}
515+
settings.KeyRange.CopyFrom(req->key_range());
567516

568517
ReadTableActor = ctx.RegisterWithSameMailbox(NKikimr::NTxProxy::CreateReadTableSnapshotWorker(settings));
569518
}

ydb/core/tx/datashard/build_index.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ bool BuildExtraColumns(TVector<TCell>& cells, const NKikimrIndexBuilder::TColumn
8888
}
8989

9090
auto& back = cells.emplace_back();
91-
if (!CellFromProtoVal(typeInfo, typeMod, &column.default_from_literal().value(), back, err, valueDataPool)) {
91+
if (!CellFromProtoVal(typeInfo, typeMod, &column.default_from_literal().value(), false, back, err, valueDataPool)) {
9292
return false;
9393
}
9494
}

ydb/core/tx/schemeshard/ut_column_build/ut_column_build.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ Y_UNIT_TEST_SUITE(ColumnBuildTest) {
115115
TestBuildColumn(runtime, ++txId, tenantSchemeShard, "/MyRoot/ServerLessDB", "/MyRoot/ServerLessDB/Table", "value", defaultValue, Ydb::StatusIds::BAD_REQUEST);
116116
}
117117

118-
Y_UNIT_TEST(InvalidValue) {
118+
// TODO: rename to InvalidValue and check invalid default value
119+
Y_UNIT_TEST(ValidDefaultValue) {
119120
TTestBasicRuntime runtime;
120121
TTestEnv env(runtime, TTestEnvOptions().EnableAddColumsWithDefaults(true));
121122
ui64 txId = 100;
@@ -215,7 +216,7 @@ Y_UNIT_TEST_SUITE(ColumnBuildTest) {
215216

216217
Ydb::TypedValue defaultValue;
217218
defaultValue.mutable_type()->set_type_id(Ydb::Type::UINT64);
218-
defaultValue.mutable_value()->set_text_value("1111");
219+
defaultValue.mutable_value()->set_uint64_value(1111); // TODO: check invalid value
219220

220221
TestBuildColumn(runtime, ++txId, tenantSchemeShard, "/MyRoot/ServerLessDB", "/MyRoot/ServerLessDB/Table", "ColumnValue", defaultValue, Ydb::StatusIds::SUCCESS);
221222

ydb/core/tx/tx_proxy/read_table.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
#include "defs.h"
33

44
#include <ydb/core/base/row_version.h>
5-
#include <ydb/library/mkql_proto/protos/minikql.pb.h>
65
#include <ydb/core/protos/tx_proxy.pb.h>
6+
#include <ydb/public/api/protos/ydb_table.pb.h>
77

88
namespace NKikimr {
99
namespace NTxProxy {
@@ -21,7 +21,7 @@ namespace NTxProxy {
2121
TString DatabaseName;
2222
TString TablePath;
2323
TVector<TString> Columns;
24-
NKikimrTxUserProxy::TKeyRange KeyRange;
24+
Ydb::Table::KeyRange KeyRange;
2525
ui64 MaxRows = Max<ui64>();
2626
TRowVersion ReadVersion = TRowVersion::Max();
2727
TString UserToken;

0 commit comments

Comments
 (0)