Skip to content

Commit 5e5de15

Browse files
kardymondsfedor-mironyumkam
authored andcommitted
YQ-3721 PQ sink: write to file (ydb-platform#10501)
Co-authored-by: Fiodar Miron <fedor-miron@ydb.tech> Co-authored-by: yumkam <yumkam7@ydb.tech>
1 parent 26b0dc5 commit 5e5de15

File tree

9 files changed

+219
-23
lines changed

9 files changed

+219
-23
lines changed

ydb/core/fq/libs/init/init.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ void Init(
225225
std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways().GetPq()),
226226
appData->FunctionRegistry
227227
);
228-
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, NYql::CreatePqNativeGateway(std::move(pqServices)),
228+
auto pqGateway = NYql::CreatePqNativeGateway(std::move(pqServices));
229+
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway,
229230
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), protoConfig.GetCommon().GetPqReconnectPeriod());
230231

231232
s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
@@ -234,7 +235,7 @@ void Init(
234235
httpGateway, s3HttpRetryPolicy);
235236

236237
RegisterGenericProviderFactories(*asyncIoFactory, credentialsFactory, connectorClient);
237-
RegisterDqPqWriteActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, yqCounters->GetSubgroup("subsystem", "DqSinkTracker"));
238+
RegisterDqPqWriteActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway, yqCounters->GetSubgroup("subsystem", "DqSinkTracker"));
238239
RegisterDQSolomonWriteActorFactory(*asyncIoFactory, credentialsFactory);
239240
}
240241

ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
142142
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
143143
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
144144
const ::NMonitoring::TDynamicCounterPtr& counters,
145-
i64 freeSpace)
145+
i64 freeSpace,
146+
const IPqGateway::TPtr& pqGateway)
146147
: TActor<TDqPqWriteActor>(&TDqPqWriteActor::StateFunc)
147148
, OutputIndex(outputIndex)
148149
, TxId(txId)
@@ -153,7 +154,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
153154
, Callbacks(callbacks)
154155
, LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", TaskId: " << taskId << ", PQ sink. ")
155156
, FreeSpace(freeSpace)
156-
, TopicClient(Driver, GetTopicClientSettings())
157+
, PqGateway(pqGateway)
157158
{
158159
EgressStats.Level = statsLevel;
159160
}
@@ -303,6 +304,13 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
303304
: NYdb::NTopic::ECodec::GZIP);
304305
}
305306

307+
ITopicClient& GetTopicClient() {
308+
if (!TopicClient) {
309+
TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings());
310+
}
311+
return *TopicClient;
312+
}
313+
306314
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings() {
307315
return NYdb::NTopic::TTopicClientSettings()
308316
.Database(SinkParams.GetDatabase())
@@ -317,7 +325,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
317325

318326
void CreateSessionIfNotExists() {
319327
if (!WriteSession) {
320-
WriteSession = TopicClient.CreateWriteSession(GetWriteSessionSettings());
328+
WriteSession = GetTopicClient().CreateWriteSession(GetWriteSessionSettings());
321329
SubscribeOnNextEvent();
322330
}
323331
}
@@ -475,7 +483,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
475483
i64 FreeSpace = 0;
476484
bool Finished = false;
477485

478-
NYdb::NTopic::TTopicClient TopicClient;
486+
ITopicClient::TPtr TopicClient;
479487
std::shared_ptr<NYdb::NTopic::IWriteSession> WriteSession;
480488
TString SourceId;
481489
ui64 NextSeqNo = 1;
@@ -486,6 +494,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
486494
std::queue<TString> Buffer;
487495
std::queue<TAckInfo> WaitingAcks; // Size of items which are waiting for acks (used to update free space)
488496
std::queue<std::tuple<ui64, NDqProto::TCheckpoint>> DeferredCheckpoints;
497+
IPqGateway::TPtr PqGateway;
489498
};
490499

491500
std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
@@ -499,6 +508,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
499508
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
500509
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
501510
const ::NMonitoring::TDynamicCounterPtr& counters,
511+
IPqGateway::TPtr pqGateway,
502512
i64 freeSpace)
503513
{
504514
const TString& tokenName = settings.GetToken().GetName();
@@ -515,13 +525,14 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
515525
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, addBearerToToken),
516526
callbacks,
517527
counters,
518-
freeSpace);
528+
freeSpace,
529+
pqGateway);
519530
return {actor, actor};
520531
}
521532

