Skip to content

Commit 85c2f4d

Browse files
authored
fix(*): parallelState with completeType defaults to allOf (#106)
Signed-off-by: lsytj0413 <511121939@qq.com> Signed-off-by: lsytj0413 <511121939@qq.com>
1 parent 5a0a34f commit 85c2f4d

File tree

4 files changed

+325
-45
lines changed

4 files changed

+325
-45
lines changed

model/parallel_state.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright 2022 The Serverless Workflow Specification Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package model
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
"reflect"
22+
"strconv"
23+
24+
validator "github.com/go-playground/validator/v10"
25+
"k8s.io/apimachinery/pkg/util/intstr"
26+
27+
val "github.com/serverlessworkflow/sdk-go/v2/validator"
28+
)
29+
30+
// CompletionType define on how to complete branch execution.
31+
type CompletionType string
32+
33+
const (
34+
// CompletionTypeAllOf defines all branches must complete execution before the state can transition/end.
35+
CompletionTypeAllOf CompletionType = "allOf"
36+
// CompletionTypeAtLeast defines state can transition/end once at least the specified number of branches
37+
// have completed execution.
38+
CompletionTypeAtLeast CompletionType = "atLeast"
39+
)
40+
41+
// ParallelState Consists of a number of states that are executed in parallel
42+
type ParallelState struct {
43+
BaseState
44+
// Branch Definitions
45+
Branches []Branch `json:"branches" validate:"required,min=1,dive"`
46+
// Option types on how to complete branch execution.
47+
// Defaults to `allOf`
48+
CompletionType CompletionType `json:"completionType,omitempty" validate:"required,oneof=allOf atLeast"`
49+
50+
// Used when completionType is set to 'atLeast' to specify the minimum number of branches that must complete before the state will transition."
51+
// TODO: change this field to unmarshal result as int
52+
NumCompleted intstr.IntOrString `json:"numCompleted,omitempty"`
53+
// State specific timeouts
54+
Timeouts *ParallelStateTimeout `json:"timeouts,omitempty"`
55+
}
56+
57+
type parallelStateForUnmarshal ParallelState
58+
59+
// UnmarshalJSON unmarshal ParallelState object from json bytes
60+
func (s *ParallelState) UnmarshalJSON(b []byte) error {
61+
if len(b) == 0 {
62+
// TODO: Normalize error messages
63+
return fmt.Errorf("no bytes to unmarshal")
64+
}
65+
66+
v := &parallelStateForUnmarshal{
67+
CompletionType: CompletionTypeAllOf,
68+
}
69+
err := json.Unmarshal(b, v)
70+
if err != nil {
71+
return err
72+
}
73+
74+
*s = ParallelState(*v)
75+
76+
return nil
77+
}
78+
79+
// Branch Definition
80+
type Branch struct {
81+
// Branch name
82+
Name string `json:"name" validate:"required"`
83+
// Actions to be executed in this branch
84+
Actions []Action `json:"actions" validate:"required,min=1,dive"`
85+
// Timeouts State specific timeouts
86+
Timeouts *BranchTimeouts `json:"timeouts,omitempty"`
87+
}
88+
89+
// BranchTimeouts defines the specific timeout settings for branch
90+
type BranchTimeouts struct {
91+
// ActionExecTimeout Single actions definition execution timeout duration (ISO 8601 duration format)
92+
ActionExecTimeout string `json:"actionExecTimeout,omitempty" validate:"omitempty,iso8601duration"`
93+
// BranchExecTimeout Single branch execution timeout duration (ISO 8601 duration format)
94+
BranchExecTimeout string `json:"branchExecTimeout,omitempty" validate:"omitempty,iso8601duration"`
95+
}
96+
97+
// ParallelStateTimeout defines the specific timeout settings for parallel state
98+
type ParallelStateTimeout struct {
99+
StateExecTimeout *StateExecTimeout `json:"stateExecTimeout,omitempty"`
100+
BranchExecTimeout string `json:"branchExecTimeout,omitempty" validate:"omitempty,iso8601duration"`
101+
}
102+
103+
// ParallelStateStructLevelValidation custom validator for ParallelState
104+
func ParallelStateStructLevelValidation(_ context.Context, structLevel validator.StructLevel) {
105+
parallelStateObj := structLevel.Current().Interface().(ParallelState)
106+
107+
if parallelStateObj.CompletionType == CompletionTypeAllOf {
108+
return
109+
}
110+
111+
switch parallelStateObj.NumCompleted.Type {
112+
case intstr.Int:
113+
if parallelStateObj.NumCompleted.IntVal <= 0 {
114+
structLevel.ReportError(reflect.ValueOf(parallelStateObj.NumCompleted), "NumCompleted", "numCompleted", "gt0", "")
115+
}
116+
case intstr.String:
117+
v, err := strconv.Atoi(parallelStateObj.NumCompleted.StrVal)
118+
if err != nil {
119+
structLevel.ReportError(reflect.ValueOf(parallelStateObj.NumCompleted), "NumCompleted", "numCompleted", "gt0", err.Error())
120+
return
121+
}
122+
123+
if v <= 0 {
124+
structLevel.ReportError(reflect.ValueOf(parallelStateObj.NumCompleted), "NumCompleted", "numCompleted", "gt0", "")
125+
}
126+
}
127+
}
128+
129+
func init() {
130+
val.GetValidator().RegisterStructValidationCtx(
131+
ParallelStateStructLevelValidation,
132+
ParallelState{},
133+
)
134+
}

