Skip to content

Commit 46f1b29

Browse files
authored
Add volatile transactions to datashard internal ui (#10859)
1 parent 827cf58 commit 46f1b29

12 files changed

+665
-427
lines changed

ydb/core/cms/ui/datashard_info.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ function onDataShardInfoLoaded(data) {
105105
$('#tablet-info-state').text(info.State + (info.IsActive ? ' (active)' : ' (inactive)'));
106106
$('#tablet-info-shared-blobs').text(info.HasSharedBlobs);
107107
$('#tablet-info-change-sender').html('<a href="app?TabletID=' + TabletId + '&page=change-sender">Viewer</a>');
108+
$('#tablet-info-volatile-txs').html(`<a href="app?TabletID=${TabletId}&page=volatile-txs">Viewer</a>`);
108109

109110
var activities = data.Activities;
110111
if (activities) {

ydb/core/cms/ui/datashard_rs.js

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,22 @@ var ReadSetsState = {
88
rs: new Map(),
99
acks: new Map(),
1010
delayedAcks: new Map(),
11+
expectations: new Map(),
12+
pipes: new Map(),
1113
};
1214

1315
function makeRSKey(info) {
1416
return `${info.TxId}-${info.Origin}-${info.Source}-${info.Destination}-${info.SeqNo}`;
1517
}
1618

19+
function makeRSExpectationKey(info) {
20+
return `${info.TxId}-${info.Source}`;
21+
}
22+
23+
function makeRSPipeKey(info) {
24+
return `${info.Destination}`;
25+
}
26+
1727
class RSInfo {
1828
constructor(info) {
1929
this.key = makeRSKey(info);
@@ -72,6 +82,114 @@ class RSAckInfo {
7282
}
7383
}
7484

85+
class RSExpectationInfo {
86+
constructor(info, body) {
87+
this.key = makeRSExpectationKey(info);
88+
89+
var trHtml = this._makeTrHtml(info);
90+
$(trHtml).appendTo($('#' + body));
91+
}
92+
93+
update(info) {
94+
$('#ds-rs-expectation-row-' + this.key).replaceWith(this._makeTrHtml(info));
95+
}
96+
97+
remove() {
98+
$('#ds-rs-expectation-row-' + this.key).remove();
99+
}
100+
101+
_makeTrHtml(info) {
102+
return `
103+
<tr id="ds-rs-expectation-row-${this.key}">
104+
<td>${info.TxId}</td>
105+
<td>${info.Step}</td>
106+
<td><a href="app?TabletID=${info.Source}">${info.Source}</a></td>
107+
</tr>
108+
`;
109+
}
110+
}
111+
112+
class RSPipeInfo {
113+
constructor(info, body) {
114+
this.key = makeRSPipeKey(info);
115+
116+
var trHtml = this._makeTrHtml(info);
117+
$(trHtml).appendTo($('#' + body));
118+
}
119+
120+
update(info) {
121+
$('#ds-rs-pipe-row-' + this.key).replaceWith(this._makeTrHtml(info));
122+
}
123+
124+
remove() {
125+
$('#ds-rs-pipe-row-' + this.key).remove();
126+
}
127+
128+
_makeTrHtml(info) {
129+
return `
130+
<tr id="ds-rs-pipe-row-${this.key}">
131+
<td><a href="app?TabletID=${info.Destination}">${info.Destination}</a></td>
132+
<td>${info.OutReadSets}</td>
133+
<td>${info.Subscribed}</td>
134+
</tr>
135+
`;
136+
}
137+
}
138+
139+
function updateReadSetExpectations(data) {
140+
var expectations = new Set();
141+
if (data.Expectations) {
142+
for (var info of data.Expectations) {
143+
var key = makeRSExpectationKey(info);
144+
expectations.add(key);
145+
if (ReadSetsState.expectations.has(key)) {
146+
ReadSetsState.expectations.get(key).update(info);
147+
} else {
148+
ReadSetsState.expectations.set(key, new RSExpectationInfo(info, 'ds-rs-expectations-body'));
149+
}
150+
}
151+
}
152+
153+
var toRemove = [];
154+
for (var key of ReadSetsState.expectations.keys()) {
155+
if (!expectations.has(key)) {
156+
toRemove.push(key);
157+
}
158+
}
159+
160+
for (var key of toRemove) {
161+
ReadSetsState.expectations.get(key).remove();
162+
ReadSetsState.expectations.delete(key);
163+
}
164+
}
165+
166+
function updateReadSetPipes(data) {
167+
var pipes = new Set();
168+
if (data.Pipes) {
169+
for (var info of data.Pipes) {
170+
var key = makeRSPipeKey(info);
171+
pipes.add(key);
172+
if (ReadSetsState.pipes.has(key)) {
173+
ReadSetsState.pipes.get(key).update(info);
174+
} else {
175+
ReadSetsState.pipes.set(key, new RSPipeInfo(info, 'ds-rs-pipes-body'));
176+
}
177+
}
178+
}
179+
180+
var toRemove = [];
181+
for (var key of ReadSetsState.pipes.keys()) {
182+
if (!pipes.has(key)) {
183+
toRemove.push(key);
184+
}
185+
}
186+
187+
for (var key of toRemove) {
188+
ReadSetsState.pipes.get(key).remove();
189+
ReadSetsState.pipes.delete(key);
190+
}
191+
}
192+
75193
function onReadSetsLoaded(data) {
76194
ReadSetsState.loading = false;
77195

@@ -154,9 +272,14 @@ function onReadSetsLoaded(data) {
154272
ReadSetsState.delayedAcks.delete(key);
155273
}
156274

275+
updateReadSetExpectations(data);
276+
updateReadSetPipes(data);
277+
157278
$('#ds-out-rs-table').trigger('update', [true]);
158279
$('#ds-out-rs-ack-table').trigger('update', [true]);
159280
$('#ds-delayed-ack-table').trigger('update', [true]);
281+
$('#ds-rs-expectations-table').trigger('update', [true]);
282+
$('#ds-rs-pipes-table').trigger('update', [true]);
160283

161284
scheduleLoadReadSets(ReadSetsState.fetchInterval);
162285
}
@@ -213,6 +336,16 @@ function initReadSetsTab() {
213336
sortList: [[0,0]],
214337
widgets : ['zebra', 'filter'],
215338
});
339+
$('#ds-rs-expectations-table').tablesorter({
340+
theme: 'blue',
341+
sortList: [[0,0]],
342+
widgets : ['zebra', 'filter'],
343+
});
344+
$('#ds-rs-pipes-table').tablesorter({
345+
theme: 'blue',
346+
sortList: [[0,0]],
347+
widgets : ['zebra', 'filter'],
348+
});
216349
scheduleLoadReadSets(0);
217350
}
218351
});