522-
void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters) {
533+
void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters) {
523534
factory.RegisterSink<NPq::NProto::TDqPqTopicSink>("PqSink",
524-
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters](
535+
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway](
525536
NPq::NProto::TDqPqTopicSink&& settings,
526537
IDqAsyncIoFactory::TSinkArguments&& args)
527538
{
@@ -536,7 +547,8 @@ void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver dri
536547
driver,
537548
credentialsFactory,
538549
args.Callback,
539-
counters
550+
counters,
551+
pqGateway
540552
);
541553
});
542554
}

ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
55

66
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
7+
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
78
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
89

910
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
@@ -31,8 +32,9 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
3132
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
3233
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
3334
const ::NMonitoring::TDynamicCounterPtr& counters,
35+
IPqGateway::TPtr pqGateway,
3436
i64 freeSpace = DqPqDefaultFreeSpace);
3537

36-
void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>());
38+
void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>());
3739

3840
} // namespace NYql::NDq

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp

Lines changed: 176 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
3636
Y_UNUSED(maxByteSize);
3737

3838
TVector<NYdb::NTopic::TReadSessionEvent::TEvent> res;
39-
for (auto event = EventsQ_.Pop(block); !event.Empty() && res.size() <= maxEventsCount.GetOrElse(std::numeric_limits<size_t>::max()); event = EventsQ_.Pop(/*block=*/ false)) {
40-
res.push_back(*event);
39+
for (auto event = EventsQ_.Pop(block); !event.Empty() && res.size() < maxEventsCount.GetOrElse(std::numeric_limits<size_t>::max()); event = EventsQ_.Pop(/*block=*/ false)) {
40+
res.push_back(std::move(*event));
4141
}
4242
return res;
4343
}
@@ -59,7 +59,6 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
5959

6060
bool Close(TDuration timeout = TDuration::Max()) override {
6161
Y_UNUSED(timeout);
62-
// TOOD send TSessionClosedEvent
6362
EventsQ_.Stop();
6463
Pool_.Stop();
6564

@@ -133,7 +132,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
133132
Sleep(FILE_POLL_PERIOD);
134133
}
135134
}
136-
135+
137136
TFile File_;
138137
TBlockingEQueue<NYdb::NTopic::TReadSessionEvent::TEvent> EventsQ_ {4_MB};
139138
NYdb::NTopic::TPartitionSession::TPtr Session_;
@@ -146,6 +145,172 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
146145
ui64 SeqNo_ = 0;
147146
};
148147

