Skip to content

Commit c541aea

Browse files
committed
Add QWriter::Close()
commit_hash:43d2f04e77089ac19624b5ba9db9192446c23d0e
1 parent 8049e8a commit c541aea

File tree

5 files changed

+52
-9
lines changed

5 files changed

+52
-9
lines changed

yql/essentials/core/qplayer/storage/file/yql_qstorage_file.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
#include <util/folder/tempdir.h>
88
#include <util/generic/hash_set.h>
99
#include <util/system/fs.h>
10-
#include <util/system/mutex.h>
1110
#include <util/stream/file.h>
11+
#include <util/system/mutex.h>
1212

1313
namespace NYql {
1414

@@ -166,6 +166,12 @@ class TUnbufferedWriter : public TWriterBase {
166166
}
167167
}
168168

169+
void Close() override final {
170+
with_lock(Mutex_) {
171+
DataFile_.Clear();
172+
}
173+
}
174+
169175
private:
170176
const TQWriterSettings Settings_;
171177
const bool AlwaysFlushIndex_;
@@ -195,9 +201,9 @@ class TStorage : public IQStorage {
195201
auto opPath = Folder_ / operationId;
196202
auto writtenAt = writerSettings.WrittenAt.GetOrElse(Now());
197203
if (Settings_.BufferUntilCommit) {
198-
return std::make_shared<TBufferedWriter>(opPath, writtenAt, writerSettings);
204+
return MakeCloseAwareWriterDecorator(std::make_shared<TBufferedWriter>(opPath, writtenAt, writerSettings));
199205
} else {
200-
return std::make_shared<TUnbufferedWriter>(opPath, writtenAt, writerSettings, Settings_.AlwaysFlushIndex);
206+
return MakeCloseAwareWriterDecorator(std::make_shared<TUnbufferedWriter>(opPath, writtenAt, writerSettings, Settings_.AlwaysFlushIndex));
201207
}
202208
}
203209

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,37 @@
11
#include "yql_qstorage.h"
2+
3+
namespace NYql {
4+
class TQWriterDecorator : public IQWriter {
5+
public:
6+
TQWriterDecorator(IQWriterPtr&& underlying) : Underlying_(std::move(underlying)) {}
7+
NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) override final {
8+
if (Closed_) {
9+
return NThreading::MakeFuture();
10+
}
11+
return Underlying_->Put(key, value);
12+
}
13+
14+
NThreading::TFuture<void> Commit() override final {
15+
if (Closed_) {
16+
throw yexception() << "QWriter closed";
17+
}
18+
return Underlying_->Commit();
19+
}
20+
21+
// Close all used files, doesn't commit anything
22+
void Close() override final {
23+
bool expected = false;
24+
if (Closed_.compare_exchange_strong(expected, true)) {
25+
Underlying_ = {};
26+
}
27+
}
28+
private:
29+
IQWriterPtr Underlying_;
30+
std::atomic<bool> Closed_ = false;
31+
};
32+
33+
IQWriterPtr MakeCloseAwareWriterDecorator(IQWriterPtr&& rhs) {
34+
return std::make_shared<TQWriterDecorator>(std::move(rhs));
35+
}
36+
37+
}

yql/essentials/core/qplayer/storage/interface/yql_qstorage.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ using IQReaderPtr = std::shared_ptr<IQReader>;
4848
class IQWriter {
4949
public:
5050
virtual ~IQWriter() = default;
51-
5251
virtual NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) = 0;
5352
// Commmit should be called at most once, no more Put are allowed after it
5453
virtual NThreading::TFuture<void> Commit() = 0;
54+
virtual void Close() {};
5555
};
5656

5757
using IQWriterPtr = std::shared_ptr<IQWriter>;
@@ -134,6 +134,7 @@ class TQContext {
134134
IQWriterPtr Writer_;
135135
};
136136

137+
IQWriterPtr MakeCloseAwareWriterDecorator(IQWriterPtr&& rhs);
137138
}
138139

139140
template <>

yql/essentials/core/qplayer/storage/memory/yql_qstorage_memory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ class TStorage : public IQStorage {
129129
}
130130

131131
IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& writerSettings) const final {
132-
return std::make_shared<TWriter>(GetOperation(operationId, true), writerSettings);
132+
return MakeCloseAwareWriterDecorator(std::make_shared<TWriter>(GetOperation(operationId, true), writerSettings));
133133
}
134134

135135
IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& iteratorSettings) const final {

yql/essentials/core/qplayer/storage/ydb/yql_qstorage_ydb.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ class TStorage : public IQStorage {
218218
}
219219

220220
IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& settings) const final {
221-
return std::make_shared<TWriter>(Settings_, operationId, settings);
221+
return MakeCloseAwareWriterDecorator(std::make_shared<TWriter>(Settings_, operationId, settings));
222222
}
223223

224224
IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& settings) const final {
@@ -238,7 +238,7 @@ class TStorage : public IQStorage {
238238
void LoadTable(const TString& operationId, const IQStoragePtr& memory) const {
239239
auto driver = MakeDriver(Settings_);
240240
NYdb::NTable::TTableClient tableClient(driver);
241-
241+
242242
auto operationsTable = Settings_.TablesPrefix + "operations";
243243
auto fullOperationId = Settings_.OperationIdPrefix + operationId;
244244

@@ -287,7 +287,7 @@ class TStorage : public IQStorage {
287287
TString blobTable = Settings_.Database + "/" + Settings_.TablesPrefix + "blobs";
288288

289289
const auto maxBatchSize = Settings_.MaxBatchSize.GetOrElse(DefaultMaxBatchSize);
290-
auto rtResult = tableClient.RetryOperationSync([&tableIter, maxBatchSize, blobTable,
290+
auto rtResult = tableClient.RetryOperationSync([&tableIter, maxBatchSize, blobTable,
291291
fullOperationId, writtenAt, loadedTotalItems](NYdb::NTable::TSession session) {
292292
auto key1 = NYdb::TValueBuilder()
293293
.BeginTuple()
@@ -321,7 +321,7 @@ class TStorage : public IQStorage {
321321
if (res.IsSuccess()) {
322322
tableIter = res;
323323
}
324-
324+
325325
return res;
326326
}, readRetrySettings);
327327
ThrowOnError(rtResult);

0 commit comments

Comments
 (0)