Skip to content

Commit a945664

Browse files
authored
Move executor code around to make it less clunky (#1605)
1 parent 82200a5 commit a945664

File tree

1 file changed

+68
-80
lines changed

1 file changed

+68
-80
lines changed

router/relay/executor.go

Lines changed: 68 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,63 @@ func (s *QueryStateExecutorImpl) copyFromExecutor(mgr meta.EntityMgr, q plan.Pla
606606
// TODO : unit tests
607607
func (s *QueryStateExecutorImpl) ExecuteSlice(qd *QueryDesc, mgr meta.EntityMgr, replyCl bool) error {
608608

609+
/* XXX: refactor this */
610+
expectRowDesc := false
611+
612+
switch qd.Msg.(type) {
613+
case *pgproto3.Query:
614+
expectRowDesc = true
615+
// ok
616+
case *pgproto3.Sync:
617+
// ok
618+
default:
619+
return rerrors.ErrExecutorSyncLost
620+
}
621+
622+
serv := s.Client().Server()
623+
624+
doFinalizeTx := false
625+
attachedCopy := false
626+
627+
switch qd.P.(type) {
628+
case *plan.VirtualPlan:
629+
default:
630+
if serv == nil {
631+
return fmt.Errorf("client %p is out of transaction sync with router", s.Client())
632+
}
633+
634+
attachedCopy = s.cl.ExecuteOn() != "" || s.TxStatus() == txstatus.TXACT
635+
636+
if p := qd.P; p != nil {
637+
638+
stmt := qd.P.Stmt()
639+
640+
if stmt != nil {
641+
switch stmt.(type) {
642+
case *lyx.Copy:
643+
spqrlog.Zero.Debug().Str("txstatus", serv.TxStatus().String()).Msg("prepared copy state")
644+
645+
if serv.TxStatus() == txstatus.TXIDLE {
646+
if err := s.DeploySliceTransactionQuery(serv, "BEGIN"); err != nil {
647+
return err
648+
}
649+
doFinalizeTx = true
650+
}
651+
}
652+
}
653+
}
654+
655+
spqrlog.Zero.Debug().
656+
Uints("shards", shard.ShardIDs(serv.Datashards())).
657+
Type("query-type", qd.Msg).Type("plan-type", qd.P).
658+
Msg("relay process plan")
659+
660+
statistics.RecordStartTime(statistics.StatisticsTypeShard, time.Now(), s.Client())
661+
if err := DispatchPlan(qd, serv, s.Client(), replyCl); err != nil {
662+
return err
663+
}
664+
}
665+
609666
switch q := qd.P.(type) {
610667
case *plan.VirtualPlan:
611668
/* execute logic without shard dispatch */
@@ -615,92 +672,36 @@ func (s *QueryStateExecutorImpl) ExecuteSlice(qd *QueryDesc, mgr meta.EntityMgr,
615672
if q.SubPlan == nil {
616673

617674
/* only send row description for simple proto case */
618-
switch qd.Msg.(type) {
619-
case *pgproto3.Query:
675+
676+
if expectRowDesc {
620677

621678
if err := s.Client().Send(&pgproto3.RowDescription{
622679
Fields: q.VirtualRowCols,
623680
}); err != nil {
624681
return err
625682
}
626683

627-
for _, vals := range q.VirtualRowVals {
684+
}
628685

629-
if err := s.Client().Send(&pgproto3.DataRow{
630-
Values: vals,
631-
}); err != nil {
632-
return err
633-
}
634-
}
686+
for _, vals := range q.VirtualRowVals {
635687

636-
if err := s.Client().Send(&pgproto3.CommandComplete{
637-
CommandTag: []byte("SELECT 1"),
688+
if err := s.Client().Send(&pgproto3.DataRow{
689+
Values: vals,
638690
}); err != nil {
639691
return err
640692
}
641-
case *pgproto3.Sync:
642-
643-
for _, vals := range q.VirtualRowVals {
644-
645-
if err := s.Client().Send(&pgproto3.DataRow{
646-
Values: vals,
647-
}); err != nil {
648-
return err
649-
}
650-
}
693+
}
651694

652-
if err := s.Client().Send(&pgproto3.CommandComplete{
653-
CommandTag: []byte("SELECT 1"),
654-
}); err != nil {
655-
return err
656-
}
695+
if err := s.Client().Send(&pgproto3.CommandComplete{
696+
CommandTag: []byte("SELECT 1"),
697+
}); err != nil {
698+
return err
657699
}
658700

659701
return nil
660702
} else {
661703
return rerrors.ErrExecutorSyncLost
662704
}
663-
}
664-
665-
serv := s.Client().Server()
666-
667-
if serv == nil {
668-
return fmt.Errorf("client %p is out of transaction sync with router", s.Client())
669-
}
670-
671-
doFinalizeTx := false
672-
attachedCopy := s.cl.ExecuteOn() != "" || s.TxStatus() == txstatus.TXACT
673-
674-
if p := qd.P; p != nil {
675-
676-
stmt := qd.P.Stmt()
677-
678-
if stmt != nil {
679-
switch stmt.(type) {
680-
case *lyx.Copy:
681-
spqrlog.Zero.Debug().Str("txstatus", serv.TxStatus().String()).Msg("prepared copy state")
682-
683-
if serv.TxStatus() == txstatus.TXIDLE {
684-
if err := s.DeploySliceTransactionQuery(serv, "BEGIN"); err != nil {
685-
return err
686-
}
687-
doFinalizeTx = true
688-
}
689-
}
690-
}
691-
}
692-
693-
spqrlog.Zero.Debug().
694-
Uints("shards", shard.ShardIDs(serv.Datashards())).
695-
Type("query-type", qd.Msg).Type("plan-type", qd.P).
696-
Msg("relay process plan")
697-
698-
statistics.RecordStartTime(statistics.StatisticsTypeShard, time.Now(), s.Client())
699-
if err := DispatchPlan(qd, serv, s.Client(), replyCl); err != nil {
700-
return err
701-
}
702-
703-
switch qd.P.(type) {
704705
case *plan.CopyPlan:
705706

706707
msg, _, err := serv.Receive()
@@ -727,19 +728,6 @@ func (s *QueryStateExecutorImpl) ExecuteSlice(qd *QueryDesc, mgr meta.EntityMgr,
727728
}
728729
}
729730

730-
/* XXX: refactor this */
731-
expectRowDesc := false
732-
733-
switch qd.Msg.(type) {
734-
case *pgproto3.Query:
735-
expectRowDesc = true
736-
// ok
737-
case *pgproto3.Sync:
738-
// ok
739-
default:
740-
return rerrors.ErrExecutorSyncLost
741-
}
742-
743731
for {
744732
msg, recvIndex, err := serv.Receive()
745733
if err != nil {

0 commit comments

Comments
 (0)