148+
class TFileTopicWriteSession : public NYdb::NTopic::IWriteSession, private NYdb::NTopic::TContinuationTokenIssuer {
149+
public:
150+
TFileTopicWriteSession(TFile file):
151+
File_(std::move(file)), FileWriter_([this] () {
152+
PushToFile();
153+
}), Counters_()
154+
{
155+
Pool_.Start(1);
156+
EventsQ_.Push(NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
157+
}
158+
159+
NThreading::TFuture<void> WaitEvent() override {
160+
return NThreading::Async([this] () {
161+
EventsQ_.BlockUntilEvent();
162+
return NThreading::MakeFuture();
163+
}, Pool_);
164+
}
165+
166+
TMaybe<NYdb::NTopic::TWriteSessionEvent::TEvent> GetEvent(bool block) override {
167+
return EventsQ_.Pop(block);
168+
}
169+
170+
TVector<NYdb::NTopic::TWriteSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount) override {
171+
TVector<NYdb::NTopic::TWriteSessionEvent::TEvent> res;
172+
for (auto event = EventsQ_.Pop(block); !event.Empty() && res.size() < maxEventsCount.GetOrElse(std::numeric_limits<size_t>::max()); event = EventsQ_.Pop(/*block=*/ false)) {
173+
res.push_back(std::move(*event));
174+
}
175+
return res;
176+
}
177+
178+
NThreading::TFuture<ui64> GetInitSeqNo() override {
179+
return NThreading::MakeFuture(SeqNo_);
180+
}
181+
182+
void Write(NYdb::NTopic::TContinuationToken&&, NYdb::NTopic::TWriteMessage&& message,
183+
NYdb::NTable::TTransaction* tx) override {
184+
Y_UNUSED(tx);
185+
186+
auto size = message.Data.size();
187+
EventsMsgQ_.Push(TOwningWriteMessage(std::move(message)), size);
188+
}
189+
190+
void Write(NYdb::NTopic::TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo,
191+
TMaybe<TInstant> createTimestamp) override {
192+
NYdb::NTopic::TWriteMessage message(data);
193+
if (seqNo.Defined()) {
194+
message.SeqNo(*seqNo);
195+
}
196+
if (createTimestamp.Defined()) {
197+
message.CreateTimestamp(*createTimestamp);
198+
}
199+
200+
Write(std::move(token), std::move(message), nullptr);
201+
}
202+
203+
// Ignores codec in message and always writes raw for debugging purposes
204+
void WriteEncoded(NYdb::NTopic::TContinuationToken&& token, NYdb::NTopic::TWriteMessage&& params,
205+
NYdb::NTable::TTransaction* tx) override {
206+
Y_UNUSED(tx);
207+
208+
NYdb::NTopic::TWriteMessage message(params.Data);
209+
210+
if (params.CreateTimestamp_.Defined()) {
211+
message.CreateTimestamp(*params.CreateTimestamp_);
212+
}
213+
if (params.SeqNo_) {
214+
message.SeqNo(*params.SeqNo_);
215+
}
216+
message.MessageMeta(params.MessageMeta_);
217+
218+
Write(std::move(token), std::move(message), nullptr);
219+
}
220+
221+
// Ignores codec in message and always writes raw for debugging purposes
222+
void WriteEncoded(NYdb::NTopic::TContinuationToken&& token, TStringBuf data, NYdb::NTopic::ECodec codec, ui32 originalSize,
223+
TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) override {
224+
Y_UNUSED(codec);
225+
Y_UNUSED(originalSize);
226+
227+
NYdb::NTopic::TWriteMessage message(data);
228+
if (seqNo.Defined()) {
229+
message.SeqNo(*seqNo);
230+
}
231+
if (createTimestamp.Defined()) {
232+
message.CreateTimestamp(*createTimestamp);
233+
}
234+
235+
Write(std::move(token), std::move(message), nullptr);
236+
}
237+
238+
bool Close(TDuration timeout = TDuration::Max()) override {
239+
Y_UNUSED(timeout);
240+
EventsQ_.Stop();
241+
EventsMsgQ_.Stop();
242+
Pool_.Stop();
243+
244+
if (FileWriter_.joinable()) {
245+
FileWriter_.join();
246+
}
247+
return true;
248+
}
249+
250+
NYdb::NTopic::TWriterCounters::TPtr GetCounters() override {
251+
return Counters_;
252+
}
253+
254+
~TFileTopicWriteSession() override {
255+
EventsQ_.Stop();
256+
EventsMsgQ_.Stop();
257+
Pool_.Stop();
258+
if (FileWriter_.joinable()) {
259+
FileWriter_.join();
260+
}
261+
}
262+
263+
private:
264+
void PushToFile() {
265+
TFileOutput fo(File_);
266+
ui64 offset = 0; // FIXME dummy
267+
ui64 partitionId = 0; // FIXME dummy
268+
while (auto maybeMsg = EventsMsgQ_.Pop(true)) {
269+
NYdb::NTopic::TWriteSessionEvent::TAcksEvent acks;
270+
do {
271+
auto& [content, msg] = *maybeMsg;
272+
NYdb::NTopic::TWriteSessionEvent::TWriteAck ack;
273+
if (msg.SeqNo_.Defined()) { // FIXME should be auto generated otherwise
274+
ack.SeqNo = *msg.SeqNo_;
275+
}
276+
ack.State = NYdb::NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN;
277+
ack.Details.ConstructInPlace(offset, partitionId);
278+
acks.Acks.emplace_back(std::move(ack));
279+
offset += content.size() + 1;
280+
fo.Write(content);
281+
fo.Write('\n');
282+
} while ((maybeMsg = EventsMsgQ_.Pop(false)));
283+
fo.Flush();
284+
EventsQ_.Push(std::move(acks), 1 + acks.Acks.size());
285+
EventsQ_.Push(NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}, 1);
286+
if (EventsQ_.IsStopped()) {
287+
break;
288+
}
289+
}
290+
}
291+
292+
TFile File_;
293+
294+
// We acquire ownership of messages immediately
295+
// TODO: remove extra message copying to and from queue
296+
struct TOwningWriteMessage {
297+
TString content;
298+
NYdb::NTopic::TWriteMessage msg;
299+
300+
TOwningWriteMessage(NYdb::NTopic::TWriteMessage&& msg): content(msg.Data), msg(std::move(msg)) {
301+
msg.Data = content;
302+
}
303+
};
304+
TBlockingEQueue<TOwningWriteMessage> EventsMsgQ_ {4_MB};
305+
306+
TBlockingEQueue<NYdb::NTopic::TWriteSessionEvent::TEvent> EventsQ_ {128_KB};
307+
std::thread FileWriter_;
308+
309+
TThreadPool Pool_;
310+
NYdb::NTopic::TWriterCounters::TPtr Counters_;
311+
ui64 SeqNo_ = 0;
312+
};
313+
149314
struct TDummyPartitionSession: public NYdb::NTopic::TPartitionSession {
150315
TDummyPartitionSession(ui64 sessionId, const TString& topicPath, ui64 partId) {
151316
PartitionSessionId = sessionId;
@@ -231,8 +396,13 @@ std::shared_ptr<NYdb::NTopic::ISimpleBlockingWriteSession> TFileTopicClient::Cre
231396
}
232397

233398
std::shared_ptr<NYdb::NTopic::IWriteSession> TFileTopicClient::CreateWriteSession(const NYdb::NTopic::TWriteSessionSettings& settings) {
234-
Y_UNUSED(settings);
235-
return nullptr;
399+
TString topicPath = settings.Path_;
400+
auto topicsIt = Topics_.find(make_pair("pq", topicPath));
401+
Y_ENSURE(topicsIt != Topics_.end());
402+
auto filePath = topicsIt->second.FilePath;
403+
Y_ENSURE(filePath);
404+
405+
return std::make_shared<TFileTopicWriteSession>(TFile(*filePath, EOpenMode::TEnum::RdWr));
236406
}
237407

238408
NYdb::TAsyncStatus TFileTopicClient::CommitOffset(const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset,

ydb/library/yql/providers/pq/provider/ut/yql_pq_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, c
4545
auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
4646
RegisterDqPqReadActorFactory(*factory, driver, nullptr, pqGateway);
4747

48-
RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
48+
RegisterDqPqWriteActorFactory(*factory, driver, nullptr, pqGateway);
4949
return factory;
5050
}
5151

ydb/library/yql/tools/dq/worker_node/main.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,12 @@ NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, I
115115
std::make_shared<TPqGatewayConfig>(),
116116
nullptr
117117
);
118-
RegisterDqPqReadActorFactory(*factory, driver, nullptr, CreatePqNativeGateway(std::move(pqServices)));
118+
auto pqGateway = CreatePqNativeGateway(std::move(pqServices));
119+
RegisterDqPqReadActorFactory(*factory, driver, nullptr, pqGateway);
119120

