Skip to content

Commit 7db0311

Browse files
authored
fix: update local executor code to include outputs (#626)
1 parent 8a2f07a commit 7db0311

File tree

10 files changed

+700
-4
lines changed

10 files changed

+700
-4
lines changed

executor/local/build.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,11 @@ func (c *client) AssembleBuild(ctx context.Context) error {
237237
// output a new line for readability to stdout
238238
fmt.Fprintln(c.stdout, "")
239239

240+
c.err = c.outputs.create(ctx, c.OutputCtn, (int64(60) * 30))
241+
if c.err != nil {
242+
return fmt.Errorf("unable to create outputs container: %w", c.err)
243+
}
244+
240245
// assemble runtime build just before any containers execute
241246
c.err = c.Runtime.AssembleBuild(ctx, c.pipeline)
242247
if c.err != nil {
@@ -253,6 +258,15 @@ func (c *client) ExecBuild(ctx context.Context) error {
253258
// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload
254259
defer func() { build.Upload(c.build, nil, c.err, nil) }()
255260

261+
// output maps for dynamic environment variables captured from volume
262+
var opEnv, maskEnv map[string]string
263+
264+
// execute outputs container
265+
c.err = c.outputs.exec(ctx, c.OutputCtn)
266+
if c.err != nil {
267+
return fmt.Errorf("unable to exec outputs container: %w", c.err)
268+
}
269+
256270
// execute the services for the pipeline
257271
for _, _service := range c.pipeline.Services {
258272
// plan the service
@@ -294,6 +308,22 @@ func (c *client) ExecBuild(ctx context.Context) error {
294308
return fmt.Errorf("unable to plan step: %w", c.err)
295309
}
296310

311+
// poll outputs
312+
opEnv, maskEnv, c.err = c.outputs.poll(ctx, c.OutputCtn)
313+
if c.err != nil {
314+
return fmt.Errorf("unable to exec outputs container: %w", c.err)
315+
}
316+
317+
// merge env from outputs
318+
//
319+
//nolint:errcheck // only errors with empty environment input, which does not matter here
320+
_step.MergeEnv(opEnv)
321+
322+
// merge env from masked outputs
323+
//
324+
//nolint:errcheck // only errors with empty environment input, which does not matter here
325+
_step.MergeEnv(maskEnv)
326+
297327
// execute the step
298328
c.err = c.ExecStep(ctx, _step)
299329
if c.err != nil {
@@ -451,6 +481,12 @@ func (c *client) DestroyBuild(ctx context.Context) error {
451481
}
452482
}
453483

484+
// destroy output container
485+
err = c.outputs.destroy(ctx, c.OutputCtn)
486+
if err != nil {
487+
fmt.Fprintln(c.stdout, "unable to destroy output container:", err)
488+
}
489+
454490
// remove the runtime volume for the pipeline
455491
err = c.Runtime.RemoveVolume(ctx, c.pipeline)
456492
if err != nil {

executor/local/build_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func TestLocal_CreateBuild(t *testing.T) {
7373
WithBuild(_build),
7474
WithPipeline(_pipeline),
7575
WithRuntime(_runtime),
76+
WithOutputCtn(testOutputsCtn()),
7677
)
7778
if err != nil {
7879
t.Errorf("unable to create executor engine: %v", err)
@@ -152,6 +153,7 @@ func TestLocal_PlanBuild(t *testing.T) {
152153
WithBuild(_build),
153154
WithPipeline(_pipeline),
154155
WithRuntime(_runtime),
156+
WithOutputCtn(testOutputsCtn()),
155157
)
156158
if err != nil {
157159
t.Errorf("unable to create executor engine: %v", err)
@@ -270,6 +272,7 @@ func TestLocal_AssembleBuild(t *testing.T) {
270272
WithBuild(_build),
271273
WithPipeline(_pipeline),
272274
WithRuntime(_runtime),
275+
WithOutputCtn(testOutputsCtn()),
273276
withStreamRequests(streamRequests),
274277
)
275278
if err != nil {
@@ -374,6 +377,7 @@ func TestLocal_ExecBuild(t *testing.T) {
374377
WithBuild(_build),
375378
WithPipeline(_pipeline),
376379
WithRuntime(_runtime),
380+
WithOutputCtn(testOutputsCtn()),
377381
withStreamRequests(streamRequests),
378382
)
379383
if err != nil {
@@ -570,6 +574,7 @@ func TestLocal_StreamBuild(t *testing.T) {
570574
WithBuild(_build),
571575
WithPipeline(_pipeline),
572576
WithRuntime(_runtime),
577+
WithOutputCtn(testOutputsCtn()),
573578
withStreamRequests(streamRequests),
574579
)
575580
if err != nil {
@@ -691,6 +696,7 @@ func TestLocal_DestroyBuild(t *testing.T) {
691696
WithBuild(_build),
692697
WithPipeline(_pipeline),
693698
WithRuntime(_runtime),
699+
WithOutputCtn(testOutputsCtn()),
694700
)
695701
if err != nil {
696702
t.Errorf("unable to create executor engine: %v", err)
@@ -718,3 +724,11 @@ func TestLocal_DestroyBuild(t *testing.T) {
718724
})
719725
}
720726
}
727+
728+
func testOutputsCtn() *pipeline.Container {
729+
return &pipeline.Container{
730+
ID: "outputs_test",
731+
Environment: make(map[string]string),
732+
Detach: true,
733+
}
734+
}

executor/local/local.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ import (
1717
type (
1818
// client manages communication with the pipeline resources.
1919
client struct {
20-
Vela *vela.Client
21-
Runtime runtime.Engine
22-
Hostname string
23-
Version string
20+
Vela *vela.Client
21+
Runtime runtime.Engine
22+
Hostname string
23+
Version string
24+
OutputCtn *pipeline.Container
2425

2526
// private fields
2627
init *pipeline.Container
@@ -31,11 +32,17 @@ type (
3132
err error
3233
streamRequests chan message.StreamRequest
3334

35+
outputs *outputSvc
36+
3437
// internal field partially exported for tests
3538
stdout *os.File
3639
mockStdoutReader *os.File
3740
}
3841

42+
svc struct {
43+
client *client
44+
}
45+
3946
// MockedClient is for internal use to facilitate testing the local executor.
4047
MockedClient interface {
4148
MockStdout() *os.File
@@ -77,6 +84,8 @@ func New(opts ...Opt) (*client, error) {
7784
// Add stdout by default
7885
c.stdout = os.Stdout
7986

87+
c.outputs = &outputSvc{client: c}
88+
8089
// instantiate streamRequests channel (which may be overridden using withStreamRequests()).
8190
c.streamRequests = make(chan message.StreamRequest)
8291

executor/local/opts.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,16 @@ func WithMockStdout(mock bool) Opt {
120120
}
121121
}
122122

123+
// WithOutputCtn sets the outputs container in the executor client for Linux.
124+
func WithOutputCtn(ctn *pipeline.Container) Opt {
125+
return func(c *client) error {
126+
// set the outputs container in the client
127+
c.OutputCtn = ctn
128+
129+
return nil
130+
}
131+
}
132+
123133
// withStreamRequests sets the streamRequests channel in the executor client for Linux
124134
// (primarily used for tests).
125135
func withStreamRequests(s chan message.StreamRequest) Opt {

executor/local/outputs.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
package local
4+
5+
import (
6+
"bytes"
7+
"context"
8+
"encoding/base64"
9+
"fmt"
10+
11+
envparse "github.com/hashicorp/go-envparse"
12+
"github.com/sirupsen/logrus"
13+
14+
"github.com/go-vela/server/compiler/types/pipeline"
15+
)
16+
17+
// outputSvc handles communication with the outputs container during the build.
18+
type outputSvc svc
19+
20+
// create configures the outputs container for execution.
21+
func (o *outputSvc) create(ctx context.Context, ctn *pipeline.Container, timeout int64) error {
22+
// exit if outputs container has not been configured
23+
if len(ctn.Image) == 0 {
24+
return nil
25+
}
26+
27+
// Encode script content to Base64
28+
script := base64.StdEncoding.EncodeToString(
29+
[]byte(fmt.Sprintf("mkdir /vela/outputs\nsleep %d\n", timeout)),
30+
)
31+
32+
// set the entrypoint for the ctn
33+
ctn.Entrypoint = []string{"/bin/sh", "-c"}
34+
35+
// set the commands for the ctn
36+
ctn.Commands = []string{"echo $VELA_BUILD_SCRIPT | base64 -d | /bin/sh -e"}
37+
38+
// set the environment variables for the ctn
39+
ctn.Environment["HOME"] = "/root"
40+
ctn.Environment["SHELL"] = "/bin/sh"
41+
ctn.Environment["VELA_BUILD_SCRIPT"] = script
42+
43+
// setup the runtime container
44+
err := o.client.Runtime.SetupContainer(ctx, ctn)
45+
if err != nil {
46+
return err
47+
}
48+
49+
return nil
50+
}
51+
52+
// destroy cleans up outputs container after execution.
53+
func (o *outputSvc) destroy(ctx context.Context, ctn *pipeline.Container) error {
54+
// exit if outputs container has not been configured
55+
if len(ctn.Image) == 0 {
56+
return nil
57+
}
58+
59+
// inspect the runtime container
60+
err := o.client.Runtime.InspectContainer(ctx, ctn)
61+
if err != nil {
62+
return err
63+
}
64+
65+
// remove the runtime container
66+
err = o.client.Runtime.RemoveContainer(ctx, ctn)
67+
if err != nil {
68+
return err
69+
}
70+
71+
return nil
72+
}
73+
74+
// exec runs the outputs sidecar container for a pipeline.
75+
func (o *outputSvc) exec(ctx context.Context, _outputs *pipeline.Container) error {
76+
// exit if outputs container has not been configured
77+
if len(_outputs.Image) == 0 {
78+
return nil
79+
}
80+
81+
// run the runtime container
82+
err := o.client.Runtime.RunContainer(ctx, _outputs, o.client.pipeline)
83+
if err != nil {
84+
return err
85+
}
86+
87+
// inspect the runtime container
88+
err = o.client.Runtime.InspectContainer(ctx, _outputs)
89+
if err != nil {
90+
return err
91+
}
92+
93+
return nil
94+
}
95+
96+
// poll tails the output for sidecar container.
97+
func (o *outputSvc) poll(ctx context.Context, ctn *pipeline.Container) (map[string]string, map[string]string, error) {
98+
// exit if outputs container has not been configured
99+
if len(ctn.Image) == 0 {
100+
return nil, nil, nil
101+
}
102+
103+
// grab outputs
104+
outputBytes, err := o.client.Runtime.PollOutputsContainer(ctx, ctn, "/vela/outputs/.env")
105+
if err != nil {
106+
return nil, nil, err
107+
}
108+
109+
reader := bytes.NewReader(outputBytes)
110+
111+
outputMap, err := envparse.Parse(reader)
112+
if err != nil {
113+
logrus.Debugf("unable to parse output map: %v", err)
114+
}
115+
116+
// grab masked outputs
117+
maskedBytes, err := o.client.Runtime.PollOutputsContainer(ctx, ctn, "/vela/outputs/masked.env")
118+
if err != nil {
119+
return nil, nil, err
120+
}
121+
122+
reader = bytes.NewReader(maskedBytes)
123+
124+
maskMap, err := envparse.Parse(reader)
125+
if err != nil {
126+
logrus.Debugf("unable to parse masked output map: %v", err)
127+
}
128+
129+
return outputMap, maskMap, nil
130+
}

0 commit comments

Comments
 (0)