Skip to content

Commit f0122e1

Browse files
authored
Fix S3 buffer min bytes setting (#17556)
1 parent 9731c04 commit f0122e1

File tree

6 files changed

+139
-3
lines changed

6 files changed

+139
-3
lines changed

ydb/core/tx/datashard/export_s3.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ class TS3Export: public IExport {
3636
bufferSettings
3737
.WithColumns(Columns)
3838
.WithMaxRows(maxRows)
39-
.WithMaxBytes(maxBytes);
39+
.WithMaxBytes(maxBytes)
40+
.WithMinBytes(minBytes); // S3 API returns EntityTooSmall error if file part is smaller that 5MB: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
4041
if (Task.GetEnableChecksums()) {
4142
bufferSettings.WithChecksum(TS3ExportBufferSettings::Sha256Checksum());
4243
}
@@ -46,7 +47,6 @@ class TS3Export: public IExport {
4647
break;
4748
case ECompressionCodec::Zstd:
4849
bufferSettings
49-
.WithMinBytes(minBytes)
5050
.WithCompression(TS3ExportBufferSettings::ZstdCompression(Task.GetCompression().GetLevel()));
5151
break;
5252
case ECompressionCodec::Invalid:

ydb/core/tx/datashard/export_s3_buffer.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ class TZStdCompressionProcessor {
4545

4646
TMaybe<TBuffer> Flush();
4747

48+
size_t GetReadyOutputBytes() const {
49+
return Buffer.Size();
50+
}
51+
4852
private:
4953
enum ECompressionResult {
5054
CONTINUE,
@@ -324,7 +328,11 @@ void TS3Buffer::Clear() {
324328
}
325329

326330
bool TS3Buffer::IsFilled() const {
327-
if (Buffer.Size() < MinBytes) {
331+
size_t outputSize = Buffer.Size();
332+
if (Compression) {
333+
outputSize = Compression->GetReadyOutputBytes();
334+
}
335+
if (outputSize < MinBytes) {
328336
return false;
329337
}
330338

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
#include "export_s3_buffer.h"
2+
#include <ydb/core/tx/datashard/export_scan.h>
3+
4+
#include <library/cpp/testing/unittest/registar.h>
5+
6+
#include <util/generic/array_ref.h>
7+
8+
#ifndef KIKIMR_DISABLE_S3_OPS
9+
10+
namespace NKikimr::NDataShard {
11+
12+
class TExportS3BufferFixture : public NUnitTest::TBaseFixture {
13+
public:
14+
void SetUp(NUnitTest::TTestContext&) override {
15+
Columns[0] = TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::Uint32), "", "key", true);
16+
Columns[1] = TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::String), "", "value", false);
17+
}
18+
19+
TS3ExportBufferSettings& Settings() {
20+
return S3ExportBufferSettings;
21+
}
22+
23+
IExport::TTableColumns& TableColumns() {
24+
return Columns;
25+
}
26+
27+
NExportScan::IBuffer& Buffer() {
28+
if (!S3ExportBuffer) {
29+
TS3ExportBufferSettings settings = S3ExportBufferSettings;
30+
settings.WithColumns(Columns);
31+
S3ExportBuffer.Reset(CreateS3ExportBuffer(std::move(settings)));
32+
33+
TVector<ui32> tags;
34+
tags.reserve(Columns.size());
35+
for (auto&& [tag, _] : Columns) {
36+
tags.push_back(tag);
37+
}
38+
S3ExportBuffer->ColumnsOrder(tags);
39+
}
40+
return *S3ExportBuffer;
41+
}
42+
43+
bool CollectKeyValue(ui32 k, TStringBuf v) {
44+
NTable::IScan::TRow row;
45+
row.Init(2);
46+
row.Set(0, NKikimr::NTable::ECellOp::Set, NKikimr::TCell::Make(k));
47+
row.Set(1, NKikimr::NTable::ECellOp::Set, NKikimr::TCell(v.data(), v.size()));
48+
return Buffer().Collect(row);
49+
}
50+
51+
// Tests impl
52+
void TestMinBufferSize(ui64 minBufferSize) {
53+
for (ui32 i = 0; i < 100; ++i) {
54+
UNIT_ASSERT(CollectKeyValue(i, "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"));
55+
NExportScan::IBuffer::TStats stats;
56+
if (Buffer().IsFilled()) {
57+
NActors::IEventBase* event = Buffer().PrepareEvent(false, stats);
58+
UNIT_ASSERT(event);
59+
auto* evBuffer = dynamic_cast<NKikimr::NDataShard::TEvExportScan::TEvBuffer<TBuffer>*>(event);
60+
UNIT_ASSERT(evBuffer);
61+
UNIT_ASSERT_GE_C(evBuffer->Buffer.Size(), minBufferSize, "Got buffer size " << evBuffer->Buffer.Size() << ". Iteration: " << i);
62+
}
63+
}
64+
}
65+
66+
public:
67+
IExport::TTableColumns Columns;
68+
TS3ExportBufferSettings S3ExportBufferSettings;
69+
THolder<NExportScan::IBuffer> S3ExportBuffer;
70+
};
71+
72+
Y_UNIT_TEST_SUITE_F(ExportS3BufferTest, TExportS3BufferFixture) {
73+
Y_UNIT_TEST(MinBufferSize) {
74+
ui64 minBufferSize = 5000;
75+
Settings()
76+
.WithMaxRows(2)
77+
.WithMinBytes(minBufferSize)
78+
.WithMaxBytes(1'000'000);
79+
80+
TestMinBufferSize(minBufferSize);
81+
}
82+
83+
Y_UNIT_TEST(MinBufferSizeWithCompression) {
84+
ui64 minBufferSize = 5000;
85+
Settings()
86+
.WithCompression(TS3ExportBufferSettings::ZstdCompression(20))
87+
.WithMaxRows(2)
88+
.WithMinBytes(minBufferSize)
89+
.WithMaxBytes(1'000'000);
90+
91+
TestMinBufferSize(minBufferSize);
92+
}
93+
94+
Y_UNIT_TEST(MinBufferSizeWithCompressionAndEncryption) {
95+
ui64 minBufferSize = 5000;
96+
Settings()
97+
.WithCompression(TS3ExportBufferSettings::ZstdCompression(20))
98+
.WithEncryption(TS3ExportBufferSettings::TEncryptionSettings()
99+
.WithAlgorithm("AES-256-GCM")
100+
.WithIV(NBackup::TEncryptionIV::Generate())
101+
.WithKey(NBackup::TEncryptionKey("256 bit test symmetric key bytes")))
102+
.WithMaxRows(2)
103+
.WithMinBytes(minBufferSize)
104+
.WithMaxBytes(1'000'000);
105+
106+
TestMinBufferSize(minBufferSize);
107+
}
108+
}
109+
110+
} // namespace NKikimr::NDataShard
111+
112+
#endif // KIKIMR_DISABLE_S3_OPS
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
UNITTEST_FOR(ydb/core/tx/datashard)
2+
3+
PEERDIR(
4+
ydb/core/testlib/default
5+
)
6+
7+
YQL_LAST_ABI_VERSION()
8+
9+
SRCS(
10+
export_s3_buffer_ut.cpp
11+
)
12+
13+
END()

ydb/core/tx/datashard/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ RECURSE_FOR_TESTS(
313313
ut_compaction
314314
ut_data_cleanup
315315
ut_erase_rows
316+
ut_export
316317
ut_external_blobs
317318
ut_followers
318319
ut_incremental_backup

ydb/core/tx/schemeshard/ut_export/ut_export.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,7 @@ namespace {
521521
backupTxId = record.GetTxId();
522522
// hijack
523523
schemeTx.MutableBackup()->MutableScanSettings()->SetRowsBatchSize(1);
524+
schemeTx.MutableBackup()->MutableS3Settings()->MutableLimits()->SetMinWriteBatchSize(1);
524525
record.SetTxBody(schemeTx.SerializeAsString());
525526
}
526527

@@ -1665,6 +1666,7 @@ partitioning_settings {
16651666

16661667
if (schemeTx.HasBackup()) {
16671668
schemeTx.MutableBackup()->MutableScanSettings()->SetRowsBatchSize(1);
1669+
schemeTx.MutableBackup()->MutableS3Settings()->MutableLimits()->SetMinWriteBatchSize(1);
16681670
record.SetTxBody(schemeTx.SerializeAsString());
16691671
}
16701672

0 commit comments

Comments
 (0)