-
Yes, it's possible. See this example:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: fan-out-in-params-workflow
  namespace: argo
spec:
  templates:
    - name: fan-out-in-params-workflow
      inputs: {}
      outputs: {}
      metadata: {}
      dag:
        tasks:
          - name: generate
            template: generate-artifacts
            arguments: {}
          - name: fan-out-print
            template: fan-out-print
            arguments:
              parameters:
                - name: batch
                  value: '{{item}}' # Each item is a batch of 20 files
              artifacts:
                - name: input
                  from: '{{tasks.generate.outputs.artifacts.fan-out-artifacts}}'
            dependencies:
              - generate # Ensures this runs after the generate step
            withParam: '{{tasks.generate.outputs.result}}' # Iterates over the batches
          - name: fan-in
            template: fan-in
            arguments: {}
            dependencies:
              - fan-out-print # Ensures this runs after all fan-out tasks complete
    - name: generate-artifacts
      inputs: {}
      outputs:
        artifacts:
          - name: fan-out-artifacts
            path: /tmp/
            s3:
              key: fanout-{{workflow.name}}/
            archive:
              none: {} # Prevents compression of artifacts
      metadata: {}
      script:
        name: ''
        image: python:alpine3.6
        command:
          - python
        resources: {}
        source: |
          import json
          import sys
          import os
          files = []
          batches = []
          # Generate 100 files
          for i in range(100):
              filename = f'file{i}.txt'
              files.append(filename)
              with open(os.path.join('/tmp', filename), 'w') as f:
                  f.write(f'hello {i}')
          # Group files into batches of 20
          for i in range(0, len(files), 20):
              batches.append(files[i:i+20])
          # Output batches as JSON (this becomes the withParam input)
          json.dump(batches, sys.stdout)
    - name: fan-out-print
      inputs:
        parameters:
          - name: batch
        artifacts:
          - name: input
            path: /tmp/input
      outputs: {}
      metadata: {}
      script:
        name: ''
        image: python:alpine3.6
        command:
          - python
        resources: {}
        source: |
          import json
          batch = json.loads('{{inputs.parameters.batch}}')
          for file in batch:
              with open(f'/tmp/input/{file}', 'r') as f:
                  filecont = f.read()
              print(f'File: {file}, Content: {filecont}') # Prints the file name and content
    - name: fan-in
      inputs:
        artifacts:
          - name: artifact-files
            path: /tmp
            s3:
              key: fanout-{{workflow.name}}
      outputs: {}
      metadata: {}
      container:
        name: ''
        image: alpine:latest
        command:
          - sh
          - '-c'
        args:
          - ls /tmp # Lists all files in the artifact directory
        resources: {}
  entrypoint: fan-out-in-params-workflow
  arguments: {}

This workflow demonstrates the desired fan-out pattern.
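To see what drives the fan-out, here is a small standalone sketch (run locally, not part of the workflow) of the batching logic from the generate-artifacts script. It shows the JSON that Argo exposes as {{tasks.generate.outputs.result}} and what each {{item}} passed to fan-out-print looks like:

import json

# Reproduce the batching done inside the generate-artifacts script.
files = [f'file{i}.txt' for i in range(100)]                    # the 100 generated file names
batches = [files[i:i + 20] for i in range(0, len(files), 20)]   # 5 batches of 20 names each

print(len(batches))        # 5 -> withParam spawns five fan-out-print tasks, not 100
print(batches[0][:3])      # ['file0.txt', 'file1.txt', 'file2.txt'] -> one {{item}} is a list of 20 names
print(json.dumps(batches)[:40])  # the JSON string Argo reads from the script's stdout

Because the artifact output covers the whole /tmp/ directory, every fan-out-print pod mounts all 100 files, but each pod only reads the 20 files named in its own batch parameter.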
-
Suppose I have 100 artifacts to handle: /myproject/data/[0-99].
If I use the method introduced in this article, there will be 100 tasks submitted to the cluster.
What I want is to fan out 5 tasks, with each task handling 20 artifacts.
Is it possible to implement such a fan-out pattern?
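In other words, I just want to chunk the 100 paths into lists of 20 and fan out one task per chunk, roughly like this (a sketch, assuming the artifacts are /myproject/data/0 through /myproject/data/99):

# Desired grouping: 5 chunks of 20 artifacts, one fanned-out task per chunk.
paths = [f'/myproject/data/{i}' for i in range(100)]
batch_size = 20
batches = [paths[i:i + batch_size] for i in range(0, len(paths), batch_size)]
print(len(batches))  # 5 -> only 5 tasks, each handling 20 artifacts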