Skip to content

Commit d00e989

Browse files
authored
use run_remotely to set PythonJob (#84)
When adding a new node, one can set `run_remotely=True` to ask the engine to run the function on a remote computer. In the inputs, the user can set `"computer": "merlin6"` to specify which remote computer should run the job; the engine will try to load the `python3@{computer}` code and will create it if it does not exist. One can also set `code_label`, `code_path`, and `prepend_text` to specify the code. In `metadata.custom_scheduler_commands`, one can add commands to load modules, activate a conda environment, and so on.
1 parent 335c1d7 commit d00e989

File tree

8 files changed

+515
-441
lines changed

8 files changed

+515
-441
lines changed

aiida_workgraph/calculations/python.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
List,
1515
FolderData,
1616
RemoteData,
17+
to_aiida_type,
1718
)
1819

1920

@@ -45,13 +46,21 @@ def define(cls, spec: CalcJobProcessSpec) -> None: # type: ignore[override]
4546
:param spec: the calculation job process spec to define.
4647
"""
4748
super().define(spec)
48-
spec.input("function_source_code", required=False)
49-
spec.input("function_name", required=False)
49+
spec.input(
50+
"function_source_code",
51+
valid_type=Str,
52+
serializer=to_aiida_type,
53+
required=False,
54+
)
55+
spec.input(
56+
"function_name", valid_type=Str, serializer=to_aiida_type, required=False
57+
)
5058
spec.input_namespace("kwargs", valid_type=Data, required=False)
5159
spec.input(
5260
"output_name_list",
5361
valid_type=List,
5462
required=False,
63+
serializer=to_aiida_type,
5564
help="The names of the output ports",
5665
)
5766
spec.input(
@@ -62,16 +71,18 @@ def define(cls, spec: CalcJobProcessSpec) -> None: # type: ignore[override]
6271
)
6372
spec.input(
6473
"parent_output_folder",
65-
valid_type=(Str),
74+
valid_type=Str,
6675
default=None,
6776
required=False,
77+
serializer=to_aiida_type,
6878
help="Name of the subfolder inside 'parent_folder' from which you want to copy the files",
6979
)
7080
spec.input(
7181
"additional_retrieve_list",
7282
valid_type=List,
7383
default=None,
7484
required=False,
85+
serializer=to_aiida_type,
7586
help="The names of the files to retrieve",
7687
)
7788
spec.outputs.dynamic = True

aiida_workgraph/collection.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ def new(
2424
# build the node on the fly if the identifier is a callable
2525
if callable(identifier):
2626
identifier = build_node_from_callable(identifier)
27+
if kwargs.pop("run_remotely", False):
28+
# this is a PythonJob
29+
identifier = build_PythonJob_node(identifier)
2730
if isinstance(identifier, str) and identifier.upper() == "PYTHONJOB":
2831
# copy the inputs and outputs from the function node to the PythonJob node
2932
identifier = build_PythonJob_node(kwargs.pop("function"))

aiida_workgraph/decorator.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,13 +236,22 @@ def build_node_from_AiiDA(
236236
def build_PythonJob_node(func: Callable) -> Node:
237237
"""Build PythonJob node from function."""
238238
from aiida_workgraph.calculations.python import PythonJob
239+
from copy import deepcopy
239240

240241
ndata = {"executor": PythonJob, "node_type": "CALCJOB"}
241242
_, ndata_py = build_node_from_AiiDA(ndata)
242-
ndata = func.ndata
243+
ndata = deepcopy(func.ndata)
243244
# merge the inputs and outputs from the PythonJob node to the function node
244245
# skip the already existed inputs and outputs
245246
inputs = ndata["inputs"]
247+
inputs.extend(
248+
[
249+
["String", "computer"],
250+
["String", "code_label"],
251+
["String", "code_path"],
252+
["String", "prepend_text"],
253+
]
254+
)
246255
outputs = ndata["outputs"]
247256
for input in ndata_py["inputs"]:
248257
if input not in inputs:
@@ -252,6 +261,7 @@ def build_PythonJob_node(func: Callable) -> Node:
252261
outputs.append(output)
253262
# append the kwargs of the PythonJob node to the function node
254263
kwargs = ndata["kwargs"]
264+
kwargs.extend(["computer", "code_label", "code_path", "prepend_text"])
255265
kwargs.extend(ndata_py["kwargs"])
256266
ndata["inputs"] = inputs
257267
ndata["outputs"] = outputs

aiida_workgraph/engine/workgraph.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -826,10 +826,23 @@ def run_nodes(self, names: t.List[str], continue_workgraph: bool = True) -> None
826826
elif node["metadata"]["node_type"].upper() in ["PYTHONJOB"]:
827827
from aiida_workgraph.calculations.python import PythonJob
828828
from aiida_workgraph.calculations.general_data import GeneralData
829+
from aiida_workgraph.utils import get_or_create_code
829830

830831
print("node type: Python.")
831832
# normal function does not have a process
832-
code = kwargs.pop("code")
833+
code = kwargs.pop("code", None)
834+
computer = kwargs.pop("computer", None)
835+
code_label = kwargs.pop("code_label", None)
836+
code_path = kwargs.pop("code_path", None)
837+
prepend_text = kwargs.pop("prepend_text", None)
838+
#
839+
if code is None:
840+
code = get_or_create_code(
841+
computer=computer or "localhost",
842+
code_label=code_label or "python3",
843+
code_path=code_path,
844+
prepend_text=prepend_text,
845+
)
833846
parent_folder = kwargs.pop("parent_folder", None)
834847
metadata = kwargs.pop("metadata", {})
835848
metadata.update({"call_link_label": name})
@@ -847,7 +860,6 @@ def run_nodes(self, names: t.List[str], continue_workgraph: bool = True) -> None
847860
inputs[key] = value
848861
else:
849862
inputs[key] = GeneralData(value)
850-
print("inputs: ", inputs)
851863
# outputs
852864
output_name_list = [output["name"] for output in node["outputs"]]
853865

aiida_workgraph/utils/__init__.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Any, Dict, Optional, Union
22
from aiida.engine.processes import Process
33
from aiida import orm
4+
from aiida.common.exceptions import NotExistent
45

56

67
def get_executor(data: Dict[str, Any]) -> Union[Process, Any]:
@@ -236,3 +237,31 @@ def get_processes_latest(pk: int) -> Dict[str, Dict[str, Union[int, str]]]:
236237
"mtime": nodes.mtime,
237238
}
238239
return nodes
240+
241+
242+
def get_or_create_code(
243+
computer: str = "localhost",
244+
code_label: str = "python3",
245+
code_path: str = None,
246+
prepend_text: str = "",
247+
):
248+
"""Try to load code, create if not exit."""
249+
from aiida.orm.nodes.data.code.installed import InstalledCode
250+
251+
try:
252+
return orm.load_code(f"{code_label}@{computer}")
253+
except NotExistent:
254+
description = f"Python code on computer: {computer}"
255+
computer = orm.load_computer(computer)
256+
code_path = code_path or code_label
257+
code = InstalledCode(
258+
computer=computer,
259+
label=code_label,
260+
description=description,
261+
filepath_executable=code_path,
262+
default_calc_job_plugin="workgraph.python",
263+
prepend_text=prepend_text,
264+
)
265+
266+
code.store()
267+
return code

docs/source/built-in/aiida_python.ipynb

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,13 @@
6565
" return x*y\n",
6666
"\n",
6767
"wg = WorkGraph(\"first_workflow\")\n",
68-
"wg.nodes.new(\"PythonJob\", function=add, name=\"add\")\n",
69-
"wg.nodes.new(\"PythonJob\", function=multiply, name=\"multiply\", x=wg.nodes[\"add\"].outputs[0])\n",
68+
"wg.nodes.new(add, name=\"add\", run_remotely=True)\n",
69+
"wg.nodes.new(multiply, name=\"multiply\", x=wg.nodes[\"add\"].outputs[0], run_remotely=True)\n",
7070
"\n",
71-
"code = load_code(\"python@localhost\")\n",
7271
"\n",
7372
"#------------------------- Submit the calculation -------------------\n",
74-
"wg.submit(inputs = {\"add\": {\"x\": 2, \"y\": 3, \"code\": code},\n",
75-
" \"multiply\": {\"y\": 4, \"code\": code}},\n",
73+
"wg.submit(inputs = {\"add\": {\"x\": 2, \"y\": 3, \"computer\": \"localhost\"},\n",
74+
" \"multiply\": {\"y\": 4, \"computer\": \"localhost\"}},\n",
7675
" wait=True)\n",
7776
"print(\"\\nResult of multiply is {} \\n\\n\".format(wg.nodes[\"multiply\"].outputs['result'].value))\n",
7877
"```\n",

0 commit comments

Comments
 (0)