Skip to content

Commit b9e1b38

Browse files
authored
Add reshuffle kmeans actor scan (#9725)
1 parent 48dc88f commit b9e1b38

File tree

13 files changed

+1893
-560
lines changed

13 files changed

+1893
-560
lines changed

ydb/core/protos/tx_datashard.proto

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,7 +1524,7 @@ message TEvLocalKMeansRequest {
15241524

15251525
// id of parent cluster
15261526
optional uint32 Parent = 15;
1527-
// [Child ... Child + K] ids reserved for our clusters
1527+
// ids in the half-open range [Child ... Child + K) are reserved for the clusters of this kmeans
15281528
optional uint32 Child = 16;
15291529

15301530
optional string LevelName = 17;
@@ -1534,7 +1534,7 @@ message TEvLocalKMeansRequest {
15341534
repeated string DataColumns = 20;
15351535
}
15361536

1537-
message TEvLocalKMeansProgressResponse {
1537+
message TEvLocalKMeansResponse {
15381538
optional uint64 Id = 1;
15391539

15401540
optional uint64 TabletId = 2;
@@ -1554,6 +1554,53 @@ message TEvLocalKMeansProgressResponse {
15541554
// optional uint32 DoneRounds = 11;
15551555
}
15561556

1557+
message TEvReshuffleKMeansRequest {
1558+
optional uint64 Id = 1;
1559+
1560+
optional uint64 TabletId = 2;
1561+
optional NKikimrProto.TPathID PathId = 3;
1562+
1563+
optional uint64 SnapshotTxId = 4;
1564+
optional uint64 SnapshotStep = 5;
1565+
1566+
optional uint64 SeqNoGeneration = 6;
1567+
optional uint64 SeqNoRound = 7;
1568+
1569+
optional Ydb.Table.VectorIndexSettings Settings = 8;
1570+
1571+
optional TEvLocalKMeansRequest.EState Upload = 9;
1572+
1573+
// id of parent cluster
1574+
optional uint32 Parent = 10;
1575+
// ids in the half-open range [Child ... Child + ClustersSize) belong to the clusters of this kmeans
1576+
optional uint32 Child = 11;
1577+
// centroids of clusters
1578+
repeated string Clusters = 12;
1579+
1580+
optional string PostingName = 13;
1581+
1582+
optional string EmbeddingColumn = 14;
1583+
repeated string DataColumns = 15;
1584+
}
1585+
1586+
message TEvReshuffleKMeansResponse {
1587+
optional uint64 Id = 1;
1588+
1589+
optional uint64 TabletId = 2;
1590+
optional NKikimrProto.TPathID PathId = 3;
1591+
1592+
optional uint64 RequestSeqNoGeneration = 4;
1593+
optional uint64 RequestSeqNoRound = 5;
1594+
1595+
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
1596+
repeated Ydb.Issue.IssueMessage Issues = 7;
1597+
1598+
// TODO(mbkkt) implement slow-path (reliable-path)
1599+
// optional uint64 RowsDelta = 8;
1600+
// optional uint64 BytesDelta = 9;
1601+
// optional last written primary key
1602+
}
1603+
15571604
message TEvCdcStreamScanRequest {
15581605
message TLimits {
15591606
optional uint32 BatchMaxBytes = 1 [default = 512000];

ydb/core/tx/datashard/buffer_data.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#pragma once
2+
13
#include "ydb/core/scheme/scheme_tablecell.h"
24
#include "ydb/core/tx/datashard/upload_stats.h"
35
#include "ydb/core/tx/tx_proxy/upload_rows.h"

ydb/core/tx/datashard/datashard.h

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,10 @@ struct TEvDataShard {
333333
EvSampleKResponse,
334334

335335
EvLocalKMeansRequest,
336-
EvLocalKMeansProgressResponse,
336+
EvLocalKMeansResponse,
337+
338+
EvReshuffleKMeansRequest,
339+
EvReshuffleKMeansResponse,
337340

338341
EvEnd
339342
};
@@ -1457,16 +1460,28 @@ struct TEvDataShard {
14571460
TEvDataShard::EvSampleKResponse> {
14581461
};
14591462

1463+
struct TEvReshuffleKMeansRequest
1464+
: public TEventPB<TEvReshuffleKMeansRequest,
1465+
NKikimrTxDataShard::TEvReshuffleKMeansRequest,
1466+
TEvDataShard::EvReshuffleKMeansRequest> {
1467+
};
1468+
1469+
struct TEvReshuffleKMeansResponse
1470+
: public TEventPB<TEvReshuffleKMeansResponse,
1471+
NKikimrTxDataShard::TEvReshuffleKMeansResponse,
1472+
TEvDataShard::EvReshuffleKMeansResponse> {
1473+
};
1474+
14601475
struct TEvLocalKMeansRequest
14611476
: public TEventPB<TEvLocalKMeansRequest,
14621477
NKikimrTxDataShard::TEvLocalKMeansRequest,
14631478
TEvDataShard::EvLocalKMeansRequest> {
14641479
};
14651480

1466-
struct TEvLocalKMeansProgressResponse
1467-
: public TEventPB<TEvLocalKMeansProgressResponse,
1468-
NKikimrTxDataShard::TEvLocalKMeansProgressResponse,
1469-
TEvDataShard::EvLocalKMeansProgressResponse> {
1481+
struct TEvLocalKMeansResponse
1482+
: public TEventPB<TEvLocalKMeansResponse,
1483+
NKikimrTxDataShard::TEvLocalKMeansResponse,
1484+
TEvDataShard::EvLocalKMeansResponse> {
14701485
};
14711486

14721487
struct TEvKqpScan

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ class TDataShard
255255
class TTxHandleSafeBuildIndexScan;
256256
class TTxHandleSafeSampleKScan;
257257
class TTxHandleSafeLocalKMeansScan;
258+
class TTxHandleSafeReshuffleKMeansScan;
258259
class TTxHandleSafeStatisticsScan;
259260

260261
class TTxMediatorStateRestored;
@@ -1327,6 +1328,8 @@ class TDataShard
13271328
void HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx);
13281329
void Handle(TEvDataShard::TEvSampleKRequest::TPtr& ev, const TActorContext& ctx);
13291330
void HandleSafe(TEvDataShard::TEvSampleKRequest::TPtr& ev, const TActorContext& ctx);
1331+
void Handle(TEvDataShard::TEvReshuffleKMeansRequest::TPtr& ev, const TActorContext& ctx);
1332+
void HandleSafe(TEvDataShard::TEvReshuffleKMeansRequest::TPtr& ev, const TActorContext& ctx);
13301333
void Handle(TEvDataShard::TEvLocalKMeansRequest::TPtr& ev, const TActorContext& ctx);
13311334
void HandleSafe(TEvDataShard::TEvLocalKMeansRequest::TPtr& ev, const TActorContext& ctx);
13321335
void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const TActorContext& ctx);
@@ -3132,6 +3135,7 @@ class TDataShard
31323135
HFunc(TEvDataShard::TEvDiscardVolatileSnapshotRequest, Handle);
31333136
HFuncTraced(TEvDataShard::TEvBuildIndexCreateRequest, Handle);
31343137
HFunc(TEvDataShard::TEvSampleKRequest, Handle);
3138+
HFunc(TEvDataShard::TEvReshuffleKMeansRequest, Handle);
31353139
HFunc(TEvDataShard::TEvLocalKMeansRequest, Handle);
31363140
HFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle);
31373141
HFunc(TEvPrivate::TEvCdcStreamScanRegistered, Handle);

0 commit comments

Comments
 (0)