Skip to content

Commit ed5fb29

Browse files
committed
Fixed uninitializing value batch_size_bytes of the transfer (#17065)
1 parent 4956e87 commit ed5fb29

File tree

3 files changed

+33
-3
lines changed

3 files changed

+33
-3
lines changed

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2614,7 +2614,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
26142614
target.SetDstPath(AdjustPath(dst, GetDatabase()));
26152615
target.SetTransformLambda(lambda);
26162616
if (settings.Settings.Batching && settings.Settings.Batching->BatchSizeBytes) {
2617-
config.MutableTransferSpecific()->MutableBatching()->SetBatchSizeBytes(settings.Settings.Batching->BatchSizeBytes);
2617+
config.MutableTransferSpecific()->MutableBatching()->SetBatchSizeBytes(settings.Settings.Batching->BatchSizeBytes.value());
26182618
}
26192619
if (settings.Settings.Batching && settings.Settings.Batching->FlushInterval) {
26202620
config.MutableTransferSpecific()->MutableBatching()->SetFlushIntervalMilliSeconds(settings.Settings.Batching->FlushInterval.MilliSeconds());
@@ -2672,7 +2672,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
26722672
op.MutableAlterTransfer()->SetFlushIntervalMilliSeconds(batching->FlushInterval.MilliSeconds());
26732673
}
26742674
if (batching->BatchSizeBytes) {
2675-
op.MutableAlterTransfer()->SetBatchSizeBytes(batching->BatchSizeBytes);
2675+
op.MutableAlterTransfer()->SetBatchSizeBytes(batching->BatchSizeBytes.value());
26762676
}
26772677
}
26782678

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,7 @@ struct TTransferSettings : public TReplicationSettingsBase {
916916

917917
struct TBatching {
918918
TDuration FlushInterval;
919-
ui64 BatchSizeBytes;
919+
std::optional<ui64> BatchSizeBytes;
920920
};
921921

922922
TMaybe<TString> ConsumerName;

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8174,6 +8174,21 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
81748174
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
81758175
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
81768176
}
8177+
8178+
{
8179+
auto query = Sprintf(R"(
8180+
--!syntax_v1
8181+
CREATE TRANSFER `/Root/transfer_fi`
8182+
FROM `/Root/topic` TO `/Root/table`
8183+
WITH (
8184+
CONNECTION_STRING = "%s",
8185+
FLUSH_INTERVAL = Interval('PT1S')
8186+
);
8187+
)", kikimr.GetEndpoint().c_str());
8188+
8189+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
8190+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
8191+
}
81778192
}
81788193

81798194
Y_UNIT_TEST(CreateTransfer_QueryService) {
@@ -8417,6 +8432,21 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
84178432
const auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
84188433
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
84198434
}
8435+
8436+
{
8437+
auto query = Sprintf(R"(
8438+
--!syntax_v1
8439+
CREATE TRANSFER `/Root/transfer_fi`
8440+
FROM `/Root/topic` TO `/Root/table`
8441+
WITH (
8442+
CONNECTION_STRING = "%s",
8443+
FLUSH_INTERVAL = Interval('PT1S')
8444+
);
8445+
)", kikimr.GetEndpoint().c_str());
8446+
8447+
const auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
8448+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
8449+
}
84208450
}
84218451

84228452
Y_UNIT_TEST(AlterTransfer) {

0 commit comments

Comments
 (0)