@@ -309,6 +309,11 @@ bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
309
309
TExprBase providerArg (node.Ref ().Child (1 ));
310
310
if (auto maybeDataSource = providerArg.Maybe <TCoDataSource>()) {
311
311
TStringBuf dataSourceCategory = maybeDataSource.Cast ().Category ();
312
+ if (dataSourceCategory == NYql::PgProviderName) {
313
+ // All pg reads should be replaced on TPgTableContent
314
+ return false ;
315
+ }
316
+
312
317
auto dataSourceProviderIt = types.DataSourceMap .find (dataSourceCategory);
313
318
if (dataSourceProviderIt != types.DataSourceMap .end ()) {
314
319
if (auto * dqIntegration = dataSourceProviderIt->second ->GetDqIntegration ()) {
@@ -362,7 +367,7 @@ bool IsDqWrite(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&
362
367
return false ;
363
368
}
364
369
365
- bool ExploreTx (TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes,
370
+ bool ExploreNode (TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes,
366
371
TIntrusivePtr<TKikimrTablesData> tablesData, TTypeAnnotationContext& types) {
367
372
368
373
if (txRes.Ops .cend () != txRes.Ops .find (node.Raw ())) {
@@ -374,9 +379,9 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
374
379
return true ;
375
380
}
376
381
377
- if (auto maybeLeft = node.Maybe <TCoLeft>()) {
382
+ if (node.Maybe <TCoLeft>()) {
378
383
txRes.Ops .insert (node.Raw ());
379
- return ExploreTx (maybeLeft. Cast (). Input (), ctx, dataSink, txRes, tablesData, types) ;
384
+ return true ;
380
385
}
381
386
382
387
auto checkDataSource = [dataSink] (const TKiDataSource& ds) {
@@ -400,27 +405,24 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
400
405
YQL_ENSURE (key.GetKeyType () == TKikimrKey::Type::Table);
401
406
auto table = key.GetTablePath ();
402
407
txRes.Ops .insert (node.Raw ());
403
- auto result = ExploreTx (maybeRead.Cast ().World (), ctx, dataSink, txRes, tablesData, types);
404
408
405
409
YQL_ENSURE (tablesData);
406
410
const auto & tableData = tablesData->ExistingTable (cluster, table);
407
411
YQL_ENSURE (tableData.Metadata );
408
412
auto readColumns = read.GetSelectColumns (ctx, tableData);
409
413
txRes.AddReadOpToQueryBlock (key, readColumns, tableData.Metadata );
410
414
txRes.AddTableOperation (BuildTableOpNode (cluster, table, TYdbOperation::Select, read.Pos (), ctx));
411
- return result ;
415
+ return true ;
412
416
}
413
417
414
418
if (IsDqRead (node, ctx, types, true , &txRes.HasErrors )) {
415
419
txRes.Ops .insert (node.Raw ());
416
- TExprNode::TPtr worldChild = node.Raw ()->ChildPtr (0 );
417
- return ExploreTx (TExprBase (worldChild), ctx, dataSink, txRes, tablesData, types);
420
+ return true ;
418
421
}
419
422
420
423
if (IsPgRead (node, types)) {
421
424
txRes.Ops .insert (node.Raw ());
422
- TExprNode::TPtr worldChild = node.Raw ()->ChildPtr (0 );
423
- return ExploreTx (TExprBase (worldChild), ctx, dataSink, txRes, tablesData, types);
425
+ return true ;
424
426
}
425
427
426
428
if (auto maybeWrite = node.Maybe <TKiWriteTable>()) {
@@ -431,7 +433,6 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
431
433
432
434
auto table = write.Table ().Value ();
433
435
txRes.Ops .insert (node.Raw ());
434
- auto result = ExploreTx (write.World (), ctx, dataSink, txRes, tablesData, types);
435
436
auto tableOp = GetTableOp (write);
436
437
437
438
YQL_ENSURE (tablesData);
@@ -472,14 +473,13 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
472
473
}
473
474
474
475
txRes.AddTableOperation (BuildTableOpNode (cluster, table, tableOp, write.Pos (), ctx));
475
- return result ;
476
+ return true ;
476
477
}
477
478
478
479
if (IsDqWrite (node, ctx, types)) {
479
480
txRes.Ops .insert (node.Raw ());
480
481
txRes.AddEffect (node, THashMap<TString, TPrimitiveYdbOperations>{});
481
- TExprNode::TPtr worldChild = node.Raw ()->ChildPtr (0 );
482
- return ExploreTx (TExprBase (worldChild), ctx, dataSink, txRes, tablesData, types);
482
+ return true ;
483
483
}
484
484
485
485
if (auto maybeUpdate = node.Maybe <TKiUpdateTable>()) {
@@ -490,7 +490,6 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
490
490
491
491
auto table = update.Table ().Value ();
492
492
txRes.Ops .insert (node.Raw ());
493
- auto result = ExploreTx (update.World (), ctx, dataSink, txRes, tablesData, types);
494
493
const auto tableOp = TYdbOperation::Update;
495
494
496
495
YQL_ENSURE (tablesData);
@@ -525,7 +524,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
525
524
}
526
525
527
526
txRes.AddTableOperation (BuildTableOpNode (cluster, table, tableOp, update.Pos (), ctx));
528
- return result ;
527
+ return true ;
529
528
}
530
529
531
530
if (auto maybeDelete = node.Maybe <TKiDeleteTable>()) {
@@ -536,7 +535,6 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
536
535
537
536
auto table = del.Table ().Value ();
538
537
txRes.Ops .insert (node.Raw ());
539
- auto result = ExploreTx (del.World (), ctx, dataSink, txRes, tablesData, types);
540
538
const auto tableOp = TYdbOperation::Delete;
541
539
542
540
YQL_ENSURE (tablesData);
@@ -564,7 +562,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
564
562
}
565
563
566
564
txRes.AddTableOperation (BuildTableOpNode (cluster, table, tableOp, del.Pos (), ctx));
567
- return result ;
565
+ return true ;
568
566
}
569
567
570
568
if (auto maybeCreate = node.Maybe <TKiCreateTable>()) {
@@ -575,9 +573,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
575
573
576
574
auto table = create.Table ().Value ();
577
575
txRes.Ops .insert (node.Raw ());
578
- auto result = ExploreTx (create.World (), ctx, dataSink, txRes, tablesData, types);
579
576
txRes.AddTableOperation (BuildTableOpNode (cluster, table, TYdbOperation::CreateTable, create.Pos (), ctx));
580
- return result ;
577
+ return true ;
581
578
}
582
579
583
580
if (auto maybeDrop = node.Maybe <TKiDropTable>()) {
@@ -588,9 +585,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
588
585
589
586
auto table = drop.Table ().Value ();
590
587
txRes.Ops .insert (node.Raw ());
591
- auto result = ExploreTx (drop.World (), ctx, dataSink, txRes, tablesData, types);
592
588
txRes.AddTableOperation (BuildTableOpNode (cluster, table, TYdbOperation::DropTable, drop.Pos (), ctx));
593
- return result ;
589
+ return true ;
594
590
}
595
591
596
592
if (auto maybeAlter = node.Maybe <TKiAlterTable>()) {
@@ -601,9 +597,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
601
597
602
598
auto table = alter.Table ().Value ();
603
599
txRes.Ops .insert (node.Raw ());
604
- auto result = ExploreTx (alter.World (), ctx, dataSink, txRes, tablesData, types);
605
600
txRes.AddTableOperation (BuildTableOpNode (cluster, table, TYdbOperation::AlterTable, alter.Pos (), ctx));
606
- return result ;
601
+ return true ;
607
602
}
608
603
609
604
if (auto maybeCreateUser = node.Maybe <TKiCreateUser>()) {
@@ -613,9 +608,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
613
608
}
614
609
615
610
txRes.Ops .insert (node.Raw ());
616
- auto result = ExploreTx (createUser.World (), ctx, dataSink, txRes, tablesData, types);
617
611
txRes.AddTableOperation (BuildYdbOpNode (cluster, TYdbOperation::CreateUser, createUser.Pos (), ctx));
618
- return result ;
612
+ return true ;
619
613
}
620
614
621
615
if (auto maybeAlterUser = node.Maybe <TKiAlterUser>()) {
@@ -625,9 +619,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
625
619
}
626
620
627
621
txRes.Ops .insert (node.Raw ());
628
- auto result = ExploreTx (alterUser.World (), ctx, dataSink, txRes, tablesData, types);
629
622
txRes.AddTableOperation (BuildYdbOpNode (cluster, TYdbOperation::AlterUser, alterUser.Pos (), ctx));
630
- return result ;
623
+ return true ;
631
624
}
632
625
633
626
if (auto maybeDropUser = node.Maybe <TKiDropUser>()) {
@@ -637,9 +630,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
637
630
}
638
631
639
632
txRes.Ops .insert (node.Raw ());
640
- auto result = ExploreTx (dropUser.World (), ctx, dataSink, txRes, tablesData, types);
641
633
txRes.AddTableOperation (BuildYdbOpNode (cluster, TYdbOperation::DropUser, dropUser.Pos (), ctx));
642
- return result ;
634
+ return true ;
643
635
}
644
636
645
637
if (auto maybeCreateGroup = node.Maybe <TKiCreateGroup>()) {
@@ -649,9 +641,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
649
641
}
650
642
651
643
txRes.Ops .insert (node.Raw ());
652
- auto result = ExploreTx (createGroup.World (), ctx, dataSink, txRes, tablesData, types);
653
644
txRes.AddTableOperation (BuildYdbOpNode (cluster, TYdbOperation::CreateGroup, createGroup.Pos (), ctx));
654
- return result ;
645
+ return true ;
655
646
}
656
647
657
648
if (auto maybeAlterGroup = node.Maybe <TKiAlterGroup>()) {
@@ -661,9 +652,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
661
652
}
662
653
663
654
txRes.Ops .insert (node.Raw ());
664
- auto result = ExploreTx (alterGroup.World (), ctx, dataSink, txRes, tablesData, types);
665
655
txRes.AddTableOperation (BuildYdbOpNode (cluster, TYdbOperation::AlterGroup, alterGroup.Pos (), ctx));
666
- return result ;
656
+ return true ;
667
657
}
668
658
669
659
if (auto maybeRenameGroup = node.Maybe <TKiRenameGroup>()) {
@@ -673,9 +663,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
673
663
}
674
664
675
665
txRes.Ops .insert (node.Raw ());
676
- auto result = ExploreTx (renameGroup.World (), ctx, dataSink, txRes, tablesData, types);
677
666
txRes.AddTableOperation (BuildYdbOpNode (cluster, TYdbOperation::RenameGroup, renameGroup.Pos (), ctx));
678
- return result ;
667
+ return true ;
679
668
}
680
669
681
670
if (auto maybeDropGroup = node.Maybe <TKiDropGroup>()) {
@@ -685,9 +674,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
685
674
}
686
675
687
676
txRes.Ops .insert (node.Raw ());
688
- auto result = ExploreTx (dropGroup.World (), ctx, dataSink, txRes, tablesData, types);
689
677
txRes.AddTableOperation (BuildYdbOpNode (cluster, TYdbOperation::DropGroup, dropGroup.Pos (), ctx));
690
- return result ;
678
+ return true ;
691
679
}
692
680
693
681
if (auto maybeExecQuery = node.Maybe <TKiExecDataQuery>()) {
@@ -700,39 +688,29 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
700
688
return true ;
701
689
}
702
690
703
- if (auto maybeCommit = node.Maybe <TCoCommit>()) {
704
- auto commit = maybeCommit.Cast ();
705
-
706
- if (commit.DataSink ().Maybe <TKiDataSink>() && checkDataSink (commit.DataSink ().Cast <TKiDataSink>())) {
707
- txRes.Sync .push_back (commit);
708
- return true ;
709
- }
710
-
711
- return ExploreTx (commit.World (), ctx, dataSink, txRes, tablesData, types);
691
+ if (node.Maybe <TCoCommit>()) {
692
+ return true ;
712
693
}
713
694
714
- if (auto maybeSync = node.Maybe <TCoSync>()) {
695
+ if (node.Maybe <TCoSync>()) {
715
696
txRes.Ops .insert (node.Raw ());
716
- for (auto child : maybeSync.Cast ()) {
717
- if (!ExploreTx (child, ctx, dataSink, txRes, tablesData, types)) {
718
- return false ;
719
- }
720
- }
721
697
return true ;
722
698
}
723
699
724
700
if (node.Maybe <TResWrite>() ||
725
701
node.Maybe <TResPull>())
726
702
{
727
703
txRes.Ops .insert (node.Raw ());
728
- bool result = ExploreTx (TExprBase (node.Ref ().ChildPtr (0 )), ctx, dataSink, txRes, tablesData, types);
729
- // Cerr << KqpExprToPrettyString(*node.Raw(), ctx) << Endl;
730
704
txRes.AddResult (node);
731
- return result ;
705
+ return true ;
732
706
}
733
707
734
708
if (node.Ref ().IsCallable (ConfigureName)) {
735
- txRes.Sync .push_back (node);
709
+ return true ;
710
+ }
711
+
712
+ if (node.Maybe <TCoCons>()) {
713
+ txRes.Ops .insert (node.Raw ());
736
714
return true ;
737
715
}
738
716
@@ -757,32 +735,42 @@ bool IsKikimrPureNode(const TExprNode::TPtr& node) {
757
735
return true ;
758
736
}
759
737
760
- bool CheckTx (TExprBase txStart, const TKiDataSink& dataSink, const THashSet< const TExprNode*>& txOps ,
761
- const THashSet< const TExprNode*>& txSync )
738
+ bool ExploreTx (TExprBase root, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes ,
739
+ TIntrusivePtr<TKikimrTablesData> tablesData, TTypeAnnotationContext& types )
762
740
{
763
- bool hasErrors = false ;
764
- VisitExpr (txStart.Ptr (), [&txOps, &txSync, &hasErrors, dataSink] (const TExprNode::TPtr& node) {
765
- if (hasErrors) {
766
- return false ;
741
+ const auto preFunc = [&dataSink, &txRes](const TExprNode::TPtr& node) {
742
+ if (const auto maybeCommit = TExprBase (node).Maybe <TCoCommit>()) {
743
+ const auto commit = maybeCommit.Cast ();
744
+ if (commit.DataSink ().Maybe <TKiDataSink>() && commit.DataSink ().Cast <TKiDataSink>().Raw () == dataSink.Raw ()) {
745
+ txRes.Sync .push_back (commit);
746
+ return false ;
747
+ }
748
+ return true ;
767
749
}
768
750
769
- if (txSync.find (node.Get ()) != txSync.cend ()) {
751
+ if (node->IsCallable (ConfigureName)) {
752
+ txRes.Sync .push_back (TExprBase (node));
770
753
return false ;
771
754
}
772
755
773
- if (auto maybeCommit = TMaybeNode<TCoCommit>(node)) {
774
- if (maybeCommit.Cast ().DataSink ().Raw () != dataSink.Raw ()) {
775
- return true ;
776
- }
756
+ return true ;
757
+ };
758
+
759
+ bool hasErrors = false ;
760
+ const auto postFunc = [&hasErrors, &ctx, &dataSink, &txRes, tablesData, &types](const TExprNode::TPtr& node) {
761
+ if (hasErrors) {
762
+ return false ;
777
763
}
778
764
779
- if (!IsKikimrPureNode ( node) && txOps. find (node. Get ()) == txOps. cend ( )) {
765
+ if (!ExploreNode ( TExprBase ( node), ctx, dataSink, txRes, tablesData, types) && ! IsKikimrPureNode (node )) {
780
766
hasErrors = true ;
781
767
return false ;
782
768
}
783
769
784
770
return true ;
785
- });
771
+ };
772
+
773
+ VisitExpr (root.Ptr (), preFunc, postFunc);
786
774
787
775
return !hasErrors;
788
776
}
@@ -964,11 +952,6 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TStringBuf datab
964
952
return node.Ptr ();
965
953
}
966
954
967
- auto txSyncSet = txExplore.GetSyncSet ();
968
- if (!CheckTx (commit.World (), kiDataSink, txExplore.Ops , txSyncSet)) {
969
- return node.Ptr ();
970
- }
971
-
972
955
bool hasScheme;
973
956
bool hasData;
974
957
txExplore.GetTableOperations (hasScheme, hasData);
0 commit comments