@@ -520,146 +520,150 @@ void TDataShard::HandleSafe(TEvDataShard::TEvLocalKMeansRequest::TPtr& ev, const
520
520
: GetMvccTxVersion (EMvccTxMode::ReadOnly);
521
521
TScanRecord::TSeqNo seqNo = {request.GetSeqNoGeneration (), request.GetSeqNoRound ()};
522
522
523
- auto response = MakeHolder<TEvDataShard::TEvLocalKMeansResponse>();
524
- response->Record .SetId (id);
525
- response->Record .SetTabletId (TabletID ());
526
- response->Record .SetRequestSeqNoGeneration (seqNo.Generation );
527
- response->Record .SetRequestSeqNoRound (seqNo.Round );
528
-
529
- LOG_N (" Starting TLocalKMeansScan TabletId: " << TabletID ()
530
- << " " << request.ShortDebugString ()
531
- << " row version " << rowVersion);
532
-
533
- // Note: it's very unlikely that we have volatile txs before this snapshot
534
- if (VolatileTxManager.HasVolatileTxsAtSnapshot (rowVersion)) {
535
- VolatileTxManager.AttachWaitingSnapshotEvent (rowVersion, std::unique_ptr<IEventHandle>(ev.Release ()));
536
- return ;
537
- }
538
-
539
- auto badRequest = [&](const TString& error) {
540
- response->Record .SetStatus (NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST);
541
- auto issue = response->Record .AddIssues ();
542
- issue->set_severity (NYql::TSeverityIds::S_ERROR);
543
- issue->set_message (error);
544
- };
545
- auto trySendBadRequest = [&] {
546
- if (response->Record .GetStatus () == NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST) {
547
- LOG_E (" Rejecting TLocalKMeansScan bad request TabletId: " << TabletID ()
548
- << " " << request.ShortDebugString ()
549
- << " with response " << response->Record .ShortDebugString ());
550
- ctx.Send (ev->Sender , std::move (response));
551
- return true ;
552
- } else {
553
- return false ;
523
+ try {
524
+ auto response = MakeHolder<TEvDataShard::TEvLocalKMeansResponse>();
525
+ response->Record .SetId (id);
526
+ response->Record .SetTabletId (TabletID ());
527
+ response->Record .SetRequestSeqNoGeneration (seqNo.Generation );
528
+ response->Record .SetRequestSeqNoRound (seqNo.Round );
529
+
530
+ LOG_N (" Starting TLocalKMeansScan TabletId: " << TabletID ()
531
+ << " " << request.ShortDebugString ()
532
+ << " row version " << rowVersion);
533
+
534
+ // Note: it's very unlikely that we have volatile txs before this snapshot
535
+ if (VolatileTxManager.HasVolatileTxsAtSnapshot (rowVersion)) {
536
+ VolatileTxManager.AttachWaitingSnapshotEvent (rowVersion, std::unique_ptr<IEventHandle>(ev.Release ()));
537
+ return ;
554
538
}
555
- };
556
539
557
- // 1. Validating table and path existence
558
- if (request.GetTabletId () != TabletID ()) {
559
- badRequest (TStringBuilder () << " Wrong shard " << request.GetTabletId () << " this is " << TabletID ());
560
- }
561
- if (!IsStateActive ()) {
562
- badRequest (TStringBuilder () << " Shard " << TabletID () << " is " << State << " and not ready for requests" );
563
- }
564
- const auto pathId = TPathId::FromProto (request.GetPathId ());
565
- const auto * userTableIt = GetUserTables ().FindPtr (pathId.LocalPathId );
566
- if (!userTableIt) {
567
- badRequest (TStringBuilder () << " Unknown table id: " << pathId.LocalPathId );
568
- }
569
- if (trySendBadRequest ()) {
570
- return ;
571
- }
572
- const auto & userTable = **userTableIt;
540
+ auto badRequest = [&](const TString& error) {
541
+ response->Record .SetStatus (NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST);
542
+ auto issue = response->Record .AddIssues ();
543
+ issue->set_severity (NYql::TSeverityIds::S_ERROR);
544
+ issue->set_message (error);
545
+ };
546
+ auto trySendBadRequest = [&] {
547
+ if (response->Record .GetStatus () == NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST) {
548
+ LOG_E (" Rejecting TLocalKMeansScan bad request TabletId: " << TabletID ()
549
+ << " " << request.ShortDebugString ()
550
+ << " with response " << response->Record .ShortDebugString ());
551
+ ctx.Send (ev->Sender , std::move (response));
552
+ return true ;
553
+ } else {
554
+ return false ;
555
+ }
556
+ };
573
557
574
- // 2. Validating request fields
575
- if (request.HasSnapshotStep () || request.HasSnapshotTxId ()) {
576
- const TSnapshotKey snapshotKey (pathId, rowVersion.Step , rowVersion.TxId );
577
- if (!SnapshotManager.FindAvailable (snapshotKey)) {
578
- badRequest (TStringBuilder () << " Unknown snapshot for path id " << pathId.OwnerId << " :" << pathId.LocalPathId
579
- << " , snapshot step is " << snapshotKey.Step << " , snapshot tx is " << snapshotKey.TxId );
558
+ // 1. Validating table and path existence
559
+ if (request.GetTabletId () != TabletID ()) {
560
+ badRequest (TStringBuilder () << " Wrong shard " << request.GetTabletId () << " this is " << TabletID ());
561
+ }
562
+ if (!IsStateActive ()) {
563
+ badRequest (TStringBuilder () << " Shard " << TabletID () << " is " << State << " and not ready for requests" );
564
+ }
565
+ const auto pathId = TPathId::FromProto (request.GetPathId ());
566
+ const auto * userTableIt = GetUserTables ().FindPtr (pathId.LocalPathId );
567
+ if (!userTableIt) {
568
+ badRequest (TStringBuilder () << " Unknown table id: " << pathId.LocalPathId );
569
+ }
570
+ if (trySendBadRequest ()) {
571
+ return ;
572
+ }
573
+ const auto & userTable = **userTableIt;
574
+
575
+ // 2. Validating request fields
576
+ if (request.HasSnapshotStep () || request.HasSnapshotTxId ()) {
577
+ const TSnapshotKey snapshotKey (pathId, rowVersion.Step , rowVersion.TxId );
578
+ if (!SnapshotManager.FindAvailable (snapshotKey)) {
579
+ badRequest (TStringBuilder () << " Unknown snapshot for path id " << pathId.OwnerId << " :" << pathId.LocalPathId
580
+ << " , snapshot step is " << snapshotKey.Step << " , snapshot tx is " << snapshotKey.TxId );
581
+ }
580
582
}
581
- }
582
583
583
- if (request.GetUpload () != NKikimrTxDataShard::UPLOAD_MAIN_TO_BUILD
584
- && request.GetUpload () != NKikimrTxDataShard::UPLOAD_MAIN_TO_POSTING
585
- && request.GetUpload () != NKikimrTxDataShard::UPLOAD_BUILD_TO_BUILD
586
- && request.GetUpload () != NKikimrTxDataShard::UPLOAD_BUILD_TO_POSTING)
587
- {
588
- badRequest (" Wrong upload" );
589
- }
584
+ if (request.GetUpload () != NKikimrTxDataShard::UPLOAD_MAIN_TO_BUILD
585
+ && request.GetUpload () != NKikimrTxDataShard::UPLOAD_MAIN_TO_POSTING
586
+ && request.GetUpload () != NKikimrTxDataShard::UPLOAD_BUILD_TO_BUILD
587
+ && request.GetUpload () != NKikimrTxDataShard::UPLOAD_BUILD_TO_POSTING)
588
+ {
589
+ badRequest (" Wrong upload" );
590
+ }
590
591
591
- if (request.GetK () < 2 ) {
592
- badRequest (" Should be requested partition on at least two rows" );
593
- }
592
+ if (request.GetK () < 2 ) {
593
+ badRequest (" Should be requested partition on at least two rows" );
594
+ }
594
595
595
- const auto parentFrom = request.GetParentFrom ();
596
- const auto parentTo = request.GetParentTo ();
597
- NTable::TLead lead;
598
- if (parentFrom == 0 ) {
599
- if (request.GetUpload () == NKikimrTxDataShard::UPLOAD_BUILD_TO_BUILD
600
- || request.GetUpload () == NKikimrTxDataShard::UPLOAD_BUILD_TO_POSTING)
596
+ const auto parentFrom = request.GetParentFrom ();
597
+ const auto parentTo = request.GetParentTo ();
598
+ NTable::TLead lead;
599
+ if (parentFrom == 0 ) {
600
+ if (request.GetUpload () == NKikimrTxDataShard::UPLOAD_BUILD_TO_BUILD
601
+ || request.GetUpload () == NKikimrTxDataShard::UPLOAD_BUILD_TO_POSTING)
602
+ {
603
+ badRequest (" Wrong upload for zero parent" );
604
+ }
605
+ lead.To ({}, NTable::ESeek::Lower);
606
+ } else if (parentFrom > parentTo) {
607
+ badRequest (TStringBuilder () << " Parent from " << parentFrom << " should be less or equal to parent to " << parentTo);
608
+ } else if (request.GetUpload () == NKikimrTxDataShard::UPLOAD_MAIN_TO_BUILD
609
+ || request.GetUpload () == NKikimrTxDataShard::UPLOAD_MAIN_TO_POSTING)
601
610
{
602
- badRequest (" Wrong upload for zero parent" );
611
+ badRequest (" Wrong upload for non-zero parent" );
612
+ } else {
613
+ TCell from = TCell::Make (parentFrom - 1 );
614
+ TCell to = TCell::Make (parentTo);
615
+ TTableRange range{{&from, 1 }, false , {&to, 1 }, true };
616
+ auto scanRange = Intersect (userTable.KeyColumnTypes , range, userTable.Range .ToTableRange ());
617
+ if (scanRange.IsEmptyRange (userTable.KeyColumnTypes )) {
618
+ badRequest (TStringBuilder () << " Requested range doesn't intersect with table range:"
619
+ << " requestedRange: " << DebugPrintRange (userTable.KeyColumnTypes , range, *AppData ()->TypeRegistry )
620
+ << " tableRange: " << DebugPrintRange (userTable.KeyColumnTypes , userTable.Range .ToTableRange (), *AppData ()->TypeRegistry )
621
+ << " scanRange: " << DebugPrintRange (userTable.KeyColumnTypes , scanRange, *AppData ()->TypeRegistry ));
622
+ }
623
+ lead.To (range.From , NTable::ESeek::Upper);
624
+ lead.Until (range.To , true );
603
625
}
604
- lead.To ({}, NTable::ESeek::Lower);
605
- } else if (parentFrom > parentTo) {
606
- badRequest (TStringBuilder () << " Parent from " << parentFrom << " should be less or equal to parent to " << parentTo);
607
- } else if (request.GetUpload () == NKikimrTxDataShard::UPLOAD_MAIN_TO_BUILD
608
- || request.GetUpload () == NKikimrTxDataShard::UPLOAD_MAIN_TO_POSTING)
609
- {
610
- badRequest (" Wrong upload for non-zero parent" );
611
- } else {
612
- TCell from = TCell::Make (parentFrom - 1 );
613
- TCell to = TCell::Make (parentTo);
614
- TTableRange range{{&from, 1 }, false , {&to, 1 }, true };
615
- auto scanRange = Intersect (userTable.KeyColumnTypes , range, userTable.Range .ToTableRange ());
616
- if (scanRange.IsEmptyRange (userTable.KeyColumnTypes )) {
617
- badRequest (TStringBuilder () << " Requested range doesn't intersect with table range:"
618
- << " requestedRange: " << DebugPrintRange (userTable.KeyColumnTypes , range, *AppData ()->TypeRegistry )
619
- << " tableRange: " << DebugPrintRange (userTable.KeyColumnTypes , userTable.Range .ToTableRange (), *AppData ()->TypeRegistry )
620
- << " scanRange: " << DebugPrintRange (userTable.KeyColumnTypes , scanRange, *AppData ()->TypeRegistry ));
621
- }
622
- lead.To (range.From , NTable::ESeek::Upper);
623
- lead.Until (range.To , true );
624
- }
625
626
626
- if (!request.HasLevelName ()) {
627
- badRequest (TStringBuilder () << " Empty level table name" );
628
- }
629
- if (!request.HasOutputName ()) {
630
- badRequest (TStringBuilder () << " Empty output table name" );
631
- }
627
+ if (!request.HasLevelName ()) {
628
+ badRequest (TStringBuilder () << " Empty level table name" );
629
+ }
630
+ if (!request.HasOutputName ()) {
631
+ badRequest (TStringBuilder () << " Empty output table name" );
632
+ }
632
633
633
- auto tags = GetAllTags (userTable);
634
- if (!tags.contains (request.GetEmbeddingColumn ())) {
635
- badRequest (TStringBuilder () << " Unknown embedding column: " << request.GetEmbeddingColumn ());
636
- }
637
- for (auto dataColumn : request.GetDataColumns ()) {
638
- if (!tags.contains (dataColumn)) {
639
- badRequest (TStringBuilder () << " Unknown data column: " << dataColumn);
634
+ auto tags = GetAllTags (userTable);
635
+ if (!tags.contains (request.GetEmbeddingColumn ())) {
636
+ badRequest (TStringBuilder () << " Unknown embedding column: " << request.GetEmbeddingColumn ());
637
+ }
638
+ for (auto dataColumn : request.GetDataColumns ()) {
639
+ if (!tags.contains (dataColumn)) {
640
+ badRequest (TStringBuilder () << " Unknown data column: " << dataColumn);
641
+ }
640
642
}
641
- }
642
643
643
- if (trySendBadRequest ()) {
644
- return ;
645
- }
644
+ if (trySendBadRequest ()) {
645
+ return ;
646
+ }
646
647
647
- // 3. Validating vector index settings
648
- TAutoPtr<NTable::IScan> scan;
649
- auto createScan = [&]<typename T> {
650
- scan = new TLocalKMeansScan<T>{
651
- TabletID (), userTable, request, ev->Sender , std::move (response),
652
- std::move (lead)
648
+ // 3. Validating vector index settings
649
+ TAutoPtr<NTable::IScan> scan;
650
+ auto createScan = [&]<typename T> {
651
+ scan = new TLocalKMeansScan<T>{
652
+ TabletID (), userTable, request, ev->Sender , std::move (response),
653
+ std::move (lead)
654
+ };
653
655
};
654
- };
655
- MakeScan (request, createScan, badRequest);
656
- if (!scan) {
657
- auto sent = trySendBadRequest ();
658
- Y_ENSURE (sent);
659
- return ;
660
- }
656
+ MakeScan (request, createScan, badRequest);
657
+ if (!scan) {
658
+ auto sent = trySendBadRequest ();
659
+ Y_ENSURE (sent);
660
+ return ;
661
+ }
661
662
662
- StartScan (this , std::move (scan), id, seqNo, rowVersion, userTable.LocalTid );
663
+ StartScan (this , std::move (scan), id, seqNo, rowVersion, userTable.LocalTid );
664
+ } catch (const std::exception& exc) {
665
+ FailScan<TEvDataShard::TEvLocalKMeansResponse>(id, TabletID (), ev->Sender , seqNo, exc, " TLocalKMeansScan" );
666
+ }
663
667
}
664
668
665
669
}
0 commit comments