ydb/core/protos/tx_datashard.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,10 +1205,24 @@ message TEvGetRSInfoResponse {
12051205
optional uint64 SeqNo = 6;
12061206
}
12071207

1208+
message TExpectation {
1209+
optional uint64 Source = 1;
1210+
optional uint64 TxId = 2;
1211+
optional uint64 Step = 3;
1212+
}
1213+
1214+
message TPipe {
1215+
optional uint64 Destination = 1;
1216+
optional uint64 OutReadSets = 2;
1217+
optional bool Subscribed = 3;
1218+
}
1219+
12081220
optional TStatus Status = 1;
12091221
repeated TOutRSInfo OutReadSets = 2;
12101222
repeated TRSAckInfo OutRSAcks = 3;
12111223
repeated TRSAckInfo DelayedRSAcks = 4;
1224+
repeated TExpectation Expectations = 5;
1225+
repeated TPipe Pipes = 6;
12121226
}
12131227

12141228
message TEvGetDataHistogramRequest {

ydb/core/tx/datashard/datashard.cpp

Lines changed: 6 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -2223,12 +2223,12 @@ bool TDataShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc
22232223

22242224
if (const auto& action = cgi.Get("action")) {
22252225
if (action == "cleanup-borrowed-parts") {
2226-
Execute(CreateTxMonitoringCleanupBorrowedParts(this, ev), ctx);
2226+
HandleMonCleanupBorrowedParts(ev);
22272227
return true;
22282228
}
22292229

22302230
if (action == "reset-schema-version") {
2231-
Execute(CreateTxMonitoringResetSchemaVersion(this, ev), ctx);
2231+
HandleMonResetSchemaVersion(ev);
22322232
return true;
22332233
}
22342234

@@ -2254,13 +2254,16 @@ bool TDataShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc
22542254
ctx.Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes("Change sender is not running"));
22552255
return true;
22562256
}
2257+
} else if (page == "volatile-txs") {
2258+
HandleMonVolatileTxs(ev);
2259+
return true;
22572260
} else {
22582261
ctx.Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND));
22592262
return true;
22602263
}
22612264
}
22622265