model/parallel_state_test.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
// Copyright 2022 The Serverless Workflow Specification Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package model
16+
17+
import (
18+
"encoding/json"
19+
"testing"
20+
21+
"github.com/stretchr/testify/assert"
22+
"k8s.io/apimachinery/pkg/util/intstr"
23+
24+
val "github.com/serverlessworkflow/sdk-go/v2/validator"
25+
)
26+
27+
func TestParallelStateUnmarshalJSON(t *testing.T) {
28+
type testCase struct {
29+
desp string
30+
data string
31+
expect *ParallelState
32+
err string
33+
}
34+
testCases := []testCase{
35+
{
36+
desp: "all field set",
37+
data: `{"completionType": "allOf", "numCompleted": 1}`,
38+
expect: &ParallelState{
39+
CompletionType: CompletionTypeAllOf,
40+
NumCompleted: intstr.FromInt(1),
41+
},
42+
err: ``,
43+
},
44+
{
45+
desp: "all optional field not set",
46+
data: `{"numCompleted": 1}`,
47+
expect: &ParallelState{
48+
CompletionType: CompletionTypeAllOf,
49+
NumCompleted: intstr.FromInt(1),
50+
},
51+
err: ``,
52+
},
53+
}
54+
for _, tc := range testCases {
55+
t.Run(tc.desp, func(t *testing.T) {
56+
var v ParallelState
57+
err := json.Unmarshal([]byte(tc.data), &v)
58+
59+
if tc.err != "" {
60+
assert.Error(t, err)
61+
assert.Regexp(t, tc.err, err)
62+
return
63+
}
64+
65+
assert.NoError(t, err)
66+
assert.Equal(t, tc.expect, &v)
67+
})
68+
}
69+
}
70+
71+
func TestParallelStateStructLevelValidation(t *testing.T) {
72+
type testCase struct {
73+
desp string
74+
state *ParallelState
75+
err string
76+
}
77+
testCases := []testCase{
78+
{
79+
desp: "normal",
80+
state: &ParallelState{
81+
BaseState: BaseState{
82+
Name: "1",
83+
Type: "parallel",
84+
},
85+
Branches: []Branch{
86+
{
87+
Name: "b1",
88+
Actions: []Action{
89+
{},
90+
},
91+
},
92+
},
93+
CompletionType: CompletionTypeAllOf,
94+
NumCompleted: intstr.FromInt(1),
95+
},
96+
err: ``,
97+
},
98+
{
99+
desp: "invalid completeType",
100+
state: &ParallelState{
101+
BaseState: BaseState{
102+
Name: "1",
103+
Type: "parallel",
104+
},
105+
Branches: []Branch{
106+
{
107+
Name: "b1",
108+
Actions: []Action{
109+
{},
110+
},
111+
},
112+
},
113+
CompletionType: CompletionTypeAllOf + "1",
114+
},
115+
err: `Key: 'ParallelState.CompletionType' Error:Field validation for 'CompletionType' failed on the 'oneof' tag`,
116+
},
117+
{
118+
desp: "invalid numCompleted `int`",
119+
state: &ParallelState{
120+
BaseState: BaseState{
121+
Name: "1",
122+
Type: "parallel",
123+
},
124+
Branches: []Branch{
125+
{
126+
Name: "b1",
127+
Actions: []Action{
128+
{},
129+
},
130+
},
131+
},
132+
CompletionType: CompletionTypeAtLeast,
133+
NumCompleted: intstr.FromInt(0),
134+
},
135+
err: `Key: 'ParallelState.NumCompleted' Error:Field validation for 'NumCompleted' failed on the 'gt0' tag`,
136+
},
137+
{
138+
desp: "invalid numCompleted string format",
139+
state: &ParallelState{
140+
BaseState: BaseState{
141+
Name: "1",
142+
Type: "parallel",
143+
},
144+
Branches: []Branch{
145+
{
146+
Name: "b1",
147+
Actions: []Action{
148+
{},
149+
},
150+
},
151+
},
152+
CompletionType: CompletionTypeAtLeast,
153+
NumCompleted: intstr.FromString("a"),
154+
},
155+
err: `Key: 'ParallelState.NumCompleted' Error:Field validation for 'NumCompleted' failed on the 'gt0' tag`,
156+
},
157+
{
158+
desp: "normal",
159+
state: &ParallelState{
160+
BaseState: BaseState{
161+
Name: "1",
162+
Type: "parallel",
163+
},
164+
Branches: []Branch{
165+
{
166+
Name: "b1",
167+
Actions: []Action{
168+
{},
169+
},
170+
},
171+
},
172+
CompletionType: CompletionTypeAtLeast,
173+
NumCompleted: intstr.FromString("0"),
174+
},
175+
err: `Key: 'ParallelState.NumCompleted' Error:Field validation for 'NumCompleted' failed on the 'gt0' tag`,
176+
},
177+
}
178+
for _, tc := range testCases {
179+
t.Run(tc.desp, func(t *testing.T) {
180+
err := val.GetValidator().Struct(tc.state)
181+
182+
if tc.err != "" {
183+
assert.Error(t, err)
184+
assert.Regexp(t, tc.err, err)
185+
return
186+
}
187+
188+
assert.NoError(t, err)
189+
})
190+
}
191+
}

