Skip to content

Commit 2a2768a

Browse files
authored
Support of autocommit mode for qplayer file storage (#8344)
1 parent 352f804 commit 2a2768a

File tree

7 files changed

+46
-14
lines changed

7 files changed

+46
-14
lines changed

ydb/library/yql/core/qplayer/storage/file/ut/yql_qstorage_file_ut.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,21 @@ IQStoragePtr MakeUnbufferedFileQStorage() {
1818
return MakeFileQStorage({}, settings);
1919
}
2020

21+
IQStoragePtr MakeUnbufferedFileWithFlushIndexQStorage() {
22+
TFileQStorageSettings settings;
23+
settings.BufferUntilCommit = false;
24+
settings.AlwaysFlushIndex = true;
25+
return MakeFileQStorage({}, settings);
26+
}
27+
2128
Y_UNIT_TEST_SUITE(TQStorageBufferedFileTests) {
22-
GENERATE_TESTS(MakeBufferedFileQStorage)
29+
GENERATE_TESTS(MakeBufferedFileQStorage, false)
2330
}
2431

2532
Y_UNIT_TEST_SUITE(TQStorageUnbufferedFileTests) {
26-
GENERATE_TESTS(MakeUnbufferedFileQStorage)
33+
GENERATE_TESTS(MakeUnbufferedFileQStorage, false)
34+
}
35+
36+
Y_UNIT_TEST_SUITE(TQStorageUnbufferedFileWithFlushIndexTests) {
37+
GENERATE_TESTS(MakeUnbufferedFileWithFlushIndexQStorage, true)
2738
}

ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,18 +101,21 @@ class TBufferedWriter : public TWriterBase {
101101

102102
class TUnbufferedWriter : public TWriterBase {
103103
public:
104-
TUnbufferedWriter(TFsPath& path, TInstant writtenAt, const TQWriterSettings& settings)
104+
TUnbufferedWriter(TFsPath& path, TInstant writtenAt, const TQWriterSettings& settings, bool alwaysFlushIndex)
105105
: TWriterBase(path, writtenAt)
106106
, Settings_(settings)
107+
, AlwaysFlushIndex_(alwaysFlushIndex)
107108
{
108109
DataFile_.ConstructInPlace(Path_.GetPath() + ".dat");
110+
DataFile_->SetFlushPropagateMode(false);
109111
DataFile_->Write(&WrittenAt_, sizeof(WrittenAt_));
110112
}
111113

112114
~TUnbufferedWriter() {
113115
if (!Committed_) {
114116
DataFile_.Clear();
115117
NFs::Remove(Path_.GetPath() + ".dat");
118+
NFs::Remove(Path_.GetPath() + ".idx");
116119
}
117120
}
118121

@@ -134,6 +137,11 @@ class TUnbufferedWriter : public TWriterBase {
134137
if (Settings_.BytesLimit && TotalBytes_ > *Settings_.BytesLimit) {
135138
Overflow_ = true;
136139
}
140+
141+
if (!Overflow_ && AlwaysFlushIndex_) {
142+
DataFile_->Flush();
143+
WriteIndex(TotalItems_, TotalBytes_, Checksum_);
144+
}
137145
}
138146

139147
return NThreading::MakeFuture();
@@ -150,13 +158,17 @@ class TUnbufferedWriter : public TWriterBase {
150158
Committed_ = true;
151159
DataFile_->Finish();
152160
DataFile_.Clear();
153-
WriteIndex(TotalItems_, TotalBytes_, Checksum_);
161+
if (!AlwaysFlushIndex_) {
162+
WriteIndex(TotalItems_, TotalBytes_, Checksum_);
163+
}
164+
154165
return NThreading::MakeFuture();
155166
}
156167
}
157168

158169
private:
159170
const TQWriterSettings Settings_;
171+
const bool AlwaysFlushIndex_;
160172
TMutex Mutex_;
161173
TMaybe<TFileOutput> DataFile_;
162174
ui64 TotalItems_ = 0;
@@ -185,7 +197,7 @@ class TStorage : public IQStorage {
185197
if (Settings_.BufferUntilCommit) {
186198
return std::make_shared<TBufferedWriter>(opPath, writtenAt, writerSettings);
187199
} else {
188-
return std::make_shared<TUnbufferedWriter>(opPath, writtenAt, writerSettings);
200+
return std::make_shared<TUnbufferedWriter>(opPath, writtenAt, writerSettings, Settings_.AlwaysFlushIndex);
189201
}
190202
}
191203

@@ -235,9 +247,9 @@ class TStorage : public IQStorage {
235247
Y_ENSURE(totalBytes <= loadedTotalBytes);
236248
}
237249

238-
Y_ENSURE(!indexFile.ReadChar(dummy));
239250
Y_ENSURE(totalBytes == loadedTotalBytes);
240251
Y_ENSURE(checksum == loadedChecksum);
252+
// data file may have extra data
241253
writer->Commit().GetValueSync();
242254
}
243255

ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ namespace NYql {
55

66
struct TFileQStorageSettings {
77
bool BufferUntilCommit = true;
8+
bool AlwaysFlushIndex = false;
89
};
910

1011
IQStoragePtr MakeFileQStorage(const TString& folder = {}, const TFileQStorageSettings& settings = {});

ydb/library/yql/core/qplayer/storage/memory/ut/yql_qstorage_memory_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@
77
using namespace NYql;
88

99
Y_UNIT_TEST_SUITE(TQStorageMemoryTests) {
10-
GENERATE_TESTS(MakeMemoryQStorage)
10+
GENERATE_TESTS(MakeMemoryQStorage, false)
1111
}

ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void QStorageTestManyKeys_Impl(const NYql::IQStoragePtr& storage) {
7676
}
7777
}
7878

79-
void QStorageTestInterleaveReadWrite_Impl(const NYql::IQStoragePtr& storage) {
79+
void QStorageTestInterleaveReadWrite_Impl(const NYql::IQStoragePtr& storage, bool commit) {
8080
auto reader = storage->MakeReader("foo", {});
8181
auto value = reader->Get({"comp", "label"}).GetValueSync();
8282
UNIT_ASSERT(!value.Defined());
@@ -87,10 +87,10 @@ void QStorageTestInterleaveReadWrite_Impl(const NYql::IQStoragePtr& storage) {
8787
writer->Put({"comp", "label"}, "value").GetValueSync();
8888
reader = storage->MakeReader("foo", {});
8989
value = reader->Get({"comp", "label"}).GetValueSync();
90-
UNIT_ASSERT(!value.Defined());
90+
UNIT_ASSERT(!value.Defined() == !commit);
9191
auto iterator2 = storage->MakeIterator("foo", {});
9292
value = iterator2->Next().GetValueSync();
93-
UNIT_ASSERT(!value.Defined());
93+
UNIT_ASSERT(!value.Defined() == !commit);
9494
writer->Commit().GetValueSync();
9595
reader = storage->MakeReader("foo", {});
9696
value = reader->Get({"comp", "label"}).GetValueSync();

ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ void QStorageTestEmpty_Impl(const NYql::IQStoragePtr& storage);
77
void QStorageTestNoCommit_Impl(const NYql::IQStoragePtr& storage);
88
void QStorageTestOne_Impl(const NYql::IQStoragePtr& storage);
99
void QStorageTestManyKeys_Impl(const NYql::IQStoragePtr& storage);
10-
void QStorageTestInterleaveReadWrite_Impl(const NYql::IQStoragePtr& storage);
10+
void QStorageTestInterleaveReadWrite_Impl(const NYql::IQStoragePtr& storage, bool commit);
1111
void QStorageTestLimitWriterItems_Impl(const NYql::IQStoragePtr& storage);
1212
void QStorageTestLimitWriterBytes_Impl(const NYql::IQStoragePtr& storage);
1313

@@ -19,11 +19,19 @@ void QStorageTestLimitWriterBytes_Impl(const NYql::IQStoragePtr& storage);
1919
} \
2020
}
2121

22-
#define GENERATE_TESTS(FACTORY)\
22+
#define GENERATE_ONE_TEST_OPT(NAME, FACTORY, OPT) \
23+
Y_UNIT_TEST(NAME) { \
24+
auto storage = FACTORY(); \
25+
if (storage) { \
26+
QStorageTest##NAME##_Impl(storage, OPT); \
27+
} \
28+
}
29+
30+
#define GENERATE_TESTS(FACTORY, commit)\
2331
GENERATE_ONE_TEST(Empty, FACTORY) \
2432
GENERATE_ONE_TEST(NoCommit, FACTORY) \
2533
GENERATE_ONE_TEST(One, FACTORY) \
2634
GENERATE_ONE_TEST(ManyKeys, FACTORY) \
27-
GENERATE_ONE_TEST(InterleaveReadWrite, FACTORY) \
35+
GENERATE_ONE_TEST_OPT(InterleaveReadWrite, FACTORY, commit) \
2836
GENERATE_ONE_TEST(LimitWriterItems, FACTORY) \
2937
GENERATE_ONE_TEST(LimitWriterBytes, FACTORY)

ydb/library/yql/core/qplayer/storage/ydb/ut/yql_qstorage_ydb_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ IQStoragePtr MakeTestYdbQStorage() {
2727
}
2828

2929
Y_UNIT_TEST_SUITE(TQStorageFileTests) {
30-
GENERATE_TESTS(MakeTestYdbQStorage)
30+
GENERATE_TESTS(MakeTestYdbQStorage, false)
3131
}

0 commit comments

Comments
 (0)