Skip to content

Commit 2dede2b

Browse files
authored
YQ-4049 / YQ-4071 Sink fq to stable (#14018)
1 parent 65fb266 commit 2dede2b

File tree

7 files changed

+65
-8
lines changed

7 files changed

+65
-8
lines changed

ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -387,22 +387,29 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpo
387387
const auto& issues = ev->Get()->Issues;
388388
CC_LOG_D("[" << checkpointId << "] Got TEvCreateCheckpointResponse");
389389

390-
if (issues) {
391-
CC_LOG_E("[" << checkpointId << "] StorageError: can't create checkpoint: " << issues.ToOneLineString());
390+
auto cancelCheckpoint = [&](const TString& str) {
391+
CC_LOG_E("[" << checkpointId << "] " << str);
392392
PendingCheckpoints.erase(checkpointId);
393393
FailedZeroCheckpoint = InitingZeroCheckpoint;
394394
UpdateInProgressMetric();
395395
++*Metrics.FailedToCreate;
396396
++*Metrics.StorageError;
397397
CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot.
398+
};
399+
400+
if (issues) {
401+
cancelCheckpoint("StorageError: can't create checkpoint: " + issues.ToOneLineString());
398402
return;
399403
}
400404

401405
if (GraphDescId) {
402406
Y_ABORT_UNLESS(GraphDescId == ev->Get()->GraphDescId);
403407
} else {
404408
GraphDescId = ev->Get()->GraphDescId;
405-
Y_ABORT_UNLESS(GraphDescId);
409+
if (!GraphDescId) {
410+
cancelCheckpoint("StorageError (internal error), empty GraphDescId");
411+
return;
412+
}
406413
}
407414

408415
if (PendingInit) {

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,12 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
457457
return statistics;
458458
}
459459

460+
void ForceRefresh() override {
461+
if (Parser) {
462+
Parser->Refresh(true);
463+
}
464+
}
465+
460466
protected:
461467
NActors::TActorId GetSelfId() const override {
462468
return SelfId();

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class ITopicFormatHandler : public TNonCopyable {
5555
virtual bool HasClients() const = 0;
5656

5757
virtual TFormatHandlerStatistic GetStatistics() = 0;
58+
virtual void ForceRefresh() = 0;
5859

5960
protected:
6061
virtual NActors::TActorId GetSelfId() const = 0;

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
286286
void SubscribeOnNextEvent();
287287
void SendToParsing(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages);
288288
void SendData(TClientsInfo& info);
289-
void FatalError(TStatus status);
289+
void FatalError(const TStatus& status);
290+
void ThrowFatalError(const TStatus& status);
290291
void SendDataArrived(TClientsInfo& client);
291292
void StopReadSession();
292293
TString GetSessionId() const;
@@ -308,6 +309,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
308309
TMaybe<ui64> GetOffset(const NFq::NRowDispatcherProto::TEvStartSession& settings);
309310
void SendSessionError(TActorId readActorId, TStatus status);
310311
void RestartSessionIfOldestClient(const TClientsInfo& info);
312+
void RefreshParsers();
311313

312314
private:
313315

@@ -330,7 +332,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
330332
IgnoreFunc(TEvRowDispatcher::TEvGetNextBatch);
331333
IgnoreFunc(NFq::TEvRowDispatcher::TEvStartSession);
332334
IgnoreFunc(NFq::TEvRowDispatcher::TEvStopSession);
333-
IgnoreFunc(NFq::TEvPrivate::TEvSendStatistic);,
335+
IgnoreFunc(NFq::TEvPrivate::TEvSendStatistic);
336+
IgnoreFunc(NFq::TEvPrivate::TEvReconnectSession);,
334337
ExceptionFunc(std::exception, HandleException)
335338
)
336339
};
@@ -497,6 +500,7 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) {
497500
LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, " << TopicPathPartition
498501
<< ", StartingMessageTimestamp " << minTime
499502
<< ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer());
503+
RefreshParsers();
500504
StopReadSession();
501505
CreateTopicSession();
502506
Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession());
@@ -565,7 +569,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClose
565569
const TString message = TStringBuilder() << "Read session to topic \"" << Self.TopicPathPartition << "\" was closed";
566570
LOG_ROW_DISPATCHER_DEBUG(message << ": " << ev.DebugString());
567571

