Skip to content

Commit 7723e03

Browse files
authored
fix(109): split foreach state to separate file (#118)
Signed-off-by: lsytj0413 <511121939@qq.com> Signed-off-by: lsytj0413 <511121939@qq.com>
1 parent 85c2f4d commit 7723e03

File tree

3 files changed

+319
-52
lines changed

3 files changed

+319
-52
lines changed

model/foreach_state.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright 2022 The Serverless Workflow Specification Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package model
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
"reflect"
22+
"strconv"
23+
24+
validator "github.com/go-playground/validator/v10"
25+
"k8s.io/apimachinery/pkg/util/intstr"
26+
27+
val "github.com/serverlessworkflow/sdk-go/v2/validator"
28+
)
29+
30+
func init() {
31+
val.GetValidator().RegisterStructValidationCtx(ForEachStateStructLevelValidation, ForEachState{})
32+
}
33+
34+
// ForEachModeType Specifies how iterations are to be performed (sequentially or in parallel)
35+
type ForEachModeType string
36+
37+
const (
38+
// ForEachModeTypeSequential specifies iterations should be done sequentially.
39+
ForEachModeTypeSequential ForEachModeType = "sequential"
40+
// ForEachModeTypeParallel specifies iterations should be done parallel.
41+
ForEachModeTypeParallel ForEachModeType = "parallel"
42+
)
43+
44+
// ForEachState used to execute actions for each element of a data set.
45+
type ForEachState struct {
46+
BaseState
47+
// Workflow expression selecting an array element of the states data
48+
InputCollection string `json:"inputCollection" validate:"required"`
49+
// Workflow expression specifying an array element of the states data to add the results of each iteration
50+
OutputCollection string `json:"outputCollection,omitempty"`
51+
// Name of the iteration parameter that can be referenced in actions/workflow. For each parallel iteration, this param should contain an unique element of the inputCollection array
52+
IterationParam string `json:"iterationParam,omitempty"`
53+
// Specifies how upper bound on how many iterations may run in parallel
54+
BatchSize *intstr.IntOrString `json:"batchSize,omitempty"`
55+
// Actions to be executed for each of the elements of inputCollection
56+
Actions []Action `json:"actions,omitempty" validate:"required,min=1,dive"`
57+
// State specific timeout
58+
Timeouts *ForEachStateTimeout `json:"timeouts,omitempty"`
59+
// Mode Specifies how iterations are to be performed (sequentially or in parallel)
60+
// Defaults to parallel
61+
Mode ForEachModeType `json:"mode,omitempty"`
62+
}
63+
64+
type forEachStateForUnmarshal ForEachState
65+
66+
func (f *ForEachState) UnmarshalJSON(data []byte) error {
67+
v := forEachStateForUnmarshal{
68+
Mode: ForEachModeTypeParallel,
69+
}
70+
err := json.Unmarshal(data, &v)
71+
if err != nil {
72+
return fmt.Errorf("forEachState value '%s' is not supported, it must be an object or string", string(data))
73+
}
74+
75+
*f = ForEachState(v)
76+
return nil
77+
}
78+
79+
// ForEachStateStructLevelValidation custom validator for ForEachState
80+
func ForEachStateStructLevelValidation(_ context.Context, structLevel validator.StructLevel) {
81+
stateObj := structLevel.Current().Interface().(ForEachState)
82+
83+
if stateObj.Mode != ForEachModeTypeParallel {
84+
return
85+
}
86+
87+
if stateObj.BatchSize == nil {
88+
return
89+
}
90+
91+
switch stateObj.BatchSize.Type {
92+
case intstr.Int:
93+
if stateObj.BatchSize.IntVal <= 0 {
94+
structLevel.ReportError(reflect.ValueOf(stateObj.BatchSize), "BatchSize", "batchSize", "gt0", "")
95+
}
96+
case intstr.String:
97+
v, err := strconv.Atoi(stateObj.BatchSize.StrVal)
98+
if err != nil {
99+
structLevel.ReportError(reflect.ValueOf(stateObj.BatchSize), "BatchSize", "batchSize", "gt0", err.Error())
100+
return
101+
}
102+
103+
if v <= 0 {
104+
structLevel.ReportError(reflect.ValueOf(stateObj.BatchSize), "BatchSize", "batchSize", "gt0", "")
105+
}
106+
}
107+
}
108+
109+
// ForEachStateTimeout defines timeout settings for foreach state
110+
type ForEachStateTimeout struct {
111+
StateExecTimeout *StateExecTimeout `json:"stateExecTimeout,omitempty"`
112+
ActionExecTimeout string `json:"actionExecTimeout,omitempty" validate:"omitempty,iso8601duration"`
113+
}

