Skip to content

Commit b16068a

Browse files
committed
Merge branch 'main' of https://github.com/ydb-platform/ydb into mergelibs-yql
2 parents efc7356 + 02d2031 commit b16068a

File tree

54 files changed

+620
-206
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+620
-206
lines changed

ydb/core/base/ticket_parser.h

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -154,14 +154,24 @@ namespace NKikimr {
154154

155155
struct TError {
156156
TString Message;
157+
TString LogMessage;
157158
bool Retryable = true;
158159

159160
bool empty() const {
160-
return Message.empty();
161+
return Message.empty() && LogMessage.empty();
162+
}
163+
164+
bool HasMessage() const {
165+
return !Message.empty();
166+
}
167+
168+
bool HasLogMessage() const {
169+
return !LogMessage.empty();
161170
}
162171

163172
void clear() {
164173
Message.clear();
174+
LogMessage.clear();
165175
Retryable = true;
166176
}
167177

ydb/core/blobstorage/backpressure/defs.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,5 @@
1717
#include <ydb/library/actors/core/actor_bootstrapped.h>
1818
#include <ydb/library/actors/core/mailbox.h>
1919
#include <ydb/library/actors/core/mon.h>
20-
#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>
2120
#include <ydb/library/actors/wilson/wilson_span.h>
2221
#include <google/protobuf/message.h>

ydb/core/blobstorage/backpressure/queue.cpp

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -339,7 +339,6 @@ TBlobStorageQueue::TItemList::iterator TBlobStorageQueue::EraseItem(TItemList& q
339339
TItemList::iterator nextIter = std::next(it);
340340
if (Queues.Unused.size() < MaxUnusedItems) {
341341
Queues.Unused.splice(Queues.Unused.end(), queue, it);
342-
it->TSenderNode::UnLink();
343342
it->Event.Discard();
344343
} else {
345344
queue.erase(it);

ydb/core/blobstorage/backpressure/queue.h

Lines changed: 1 addition & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -31,16 +31,7 @@ class TBlobStorageQueue {
3131
}
3232
};
3333

34-
template<typename TDerived>
35-
struct TSenderNode : public TRbTreeItem<TSenderNode<TDerived>, TCompare<TActorId>> {
36-
const TActorId& GetKey() const {
37-
return static_cast<const TDerived&>(*this).Event.GetSender();
38-
}
39-
};
40-
41-
struct TItem
42-
: public TSenderNode<TItem>
43-
{
34+
struct TItem {
4435
EItemQueue Queue;
4536
TCostModel::TMessageCostEssence CostEssence;
4637
NWilson::TSpan Span;
@@ -103,10 +94,7 @@ class TBlobStorageQueue {
10394
{}
10495
};
10596

106-
using TSenderMap = TRbTree<TSenderNode<TItem>, TCompare<TActorId>>;
107-
10897
TQueues Queues;
109-
TSenderMap SenderToItems;
11098
THashMap<std::pair<ui64, ui64>, TItemList::iterator> InFlightLookup;
11199

112100
ui64 WindowSize;
@@ -233,7 +221,6 @@ class TBlobStorageQueue {
233221

234222
newIt->Iterator = newIt;
235223
SetItemQueue(*newIt, EItemQueue::Waiting);
236-
SenderToItems.Insert(&*newIt);
237224

238225
// count item
239226
++*QueueItemsPut;

ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2591,7 +2591,7 @@ namespace NKikimr {
25912591
};
25922592
auto balancingCtx = std::make_shared<TBalancingCtx>(
25932593
balancingCfg, VCtx, PDiskCtx, HugeBlobCtx, SelfId(), Hull->GetSnapshot(), Config, GInfo, MinREALHugeBlobInBytes);
2594-
BalancingId = ctx.Register(CreateBalancingActor(balancingCtx));
2594+
BalancingId = RunInBatchPool(ctx, CreateBalancingActor(balancingCtx));
25952595
ActiveActors.Insert(BalancingId, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
25962596
}
25972597

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 11 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -446,8 +446,8 @@ static TInterconnectSettings GetInterconnectSettings(const NKikimrConfig::TInter
446446
try {
447447
return TFileInput(*path).ReadAll();
448448
} catch (const std::exception& ex) {
449-
Cerr << "failed to read " << name << " file '" << *path << "': " << ex.what() << Endl;
450-
exit(1);
449+
ythrow yexception()
450+
<< "failed to read " << name << " file '" << *path << "': " << ex.what();
451451
}
452452
}
453453
return TString();
@@ -743,9 +743,9 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
743743
auto listener = new TInterconnectListenerTCP(
744744
address, node.second.second, icCommon);
745745
if (int err = listener->Bind()) {
746-
Cerr << "Failed to set up IC listener on port " << node.second.second
747-
<< " errno# " << err << " (" << strerror(err) << ")" << Endl;
748-
exit(1);
746+
ythrow yexception()
747+
<< "Failed to set up IC listener on port " << node.second.second
748+
<< " errno# " << err << " (" << strerror(err) << ")";
749749
}
750750
setup->LocalServices.emplace_back(MakeInterconnectListenerActorId(false), TActorSetupCmd(listener,
751751
TMailboxType::ReadAsFilled, interconnectPoolId));
@@ -763,9 +763,9 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
763763
}
764764
auto listener = new TInterconnectListenerTCP(address, info.GetPort(), icCommon);
765765
if (int err = listener->Bind()) {
766-
Cerr << "Failed to set up IC listener on port " << info.GetPort()
767-
<< " errno# " << err << " (" << strerror(err) << ")" << Endl;
768-
exit(1);
766+
ythrow yexception()
767+
<< "Failed to set up IC listener on port " << info.GetPort()
768+
<< " errno# " << err << " (" << strerror(err) << ")";
769769
}
770770
setup->LocalServices.emplace_back(MakeInterconnectListenerActorId(true), TActorSetupCmd(listener,
771771
TMailboxType::ReadAsFilled, interconnectPoolId));
@@ -779,9 +779,9 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
779779
icCommon->TechnicalSelfHostName = nodesManagerConfig.GetHost();
780780
auto listener = new TInterconnectListenerTCP({}, nodesManagerConfig.GetPort(), icCommon);
781781
if (int err = listener->Bind()) {
782-
Cerr << "Failed to set up IC listener on port " << nodesManagerConfig.GetPort()
783-
<< " errno# " << err << " (" << strerror(err) << ")" << Endl;
784-
exit(1);
782+
ythrow yexception()
783+
<< "Failed to set up IC listener on port " << nodesManagerConfig.GetPort()
784+
<< " errno# " << err << " (" << strerror(err) << ")";
785785
}
786786
setup->LocalServices.emplace_back(MakeInterconnectListenerActorId(true), TActorSetupCmd(listener,
787787
TMailboxType::ReadAsFilled, interconnectPoolId));

ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -73,7 +73,10 @@ void TCheckpointCoordinator::Handle(NYql::NDqs::TEvReadyState::TPtr& ev) {
7373

7474
int tasksSize = GetTasksSize();
7575
const auto& actorIds = ev->Get()->Record.GetActorId();
76-
Y_ABORT_UNLESS(tasksSize == actorIds.size(), "tasksSize %d, actorIds size %d, graph id %s", tasksSize, int(actorIds.size()), CoordinatorId.GraphId.c_str());
76+
if (tasksSize != actorIds.size()) {
77+
OnInternalError(TStringBuilder() << "tasksSize != actorIds.size(), tasksSize " << tasksSize << ", actorIds size " << actorIds.size() << ", graph id " << CoordinatorId.GraphId);
78+
return;
79+
}
7780

7881
for (int i = 0; i < tasksSize; ++i) {
7982
const auto& task = GetTask(i);

ydb/core/fq/libs/config/protos/common.proto

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,4 +31,5 @@ message TCommonConfig {
3131
bool DisableSslForGenericDataSources = 15;
3232
bool ShowQueryTimeline = 16;
3333
uint64 MaxQueryTimelineSize = 17; // default: 200KB
34+
string PqReconnectPeriod = 18; // default: disabled
3435
}

ydb/core/fq/libs/init/init.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -224,7 +224,7 @@ void Init(
224224
appData->FunctionRegistry
225225
);
226226
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, NYql::CreatePqNativeGateway(std::move(pqServices)),
227-
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));
227+
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), protoConfig.GetCommon().GetPqReconnectPeriod());
228228

229229
s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
230230
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 27 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -9,8 +9,10 @@
99
#include <ydb/library/yql/providers/dq/counters/counters.h>
1010
#include <ydb/library/yql/public/purecalc/common/interface.h>
1111

12+
#include <ydb/core/base/appdata_fwd.h>
1213
#include <ydb/core/fq/libs/actors/logging/log.h>
1314
#include <ydb/core/fq/libs/events/events.h>
15+
#include <ydb/core/mon/mon.h>
1416

1517
#include <ydb/core/fq/libs/row_dispatcher/actors_factory.h>
1618
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
@@ -223,11 +225,12 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
223225
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
224226
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
225227
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
228+
void Handle(const NMon::TEvHttpInfo::TPtr&);
226229

227230
void DeleteConsumer(const ConsumerSessionKey& key);
228231
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
229232
void UpdateMetrics();
230-
void PrintInternalState();
233+
TString GetInternalState();
231234

232235
STRICT_STFUNC(
233236
StateFunc, {
@@ -252,6 +255,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
252255
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
253256
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
254257
hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle);
258+
hFunc(NMon::TEvHttpInfo, Handle);
255259
})
256260
};
257261

@@ -287,6 +291,13 @@ void TRowDispatcher::Bootstrap() {
287291
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
288292
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
289293
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
294+
295+
NActors::TMon* mon = NKikimr::AppData()->Mon;
296+
if (mon) {
297+
::NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
298+
mon->RegisterActorPage(actorsMonPage, "row_dispatcher", "Row Dispatcher", false,
299+
TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
300+
}
290301
}
291302

292303
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
@@ -372,7 +383,7 @@ void TRowDispatcher::UpdateMetrics() {
372383
}
373384
}
374385

375-
void TRowDispatcher::PrintInternalState() {
386+
TString TRowDispatcher::GetInternalState() {
376387
TStringStream str;
377388
str << "Statistics:\n";
378389
for (auto& [key, sessionsInfo] : TopicSessions) {
@@ -390,7 +401,7 @@ void TRowDispatcher::PrintInternalState() {
390401
}
391402
}
392403
}
393-
LOG_ROW_DISPATCHER_DEBUG(str.Str());
404+
return str.Str();
394405
}
395406

396407
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
@@ -632,10 +643,22 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
632643
}
633644

634645
void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&) {
635-
PrintInternalState();
646+
LOG_ROW_DISPATCHER_DEBUG(GetInternalState());
636647
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
637648
}
638649

650+
// Handles a monitoring HTTP info request: renders the text returned by
// GetInternalState() under a "Current state:" heading and sends it back to
// the requester (ev->Sender) as an NMon::TEvHttpInfoRes.
// NOTE(review): HTML()/PRE() are presumably the monitoring-library HTML
// builder macros that wrap the enclosed output in <html>/<pre> tags — confirm.
void TRowDispatcher::Handle(const NMon::TEvHttpInfo::TPtr& ev) {
651+
TStringStream str;
652+
HTML(str) {
653+
PRE() {
654+
str << "Current state:" << Endl;
655+
str << GetInternalState() << Endl;
656+
str << Endl;
657+
}
658+
}
659+
// Reply to whoever sent the request with the accumulated page text.
Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str()));
660+
}
661+
639662
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) {
640663
LOG_ROW_DISPATCHER_TRACE("TEvSessionStatistic from " << ev->Sender);
641664
const auto& key = ev->Get()->Stat.SessionKey;

0 commit comments

Comments
 (0)