@@ -131,6 +131,49 @@ NKikimrConfig::TAppConfig GetAppConfig() {
131
131
return appConfig;
132
132
}
133
133
134
+ void CreateExtTable (const TServerSettings& settings, ui32 shards = 2 ,
135
+ const TString& storeName = TTestOlap::StoreName, const TString& tableName = TTestOlap::TableName) {
136
+ TString tableDescr = Sprintf (R"(
137
+ Name: "%s"
138
+ ColumnShardCount: 4
139
+ SchemaPresets {
140
+ Name: "default"
141
+ Schema {
142
+ Columns { Name: "timestamp" Type: "Timestamp" NotNull : true }
143
+ Columns { Name: "resource_type" Type: "Utf8" }
144
+ Columns { Name: "resource_id" Type: "Utf8" }
145
+ Columns { Name: "uid" Type: "Utf8" NotNull : true }
146
+ Columns { Name: "level" Type: "Int32" }
147
+ Columns { Name: "message" Type: "Utf8" }
148
+ Columns { Name: "json_payload" Type: "JsonDocument" }
149
+ Columns { Name: "ingested_at" Type: "Timestamp" }
150
+ Columns { Name: "saved_at" Type: "Timestamp" }
151
+ Columns { Name: "request_id" Type: "Utf8" }
152
+ Columns { Name: "flt" Type: "Float" }
153
+ Columns { Name: "dbl" Type: "Double" }
154
+ KeyColumnNames: "timestamp"
155
+ KeyColumnNames: "uid"
156
+ }
157
+ }
158
+ )" , storeName.c_str ());
159
+
160
+ TClient annoyingClient (settings);
161
+ annoyingClient.SetSecurityToken (" root@builtin" );
162
+ NMsgBusProxy::EResponseStatus status = annoyingClient.CreateOlapStore (" /Root" , tableDescr);
163
+ UNIT_ASSERT_VALUES_EQUAL (status, NMsgBusProxy::EResponseStatus::MSTATUS_OK);
164
+ status = annoyingClient.CreateColumnTable (" /Root" , Sprintf (R"(
165
+ Name: "%s/%s"
166
+ ColumnShardCount : %d
167
+ Sharding {
168
+ HashSharding {
169
+ Function: HASH_FUNCTION_CLOUD_LOGS
170
+ Columns: ["timestamp", "uid"]
171
+ }
172
+ }
173
+ )" , storeName.c_str (), tableName.c_str (), shards));
174
+ UNIT_ASSERT_VALUES_EQUAL (status, NMsgBusProxy::EResponseStatus::MSTATUS_OK);
175
+ }
176
+
134
177
}
135
178
136
179
Y_UNIT_TEST_SUITE (YdbTableBulkUpsertOlap) {
@@ -454,7 +497,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
454
497
csvSettings.set_delimiter (" |" );
455
498
456
499
TString formatSettings;
457
- Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString (&formatSettings);
500
+ UNIT_ASSERT ( csvSettings.SerializeToString (&formatSettings) );
458
501
459
502
NYdb::NTable::TBulkUpsertSettings upsertSettings;
460
503
upsertSettings.FormatSettings (formatSettings);
@@ -510,7 +553,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
510
553
csvSettings.set_delimiter (" ," );
511
554
512
555
TString formatSettings;
513
- Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString (&formatSettings);
556
+ UNIT_ASSERT ( csvSettings.SerializeToString (&formatSettings) );
514
557
515
558
NYdb::NTable::TBulkUpsertSettings upsertSettings;
516
559
upsertSettings.FormatSettings (formatSettings);
@@ -559,7 +602,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
559
602
csvSettings.set_delimiter (" ," );
560
603
561
604
TString formatSettings;
562
- Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString (&formatSettings);
605
+ UNIT_ASSERT ( csvSettings.SerializeToString (&formatSettings) );
563
606
564
607
NYdb::NTable::TBulkUpsertSettings upsertSettings;
565
608
upsertSettings.FormatSettings (formatSettings);
@@ -669,7 +712,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
669
712
csvSettings.set_header (true );
670
713
671
714
TString formatSettings;
672
- Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString (&formatSettings);
715
+ UNIT_ASSERT ( csvSettings.SerializeToString (&formatSettings) );
673
716
upsertSettings.FormatSettings (formatSettings);
674
717
}
675
718
@@ -937,7 +980,7 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
937
980
csvSettings.set_header (true );
938
981
939
982
TString formatSettings;
940
- Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString (&formatSettings);
983
+ UNIT_ASSERT ( csvSettings.SerializeToString (&formatSettings) );
941
984
upsertSettings.FormatSettings (formatSettings);
942
985
}
943
986
@@ -948,4 +991,81 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
948
991
UNIT_ASSERT (res.GetStatus () == EStatus::SUCCESS);
949
992
}
950
993
}
994
+
995
+ Y_UNIT_TEST (UpsertMixed) {
996
+ TKikimrWithGrpcAndRootSchema server (GetAppConfig ());
997
+ server.Server_ ->GetRuntime ()->SetLogPriority (NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
998
+
999
+ CreateExtTable (*server.ServerSettings );
1000
+
1001
+ ui16 grpc = server.GetPort ();
1002
+ TString location = TStringBuilder () << " localhost:" << grpc;
1003
+ auto connection = NYdb::TDriver (TDriverConfig ().SetEndpoint (location));
1004
+
1005
+ NYdb::NTable::TTableClient client (connection);
1006
+ auto session = client.GetSession ().ExtractValueSync ().GetSession ();
1007
+ TString tablePath = TTestOlap::TablePath;
1008
+
1009
+ {
1010
+ TString csv =
1011
+ " timestamp|resource_type|resource_id|uid|level|message|json_payload|ingested_at|saved_at|request_id|flt|dbl\n "
1012
+ " 1970-01-01T00:00:00Z|OBJECT|123|guid|5|data|{\" key\" :\" value\" }|1970-01-01T00:00:00Z|1970-01-01T00:00:00Z|1250|1e-17|1e-80" ;
1013
+
1014
+ Ydb::Formats::CsvSettings csvSettings;
1015
+ csvSettings.set_header (true );
1016
+ csvSettings.set_delimiter (" |" );
1017
+
1018
+ TString formatSettings;
1019
+ UNIT_ASSERT (csvSettings.SerializeToString (&formatSettings));
1020
+
1021
+ NYdb::NTable::TBulkUpsertSettings upsertSettings;
1022
+ upsertSettings.FormatSettings (formatSettings);
1023
+
1024
+ auto res = client.BulkUpsert (tablePath,
1025
+ NYdb::NTable::EDataFormat::CSV, csv, {}, upsertSettings).GetValueSync ();
1026
+
1027
+ UNIT_ASSERT_EQUAL_C (res.GetStatus (), EStatus::SUCCESS, res.GetIssues ().ToString ());
1028
+ }
1029
+
1030
+ {
1031
+ auto rowsBuilder = NYdb::TValueBuilder ();
1032
+ rowsBuilder.BeginList ();
1033
+ rowsBuilder.AddListItem ()
1034
+ .BeginStruct ()
1035
+ .AddMember (" timestamp" ).OptionalTimestamp (TInstant::Now ())
1036
+ .AddMember (" resource_type" ).OptionalUtf8 (" FILE" )
1037
+ .AddMember (" resource_id" ).OptionalUtf8 (" 12" )
1038
+ .AddMember (" uid" ).OptionalUtf8 (" guid7" )
1039
+ .AddMember (" level" ).OptionalInt32 (687 )
1040
+ .AddMember (" message" ).OptionalUtf8 (" data2" )
1041
+ .AddMember (" json_payload" ).OptionalJsonDocument (" {\" key1\" : \" value1\" }" )
1042
+ .AddMember (" ingested_at" ).OptionalTimestamp (TInstant::Now ())
1043
+ .AddMember (" saved_at" ).OptionalTimestamp (TInstant::Now ())
1044
+ .AddMember (" request_id" ).OptionalUtf8 (" 126" )
1045
+ .AddMember (" flt" ).OptionalFloat (23.0 )
1046
+ .AddMember (" dbl" ).OptionalDouble (1e+113 )
1047
+ .EndStruct ();
1048
+ rowsBuilder.EndList ();
1049
+
1050
+ auto result = client.BulkUpsert (tablePath, rowsBuilder.Build ()).ExtractValueSync ();
1051
+ UNIT_ASSERT_VALUES_EQUAL_C (result.GetStatus (), NYdb::EStatus::SUCCESS, result.GetIssues ().ToString ());
1052
+ }
1053
+
1054
+ {
1055
+ auto srcBatch = TTestOlap::SampleBatch ();
1056
+ auto schema = srcBatch->schema ();
1057
+ TString strSchema = NArrow::SerializeSchema (*schema);
1058
+ TString strBatch = NArrow::SerializeBatchNoCompression (srcBatch);
1059
+
1060
+ auto res = client.BulkUpsert (tablePath,
1061
+ NYdb::NTable::EDataFormat::ApacheArrow, strBatch, strSchema).GetValueSync ();
1062
+
1063
+ UNIT_ASSERT_EQUAL_C (res.GetStatus (), EStatus::SUCCESS, res.GetIssues ().ToString ());
1064
+ }
1065
+
1066
+ { // Read all
1067
+ auto rows = ScanQuerySelect (client, tablePath);
1068
+ UNIT_ASSERT_EQUAL (rows.size (), 102 );
1069
+ }
1070
+ }
951
1071
}
0 commit comments