2263-
Execute(CreateTxMonitoring(this, ev), ctx);
2266+
HandleMonIndexPage(ev);
22642267
return true;
22652268
}
22662269

@@ -4181,60 +4184,6 @@ void TDataShard::Handle(TEvSchemeShard::TEvDescribeSchemeResult::TPtr ev, const
41814184
Execute(new TTxStoreTablePath(this, pathId, rec.GetPath()), ctx);
41824185
}
41834186

4184-
void TDataShard::Handle(TEvDataShard::TEvGetInfoRequest::TPtr &ev,
4185-
const TActorContext &ctx)
4186-
{
4187-
Execute(CreateTxGetInfo(this, ev), ctx);
4188-
}
4189-
4190-
void TDataShard::Handle(TEvDataShard::TEvListOperationsRequest::TPtr &ev,
4191-
const TActorContext &ctx)
4192-
{
4193-
Execute(CreateTxListOperations(this, ev), ctx);
4194-
}
4195-
4196-
void TDataShard::Handle(TEvDataShard::TEvGetOperationRequest::TPtr &ev,
4197-
const TActorContext &ctx)
4198-
{
4199-
Execute(CreateTxGetOperation(this, ev), ctx);
4200-
}
4201-
4202-
void TDataShard::Handle(TEvDataShard::TEvGetDataHistogramRequest::TPtr &ev,
4203-
const TActorContext &ctx)
4204-
{
4205-
auto *response = new TEvDataShard::TEvGetDataHistogramResponse;
4206-
response->Record.MutableStatus()->SetCode(Ydb::StatusIds::SUCCESS);
4207-
const auto& rec = ev->Get()->Record;
4208-
4209-
if (rec.GetCollectKeySampleMs() > 0) {
4210-
EnableKeyAccessSampling(ctx,
4211-
AppData(ctx)->TimeProvider->Now() + TDuration::MilliSeconds(rec.GetCollectKeySampleMs()));
4212-
}
4213-
4214-
if (rec.GetActualData()) {
4215-
if (CurrentKeySampler == DisabledKeySampler) {
4216-
// datashard stores expired stats
4217-
ctx.Send(ev->Sender, response);
4218-
return;
4219-
}
4220-
}
4221-
4222-
for (const auto &pr : TableInfos) {
4223-
const auto &tinfo = *pr.second;
4224-
const NTable::TStats &stats = tinfo.Stats.DataStats;
4225-
4226-
auto &hist = *response->Record.AddTableHistograms();
4227-
hist.SetTableName(pr.second->Name);
4228-
for (ui32 ki : tinfo.KeyColumnIds)
4229-
hist.AddKeyNames(tinfo.Columns.FindPtr(ki)->Name);
4230-
SerializeHistogram(tinfo, stats.DataSizeHistogram, *hist.MutableSizeHistogram());
4231-
SerializeHistogram(tinfo, stats.RowCountHistogram, *hist.MutableCountHistogram());
4232-
SerializeKeySample(tinfo, tinfo.Stats.AccessStats, *hist.MutableKeyAccessSample());
4233-
}
4234-
4235-
ctx.Send(ev->Sender, response);
4236-
}
4237-
42384187
void TDataShard::Handle(TEvDataShard::TEvGetReadTableSinkStateRequest::TPtr &ev,
42394188
const TActorContext &ctx)
42404189
{
@@ -4329,60 +4278,6 @@ void TDataShard::Handle(TEvDataShard::TEvGetReadTableStreamStateRequest::TPtr &e
43294278
ctx.Send(ev->Forward(tx->GetStreamSink()));
43304279
}
43314280

4332-
void TDataShard::Handle(TEvDataShard::TEvGetRSInfoRequest::TPtr &ev,
4333-
const TActorContext &ctx)
4334-
{
4335-
auto *response = new TEvDataShard::TEvGetRSInfoResponse;
4336-
response->Record.MutableStatus()->SetCode(Ydb::StatusIds::SUCCESS);
4337-
4338-
for (auto &pr : OutReadSets.CurrentReadSets) {
4339-
auto &rs = *response->Record.AddOutReadSets();
4340-
rs.SetTxId(pr.second.TxId);
4341-
rs.SetOrigin(pr.second.Origin);
4342-
rs.SetSource(pr.second.From);
4343-
rs.SetDestination(pr.second.To);
4344-
rs.SetSeqNo(pr.first);
4345-
}
4346-
4347-
for (auto &p : OutReadSets.ReadSetAcks) {
4348-
auto &rec = p->Record;
4349-
auto &ack = *response->Record.AddOutRSAcks();
4350-
ack.SetTxId(rec.GetTxId());
4351-
ack.SetStep(rec.GetStep());
4352-
ack.SetOrigin(rec.GetTabletConsumer());
4353-
ack.SetSource(rec.GetTabletSource());
4354-
ack.SetDestination(rec.GetTabletDest());
4355-
ack.SetSeqNo(rec.GetSeqno());
4356-
}
4357-
4358-
for (auto &pr : Pipeline.GetDelayedAcks()) {
4359-
for (auto &ack : pr.second) {
4360-
auto *ev = ack->CastAsLocal<TEvTxProcessing::TEvReadSetAck>();
4361-
if (ev) {
4362-
auto &rec = ev->Record;
4363-
auto &ack = *response->Record.AddDelayedRSAcks();
4364-
ack.SetTxId(rec.GetTxId());
4365-
ack.SetStep(rec.GetStep());
4366-
ack.SetOrigin(rec.GetTabletConsumer());
4367-
ack.SetSource(rec.GetTabletSource());
4368-
ack.SetDestination(rec.GetTabletDest());
4369-
ack.SetSeqNo(rec.GetSeqno());
4370-
}
4371-
}
4372-
}
4373-
4374-
ctx.Send(ev->Sender, response);
4375-
}
4376-
4377-
void TDataShard::Handle(TEvDataShard::TEvGetSlowOpProfilesRequest::TPtr &ev,
4378-
const TActorContext &ctx)
4379-
{
4380-
auto *response = new TEvDataShard::TEvGetSlowOpProfilesResponse;
4381-
response->Record.MutableStatus()->SetCode(Ydb::StatusIds::SUCCESS);
4382-
Pipeline.FillStoredExecutionProfiles(response->Record);
4383-
ctx.Send(ev->Sender, response);
4384-
}
4385-
43864281
void TDataShard::Handle(TEvDataShard::TEvRefreshVolatileSnapshotRequest::TPtr& ev, const TActorContext& ctx) {
43874282
Execute(new TTxRefreshVolatileSnapshot(this, std::move(ev)), ctx);
43884283
}

ydb/core/tx/datashard/datashard__cleanup_borrowed.cpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -255,11 +255,8 @@ class TDataShard::TTxMonitoringCleanupBorrowedParts
255255
bool DryRun = true;
256256
};
257257