model/states.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,6 @@ const (
4141
// StateTypeSleep ...
4242
StateTypeSleep = "sleep"
4343

44-
// CompletionTypeAllOf ...
45-
CompletionTypeAllOf CompletionType = "allOf"
46-
// CompletionTypeAtLeast ...
47-
CompletionTypeAtLeast CompletionType = "atLeast"
48-
4944
// ForEachModeTypeSequential ...
5045
ForEachModeTypeSequential ForEachModeType = "sequential"
5146
// ForEachModeTypeParallel ...
@@ -82,9 +77,6 @@ func getActionsModelMapping(stateType string, s map[string]interface{}) (State,
8277
// StateType ...
8378
type StateType string
8479

85-
// CompletionType Option types on how to complete branch execution.
86-
type CompletionType string
87-
8880
// ForEachModeType Specifies how iterations are to be performed (sequentially or in parallel)
8981
type ForEachModeType string
9082

@@ -172,25 +164,6 @@ type OperationStateTimeout struct {
172164
ActionExecTimeout string `json:"actionExecTimeout,omitempty" validate:"omitempty,min=1"`
173165
}
174166

175-
// ParallelState Consists of a number of states that are executed in parallel
176-
type ParallelState struct {
177-
BaseState
178-
// Branch Definitions
179-
Branches []Branch `json:"branches" validate:"required,min=1,dive"`
180-
// Option types on how to complete branch execution.
181-
CompletionType CompletionType `json:"completionType,omitempty"`
182-
// Used when completionType is set to 'atLeast' to specify the minimum number of branches that must complete before the state will transition."
183-
NumCompleted intstr.IntOrString `json:"numCompleted,omitempty"`
184-
// State specific timeouts
185-
Timeouts *ParallelStateTimeout `json:"timeouts,omitempty"`
186-
}
187-
188-
// ParallelStateTimeout ...
189-
type ParallelStateTimeout struct {
190-
StateExecTimeout StateExecTimeout `json:"stateExecTimeout,omitempty"`
191-
BranchExecTimeout string `json:"branchExecTimeout,omitempty" validate:"omitempty,min=1"`
192-
}
193-
194167
// InjectState ...
195168
type InjectState struct {
196169
BaseState

model/workflow.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -555,24 +555,6 @@ type StateDataFilter struct {
555555
Output string `json:"output,omitempty"`
556556
}
557557

558-
// Branch Definition
559-
type Branch struct {
560-
// Branch name
561-
Name string `json:"name" validate:"required"`
562-
// Actions to be executed in this branch
563-
Actions []Action `json:"actions" validate:"required,min=1"`
564-
// Timeouts State specific timeouts
565-
Timeouts *BranchTimeouts `json:"timeouts,omitempty"`
566-
}
567-
568-
// BranchTimeouts ...
569-
type BranchTimeouts struct {
570-
// ActionExecTimeout Single actions definition execution timeout duration (ISO 8601 duration format)
571-
ActionExecTimeout string `json:"actionExecTimeout,omitempty" validate:"omitempty,min=1"`
572-
// BranchExecTimeout Single branch execution timeout duration (ISO 8601 duration format)
573-
BranchExecTimeout string `json:"branchExecTimeout,omitempty" validate:"omitempty,min=1"`
574-
}
575-
576558
// DataInputSchema ...
577559
type DataInputSchema struct {
578560
Schema string `json:"schema" validate:"required"`

0 commit comments

Comments
 (0)