model/foreach_state_test.go

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
// Copyright 2022 The Serverless Workflow Specification Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package model
16+
17+
import (
18+
"encoding/json"
19+
"testing"
20+
21+
"github.com/stretchr/testify/assert"
22+
"k8s.io/apimachinery/pkg/util/intstr"
23+
24+
val "github.com/serverlessworkflow/sdk-go/v2/validator"
25+
)
26+
27+
func TestForEachStateUnmarshalJSON(t *testing.T) {
28+
type testCase struct {
29+
desp string
30+
data string
31+
expect *ForEachState
32+
err string
33+
}
34+
testCases := []testCase{
35+
{
36+
desp: "all field",
37+
data: `{"mode": "sequential"}`,
38+
expect: &ForEachState{
39+
Mode: ForEachModeTypeSequential,
40+
},
41+
err: ``,
42+
},
43+
{
44+
desp: "mode unset",
45+
data: `{}`,
46+
expect: &ForEachState{
47+
Mode: ForEachModeTypeParallel,
48+
},
49+
err: ``,
50+
},
51+
{
52+
desp: "invalid json format",
53+
data: `{"mode": 1}`,
54+
expect: nil,
55+
err: `forEachState value '{"mode": 1}' is not supported, it must be an object or string`,
56+
},
57+
}
58+
for _, tc := range testCases {
59+
t.Run(tc.desp, func(t *testing.T) {
60+
var v ForEachState
61+
err := json.Unmarshal([]byte(tc.data), &v)
62+
63+
if tc.err != "" {
64+
assert.Error(t, err)
65+
assert.Regexp(t, tc.err, err)
66+
return
67+
}
68+
69+
assert.NoError(t, err)
70+
assert.Equal(t, tc.expect, &v)
71+
})
72+
}
73+
}
74+
75+
func TestForEachStateStructLevelValidation(t *testing.T) {
76+
type testCase struct {
77+
desp string
78+
state ForEachState
79+
err string
80+
}
81+
testCases := []testCase{
82+
{
83+
desp: "normal test & sequential",
84+
state: ForEachState{
85+
BaseState: BaseState{
86+
Name: "1",
87+
Type: "2",
88+
},
89+
InputCollection: "3",
90+
Actions: []Action{
91+
{},
92+
},
93+
Mode: ForEachModeTypeSequential,
94+
},
95+
err: ``,
96+
},
97+
{
98+
desp: "normal test & parallel int",
99+
state: ForEachState{
100+
BaseState: BaseState{
101+
Name: "1",
102+
Type: "2",
103+
},
104+
InputCollection: "3",
105+
Actions: []Action{
106+
{},
107+
},
108+
Mode: ForEachModeTypeParallel,
109+
BatchSize: &intstr.IntOrString{
110+
Type: intstr.Int,
111+
IntVal: 1,
112+
},
113+
},
114+
err: ``,
115+
},
116+
{
117+
desp: "normal test & parallel string",
118+
state: ForEachState{
119+
BaseState: BaseState{
120+
Name: "1",
121+
Type: "2",
122+
},
123+
InputCollection: "3",
124+
Actions: []Action{
125+
{},
126+
},
127+
Mode: ForEachModeTypeParallel,
128+
BatchSize: &intstr.IntOrString{
129+
Type: intstr.String,
130+
StrVal: "1",
131+
},
132+
},
133+
err: ``,
134+
},
135+
{
136+
desp: "invalid parallel int",
137+
state: ForEachState{
138+
BaseState: BaseState{
139+
Name: "1",
140+
Type: "2",
141+
},
142+
InputCollection: "3",
143+
Actions: []Action{
144+
{},
145+
},
146+
Mode: ForEachModeTypeParallel,
147+
BatchSize: &intstr.IntOrString{
148+
Type: intstr.Int,
149+
IntVal: 0,
150+
},
151+
},
152+
err: `Key: 'ForEachState.BatchSize' Error:Field validation for 'BatchSize' failed on the 'gt0' tag`,
153+
},
154+
{
155+
desp: "invalid parallel string",
156+
state: ForEachState{
157+
BaseState: BaseState{
158+
Name: "1",
159+
Type: "2",
160+
},
161+
InputCollection: "3",
162+
Actions: []Action{
163+
{},
164+
},
165+
Mode: ForEachModeTypeParallel,
166+
BatchSize: &intstr.IntOrString{
167+
Type: intstr.String,
168+
StrVal: "0",
169+
},
170+
},
171+
err: `Key: 'ForEachState.BatchSize' Error:Field validation for 'BatchSize' failed on the 'gt0' tag`,
172+
},
173+
{
174+
desp: "invalid parallel string format",
175+
state: ForEachState{
176+
BaseState: BaseState{
177+
Name: "1",
178+
Type: "2",
179+
},
180+
InputCollection: "3",
181+
Actions: []Action{
182+
{},
183+
},
184+
Mode: ForEachModeTypeParallel,
185+
BatchSize: &intstr.IntOrString{
186+
Type: intstr.String,
187+
StrVal: "a",
188+
},
189+
},
190+
err: `Key: 'ForEachState.BatchSize' Error:Field validation for 'BatchSize' failed on the 'gt0' tag`,
191+
},
192+
}
193+
for _, tc := range testCases {
194+
t.Run(tc.desp, func(t *testing.T) {
195+
err := val.GetValidator().Struct(tc.state)
196+
197+
if tc.err != "" {
198+
assert.Error(t, err)
199+
assert.Regexp(t, tc.err, err)
200+
return
201+
}
202+
203+
assert.NoError(t, err)
204+
})
205+
}
206+
}

0 commit comments

Comments
 (0)