Skip to content

Commit b8d4019

Browse files
committed
Add Merger node
1 parent c15cfee commit b8d4019

File tree

7 files changed

+96
-71
lines changed

7 files changed

+96
-71
lines changed

backend/workflow/node/node.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ const (
5757
TypeDelay
5858
TypeExtractor
5959
TypeIf
60+
TypeMerger
6061
)
6162

6263
func FromType(t Type) (Node, error) {
@@ -82,6 +83,8 @@ func FromType(t Type) (Node, error) {
8283
real = NewExtractor()
8384
case TypeIf:
8485
real = NewIf()
86+
case TypeMerger:
87+
real = NewMerger()
8588
default:
8689
return nil, fmt.Errorf("unknown node type: %v", t)
8790
}

backend/workflow/node/node_merger.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package node
2+
3+
import (
4+
"fmt"
5+
6+
"golang.org/x/net/context"
7+
8+
"github.com/ghostsecurity/reaper/backend/workflow/transmission"
9+
)
10+
11+
type MergerNode struct {
12+
*base
13+
noInjections
14+
}
15+
16+
func NewMerger() *MergerNode {
17+
return &MergerNode{
18+
base: newBase(
19+
"Merger",
20+
TypeMerger,
21+
false,
22+
NewVarStorage(
23+
Connectors{
24+
NewConnector("start", transmission.TypeStart, true),
25+
NewConnector("vars_1", transmission.TypeMap, true),
26+
NewConnector("vars_2", transmission.TypeMap, true),
27+
},
28+
Connectors{
29+
NewConnector("output", transmission.TypeMap, true),
30+
},
31+
nil,
32+
),
33+
),
34+
}
35+
}
36+
37+
func (n *MergerNode) Start(ctx context.Context, in <-chan Input, out chan<- OutputInstance, _ chan<- Output) error {
38+
39+
var v1s []map[string]string
40+
var v2s []map[string]string
41+
42+
defer n.setBusy(false)
43+
44+
for {
45+
select {
46+
case <-ctx.Done():
47+
return ctx.Err()
48+
case input, ok := <-in:
49+
if !ok {
50+
return nil
51+
}
52+
if input.Data == nil {
53+
return fmt.Errorf("input is nil")
54+
}
55+
56+
if vm1, err := n.ReadInputMap("vars_1", input.Data); err == nil {
57+
v1s = append(v1s, vm1)
58+
}
59+
if vm2, err := n.ReadInputMap("vars_2", input.Data); err == nil {
60+
v2s = append(v2s, vm2)
61+
}
62+
63+
if len(v1s) == 0 || len(v2s) == 0 {
64+
continue
65+
}
66+
67+
v1 := v1s[0]
68+
v2 := v2s[0]
69+
70+
for k, v := range v2 {
71+
v1[k] = v
72+
}
73+
v1s = v1s[1:]
74+
v2s = v2s[1:]
75+
76+
n.tryOut(ctx, out, OutputInstance{
77+
OutputName: "output",
78+
Complete: input.Last,
79+
Data: transmission.NewMap(v1),
80+
})
81+
}
82+
}
83+
}

frontend/package-lock.json

Lines changed: 3 additions & 48 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

frontend/package.json.md5

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
cb5e926b7a528855b91ab347cdea29f0
1+
b963a08cc62fff622468bc9a35f804f3

frontend/src/components/Workflow/WorkflowEditor.vue

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const availableNodeTypes = ref(<NodeType[]>[
4242
NodeType.DELAY,
4343
NodeType.EXTRACTOR,
4444
NodeType.IF,
45+
NodeType.MERGER,
4546
])
4647
4748
const linkColour = '#444444'

frontend/src/lib/Workflows.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export const enum NodeType {
1010
DELAY = 8,
1111
EXTRACTOR = 9,
1212
IF = 10,
13+
MERGER = 11,
1314
}
1415

1516
export const enum ParentType {
@@ -54,6 +55,8 @@ export function NodeTypeName(t: NodeType): string {
5455
return 'Extractor'
5556
case NodeType.IF:
5657
return 'If'
58+
case NodeType.MERGER:
59+
return 'Merger'
5760
default:
5861
return `Unknown (${t})`
5962
}

0 commit comments

Comments
 (0)