Skip to content

Commit cecfeb2

Browse files
0x2b3bfa0casperdcl
andauthored
task: rename workdir.input => storage.workdir & make output a subpath (#435)
Co-authored-by: Casper da Costa-Luis <casper.dcl@physics.org>
1 parent b47f801 commit cecfeb2

File tree

8 files changed

+64
-67
lines changed

8 files changed

+64
-67
lines changed

iterative/resource_task.go

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"io"
87
"os"
98
"strings"
109
"time"
@@ -109,12 +108,12 @@ func resourceTask() *schema.Resource {
109108
ForceNew: true,
110109
Required: true,
111110
},
112-
"workdir": {
111+
"storage": {
113112
Optional: true,
114113
Type: schema.TypeSet,
115114
Elem: &schema.Resource{
116115
Schema: map[string]*schema.Schema{
117-
"input": {
116+
"workdir": {
118117
Type: schema.TypeString,
119118
ForceNew: true,
120119
Optional: true,
@@ -289,14 +288,10 @@ func resourceTaskBuild(ctx context.Context, d *schema.ResourceData, m interface{
289288

290289
directory := ""
291290
directory_out := ""
292-
if d.Get("workdir").(*schema.Set).Len() > 0 {
293-
storage := d.Get("workdir").(*schema.Set).List()[0].(map[string]interface{})
294-
directory = storage["input"].(string)
295-
291+
if d.Get("storage").(*schema.Set).Len() > 0 {
292+
storage := d.Get("storage").(*schema.Set).List()[0].(map[string]interface{})
293+
directory = storage["workdir"].(string)
296294
directory_out = storage["output"].(string)
297-
if directory_out != "" && !isOutputValid(directory_out) {
298-
return nil, errors.New("output directory " + directory_out + " is not empty!")
299-
}
300295
}
301296

302297
t := common.Task{
@@ -346,17 +341,3 @@ func diagnostic(diags diag.Diagnostics, err error, severity diag.Severity) diag.
346341
Summary: err.Error(),
347342
})
348343
}
349-
350-
func isOutputValid(path string) bool {
351-
f, err := os.Open(path)
352-
if err != nil {
353-
return true
354-
}
355-
defer f.Close()
356-
357-
_, err = f.Readdir(1)
358-
if err == io.EOF {
359-
return true
360-
}
361-
return false
362-
}

task/aws/task.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (t *Task) Delete(ctx context.Context) error {
198198
logrus.Debug("Downloading Directory...")
199199
if t.Read(ctx) == nil {
200200
if t.Attributes.Environment.DirectoryOut != "" {
201-
if err := t.Pull(ctx, t.Attributes.Environment.DirectoryOut); err != nil && err != common.NotFoundError {
201+
if err := t.Pull(ctx, t.Attributes.Environment.Directory, t.Attributes.Environment.DirectoryOut); err != nil && err != common.NotFoundError {
202202
return err
203203
}
204204
}
@@ -243,20 +243,20 @@ func (t *Task) Logs(ctx context.Context) ([]string, error) {
243243
return machine.Logs(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"])
244244
}
245245

246-
func (t *Task) Pull(ctx context.Context, destination string) error {
246+
func (t *Task) Pull(ctx context.Context, destination, include string) error {
247247
if err := t.Read(ctx); err != nil {
248248
return err
249249
}
250250

251-
return machine.Transfer(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data", destination)
251+
return machine.Transfer(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data", destination, include)
252252
}
253253

254254
func (t *Task) Push(ctx context.Context, source string) error {
255255
if err := t.Read(ctx); err != nil {
256256
return err
257257
}
258258

259-
return machine.Transfer(ctx, source, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data")
259+
return machine.Transfer(ctx, source, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data", "**")
260260
}
261261

262262
func (t *Task) Start(ctx context.Context) error {

task/az/task.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func (t *Task) Delete(ctx context.Context) error {
187187
logrus.Debug("Downloading Directory...")
188188
if t.Read(ctx) == nil {
189189
if t.Attributes.Environment.DirectoryOut != "" {
190-
if err := t.Pull(ctx, t.Attributes.Environment.DirectoryOut); err != nil && err != common.NotFoundError {
190+
if err := t.Pull(ctx, t.Attributes.Environment.Directory, t.Attributes.Environment.DirectoryOut); err != nil && err != common.NotFoundError {
191191
return err
192192
}
193193
}
@@ -236,20 +236,20 @@ func (t *Task) Logs(ctx context.Context) ([]string, error) {
236236
return machine.Logs(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"])
237237
}
238238

239-
func (t *Task) Pull(ctx context.Context, destination string) error {
239+
func (t *Task) Pull(ctx context.Context, destination, include string) error {
240240
if err := t.Read(ctx); err != nil {
241241
return err
242242
}
243243

244-
return machine.Transfer(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data", destination)
244+
return machine.Transfer(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data", destination, include)
245245
}
246246

247247
func (t *Task) Push(ctx context.Context, source string) error {
248248
if err := t.Read(ctx); err != nil {
249249
return err
250250
}
251251

252-
return machine.Transfer(ctx, source, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data")
252+
return machine.Transfer(ctx, source, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data", "**")
253253
}
254254

255255
func (t *Task) Start(ctx context.Context) error {

task/common/machine/storage.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
_ "github.com/rclone/rclone/backend/s3"
1616

1717
"github.com/rclone/rclone/fs"
18+
"github.com/rclone/rclone/fs/filter"
1819
"github.com/rclone/rclone/fs/operations"
1920
"github.com/rclone/rclone/fs/sync"
2021

@@ -90,7 +91,25 @@ func Status(ctx context.Context, remote string, initialStatus common.Status) (co
9091
return initialStatus, nil
9192
}
9293

93-
func Transfer(ctx context.Context, source, destination string) error {
94+
func Transfer(ctx context.Context, source, destination string, include string) error {
95+
include = filepath.Clean(include)
96+
if filepath.IsAbs(include) || strings.HasPrefix(include, "../") {
97+
return errors.New("storage.output must be inside storage.workdir")
98+
}
99+
100+
rules := []string{
101+
"+ /" + include,
102+
"+ /" + include + "/**",
103+
"- **",
104+
}
105+
106+
ctx, fi := filter.AddConfig(ctx)
107+
for _, rule := range rules {
108+
if err := fi.AddRule(rule); err != nil {
109+
return err
110+
}
111+
}
112+
94113
sourceFileSystem, err := fs.NewFs(ctx, source)
95114
if err != nil {
96115
return err

task/gcp/task.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func (t *Task) Delete(ctx context.Context) error {
266266
logrus.Debug("Downloading Directory...")
267267
if t.Read(ctx) == nil {
268268
if t.Attributes.Environment.DirectoryOut != "" {
269-
if err := t.Pull(ctx, t.Attributes.Environment.DirectoryOut); err != nil && err != common.NotFoundError {
269+
if err := t.Pull(ctx, t.Attributes.Environment.Directory, t.Attributes.Environment.DirectoryOut); err != nil && err != common.NotFoundError {
270270
return err
271271
}
272272
}
@@ -323,20 +323,20 @@ func (t *Task) Logs(ctx context.Context) ([]string, error) {
323323
return machine.Logs(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"])
324324
}
325325

326-
func (t *Task) Pull(ctx context.Context, destination string) error {
326+
func (t *Task) Pull(ctx context.Context, destination, include string) error {
327327
if err := t.Read(ctx); err != nil {
328328
return err
329329
}
330330

331-
return machine.Transfer(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data", destination)
331+
return machine.Transfer(ctx, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data", destination, include)
332332
}
333333

334334
func (t *Task) Push(ctx context.Context, source string) error {
335335
if err := t.Read(ctx); err != nil {
336336
return err
337337
}
338338

339-
return machine.Transfer(ctx, source, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data")
339+
return machine.Transfer(ctx, source, (*t.DataSources.Credentials.Resource)["RCLONE_REMOTE"]+"/data", "**")
340340
}
341341

342342
func (t *Task) Start(ctx context.Context) error {

task/k8s/task.go

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ import (
1616

1717
_ "github.com/rclone/rclone/backend/local"
1818

19-
"github.com/rclone/rclone/fs"
20-
"github.com/rclone/rclone/fs/sync"
21-
2219
"terraform-provider-iterative/task/common"
20+
"terraform-provider-iterative/task/common/machine"
2321
"terraform-provider-iterative/task/common/ssh"
2422
"terraform-provider-iterative/task/k8s/client"
2523
"terraform-provider-iterative/task/k8s/resources"
@@ -177,7 +175,7 @@ func (t *Task) Delete(ctx context.Context) error {
177175
return err
178176
}
179177
log.Println("[INFO] Downloading Directory...")
180-
if err := t.Pull(ctx, t.Attributes.DirectoryOut); err != nil {
178+
if err := t.Pull(ctx, t.Attributes.Directory, t.Attributes.DirectoryOut); err != nil {
181179
return err
182180
}
183181

@@ -221,7 +219,7 @@ func (t *Task) Push(ctx context.Context, source string) error {
221219
return copyOptions.Run([]string{source, fmt.Sprintf("%s/%s:%s", t.Client.Namespace, pod, "/directory/directory")})
222220
}
223221

224-
func (t *Task) Pull(ctx context.Context, destination string) error {
222+
func (t *Task) Pull(ctx context.Context, destination, include string) error {
225223
waitSelector := fmt.Sprintf("controller-uid=%s", t.Resources.Job.Resource.GetObjectMeta().GetLabels()["controller-uid"])
226224
pod, err := resources.WaitForPods(ctx, t.Client, 1*time.Second, t.Client.Cloud.Timeouts.Delete, t.Client.Namespace, waitSelector)
227225
if err != nil {
@@ -244,21 +242,7 @@ func (t *Task) Pull(ctx context.Context, destination string) error {
244242
return err
245243
}
246244

247-
sourceFileSystem, err := fs.NewFs(ctx, dir)
248-
if err != nil {
249-
return err
250-
}
251-
252-
destinationFileSystem, err := fs.NewFs(ctx, destination)
253-
if err != nil {
254-
return err
255-
}
256-
257-
if err := sync.CopyDir(ctx, destinationFileSystem, sourceFileSystem, true); err != nil {
258-
return err
259-
}
260-
261-
return nil
245+
return machine.Transfer(ctx, dir, destination, include)
262246
}
263247

264248
func (t *Task) Status(ctx context.Context) (common.Status, error) {

task/task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type Task interface {
4444
Stop(ctx context.Context) error
4545

4646
Push(ctx context.Context, source string) error
47-
Pull(ctx context.Context, destination string) error
47+
Pull(ctx context.Context, destination, include string) error
4848

4949
Status(ctx context.Context) (common.Status, error)
5050
Events(ctx context.Context) []common.Event

task/task_test.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,14 @@ func TestTask(t *testing.T) {
5252
oldData := gofakeit.UUID()
5353
newData := gofakeit.UUID()
5454

55-
dataDirectory := t.TempDir()
56-
dataFile := filepath.Join(dataDirectory, "data")
55+
baseDirectory := t.TempDir()
56+
cacheDirectory := filepath.Join(baseDirectory, "cache")
57+
outputDirectory := filepath.Join(baseDirectory, "output")
58+
cacheFile := filepath.Join(cacheDirectory, "file")
59+
outputFile := filepath.Join(outputDirectory, "file")
60+
61+
relativeOutputDirectory, err := filepath.Rel(baseDirectory, outputDirectory)
62+
require.Nil(t, err)
5763

5864
cloud := common.Cloud{
5965
Provider: provider,
@@ -76,15 +82,18 @@ func TestTask(t *testing.T) {
7682
Environment: common.Environment{
7783
Image: "ubuntu",
7884
Script: `#!/bin/bash
79-
echo "$ENVIRONMENT_VARIABLE_DATA" | tee --append data
85+
mkdir cache
86+
touch cache/file
87+
mkdir output
88+
echo "$ENVIRONMENT_VARIABLE_DATA" | tee --append output/file
8089
sleep 60
81-
cat data
90+
cat output/file
8291
`,
8392
Variables: map[string]*string{
8493
"ENVIRONMENT_VARIABLE_DATA": &newData,
8594
},
86-
Directory: dataDirectory,
87-
DirectoryOut: dataDirectory,
95+
Directory: baseDirectory,
96+
DirectoryOut: relativeOutputDirectory,
8897
Timeout: 10 * time.Minute,
8998
},
9099
Firewall: common.Firewall{
@@ -106,7 +115,10 @@ func TestTask(t *testing.T) {
106115
return
107116
}
108117

109-
file, err := os.Create(dataFile)
118+
require.Nil(t, os.Mkdir(cacheDirectory, 0777))
119+
require.Nil(t, os.Mkdir(outputDirectory, 0777))
120+
121+
file, err := os.Create(outputFile)
110122
require.Nil(t, err)
111123

112124
_, err = file.WriteString(oldData)
@@ -151,9 +163,10 @@ func TestTask(t *testing.T) {
151163
require.Nil(t, newTask.Delete(ctx))
152164
require.Nil(t, newTask.Delete(ctx))
153165

154-
require.FileExists(t, dataFile)
166+
require.NoFileExists(t, cacheFile)
167+
require.FileExists(t, outputFile)
155168

156-
contents, err := ioutil.ReadFile(dataFile)
169+
contents, err := ioutil.ReadFile(outputFile)
157170
require.Nil(t, err)
158171

159172
require.Contains(t, string(contents), newData)

0 commit comments

Comments
 (0)