568-
Self.FatalError(TStatus::Fail(
572+
Self.ThrowFatalError(TStatus::Fail(
569573
NYql::NDq::YdbStatusToDqStatus(static_cast<Ydb::StatusIds::StatusCode>(ev.GetStatus())),
570574
ev.GetIssues()
571575
).AddParentIssue(message));
@@ -773,14 +777,15 @@ void TTopicSession::RestartSessionIfOldestClient(const TClientsInfo& info) {
773777
Metrics.RestartSessionByOffsets->Inc();
774778
++RestartSessionByOffsets;
775779
info.RestartSessionByOffsetsByQuery->Inc();
780+
RefreshParsers();
776781
StopReadSession();
777782

778783
if (!ReadSession) {
779784
Schedule(TDuration::Seconds(Config.GetTimeoutBeforeStartSessionSec()), new NFq::TEvPrivate::TEvCreateSession());
780785
}
781786
}
782787

783-
void TTopicSession::FatalError(TStatus status) {
788+
void TTopicSession::FatalError(const TStatus& status) {
784789
LOG_ROW_DISPATCHER_ERROR("FatalError: " << status.GetErrorMessage());
785790

786791
for (auto& [readActorId, info] : Clients) {
@@ -789,7 +794,11 @@ void TTopicSession::FatalError(TStatus status) {
789794
}
790795
StopReadSession();
791796
Become(&TTopicSession::ErrorState);
792-
ythrow yexception() << "FatalError: " << status.GetErrorMessage(); // To exit from current stack and call once PassAway() in HandleException().
797+
}
798+
799+
void TTopicSession::ThrowFatalError(const TStatus& status) {
800+
FatalError(status);
801+
ythrow yexception() << "FatalError: " << status.GetErrorMessage();
793802
}
794803

795804
void TTopicSession::SendSessionError(TActorId readActorId, TStatus status) {
@@ -902,6 +911,12 @@ TMaybe<ui64> TTopicSession::GetOffset(const NFq::NRowDispatcherProto::TEvStartSe
902911
return Nothing();
903912
}
904913

914+
void TTopicSession::RefreshParsers() {
915+
for (const auto& [_, formatHandler] : FormatHandlers) {
916+
formatHandler->ForceRefresh();
917+
}
918+
}
919+
905920
} // anonymous namespace
906921

907922
////////////////////////////////////////////////////////////////////////////////

ydb/core/fq/libs/ydb/ut/ydb_ut.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,18 @@ Y_UNIT_TEST_SUITE(TFqYdbTest) {
466466
UNIT_ASSERT(issues.Size() == 1);
467467
UNIT_ASSERT(issues.ToString().Contains(text));
468468
}
469+
470+
Y_UNIT_TEST(ShouldStatusToIssuesProcessEmptyIssues)
471+
{
472+
auto promise = NThreading::NewPromise<NYdb::TStatus>();
473+
auto future = promise.GetFuture();
474+
promise.SetValue(TStatus(EStatus::BAD_REQUEST, NYql::TIssues{}));
475+
NThreading::TFuture<NYql::TIssues> future2 = NFq::StatusToIssues(future);
476+
477+
NYql::TIssues issues = future2.GetValueSync();
478+
UNIT_ASSERT_C(issues.Size() == 1, issues.ToString());
479+
UNIT_ASSERT(issues.ToString().Contains("empty issues"));
480+
}
469481
}
470482

471483
} // namespace NFq

ydb/core/fq/libs/ydb/ydb.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,11 @@ NYql::TIssues StatusToIssues(const NYdb::TStatus& status) {
203203
TIssues issues;
204204
if (!status.IsSuccess()) {
205205
issues = status.GetIssues();
206+
if (!issues) {
207+
TStringStream str;
208+
str << "Internal error: empty issues with failed status (" << status.GetStatus() << ")";
209+
issues.AddIssue(str.Str());
210+
}
206211
}
207212
return issues;
208213
}

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,21 @@ struct TEvPrivate {
103103
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
104104
EvPrintState = EvBegin + 20,
105105
EvProcessState = EvBegin + 21,
106+
EvNotifyCA = EvBegin + 22,
106107
EvEnd
107108
};
108109
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
109110
struct TEvPrintState : public NActors::TEventLocal<TEvPrintState, EvPrintState> {};
110111
struct TEvProcessState : public NActors::TEventLocal<TEvProcessState, EvProcessState> {};
112+
struct TEvNotifyCA : public NActors::TEventLocal<TEvNotifyCA, EvNotifyCA> {};
111113
};
112114

113115
class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::NDq::NInternal::TDqPqReadActorBase {
114116

115117
const ui64 PrintStatePeriodSec = 300;
116118
const ui64 ProcessStatePeriodSec = 1;
117119
const ui64 PrintStateToLogSplitSize = 64000;
120+
const ui64 NotifyCAPeriodSec = 10;
118121

119122
struct TReadyBatch {
120123
public:
@@ -263,6 +266,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
263266
void Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr&);
264267
void Handle(TEvPrivate::TEvPrintState::TPtr&);
265268
void Handle(TEvPrivate::TEvProcessState::TPtr&);
269+
void Handle(TEvPrivate::TEvNotifyCA::TPtr&);
266270

267271
STRICT_STFUNC(StateFunc, {
268272
hFunc(NFq::TEvRowDispatcher::TEvCoordinatorChanged, Handle);
@@ -283,6 +287,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
283287
hFunc(NFq::TEvRowDispatcher::TEvHeartbeat, Handle);
284288
hFunc(TEvPrivate::TEvPrintState, Handle);
285289
hFunc(TEvPrivate::TEvProcessState, Handle);
290+
hFunc(TEvPrivate::TEvNotifyCA, Handle);
286291
})
287292

288293
static constexpr char ActorName[] = "DQ_PQ_READ_ACTOR";
@@ -379,6 +384,7 @@ void TDqPqRdReadActor::Init() {
379384
Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe());
380385

381386
Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState());
387+
Schedule(TDuration::Seconds(NotifyCAPeriodSec), new TEvPrivate::TEvNotifyCA());
382388
Inited = true;
383389
}
384390

@@ -1004,6 +1010,11 @@ void TDqPqRdReadActor::UpdateSessions() {
10041010
LastUsedPartitionDistribution = LastReceivedPartitionDistribution;
10051011
}
10061012

1013+
void TDqPqRdReadActor::Handle(TEvPrivate::TEvNotifyCA::TPtr&) {
1014+
Schedule(TDuration::Seconds(NotifyCAPeriodSec), new TEvPrivate::TEvNotifyCA());
1015+
NotifyCA();
1016+
}
1017+
10071018
std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor(
10081019
const TTypeEnvironment& typeEnv,
10091020
NPq::NProto::TDqPqTopicSource&& settings,

0 commit comments

Comments
 (0)