258-
ITransaction* TDataShard::CreateTxMonitoringCleanupBorrowedParts(
259-
TDataShard* self,
260-
NMon::TEvRemoteHttpInfo::TPtr ev)
261-
{
262-
return new TTxMonitoringCleanupBorrowedParts(self, ev);
258+
void TDataShard::HandleMonCleanupBorrowedParts(NMon::TEvRemoteHttpInfo::TPtr& ev) {
259+
Execute(new TTxMonitoringCleanupBorrowedParts(this, ev));
263260
}
264261

265262
}

ydb/core/tx/datashard/datashard__mon_reset_schema_version.cpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,8 @@ class TDataShard::TTxMonitoringResetSchemaVersion
5454
size_t Updates = 0;
5555
};
5656

57-
ITransaction* TDataShard::CreateTxMonitoringResetSchemaVersion(
58-
TDataShard* self,
59-
NMon::TEvRemoteHttpInfo::TPtr ev)
60-
{
61-
return new TTxMonitoringResetSchemaVersion(self, ev);
57+
void TDataShard::HandleMonResetSchemaVersion(NMon::TEvRemoteHttpInfo::TPtr& ev) {
58+
Execute(new TTxMonitoringResetSchemaVersion(this, ev));
6259
}
6360

6461
}

0 commit comments

Comments
 (0)