120121
RegisterYdbReadActorFactory(*factory, driver, nullptr);
121122
RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway);
122-
RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
123+
RegisterDqPqWriteActorFactory(*factory, driver, nullptr, pqGateway);
123124

124125
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
125126
auto retryPolicy = GetHTTPDefaultRetryPolicy();

ydb/library/yql/tools/dqrun/dqrun.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(
311311
RegisterDQSolomonReadActorFactory(*factory, nullptr);
312312
RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway);
313313
RegisterGenericProviderFactories(*factory, credentialsFactory, genericClient);
314-
RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
314+
RegisterDqPqWriteActorFactory(*factory, driver, nullptr, pqGateway);
315315

316316
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
317317
s3ActorsFactory->RegisterS3WriteActorFactory(*factory, nullptr, httpGateway, GetHTTPDefaultRetryPolicy());

ydb/library/yql/tools/mrrun/mrrun.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,11 @@ NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, I
245245
std::make_shared<TPqGatewayConfig>(),
246246
nullptr
247247
);
248-
RegisterDqPqReadActorFactory(*factory, driver, nullptr, CreatePqNativeGateway(std::move(pqServices)));
248+
auto pqGateway = CreatePqNativeGateway(std::move(pqServices));
249+
RegisterDqPqReadActorFactory(*factory, driver, nullptr, pqGateway);
249250
RegisterYdbReadActorFactory(*factory, driver, nullptr);
250251
RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway);
251-
RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
252+
RegisterDqPqWriteActorFactory(*factory, driver, nullptr, pqGateway);
252253

253254
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
254255
auto retryPolicy = GetHTTPDefaultRetryPolicy();

0 commit comments

Comments
 (0)