-
Notifications
You must be signed in to change notification settings - Fork 28
Staged cable improve resubmission and restart handling #537
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 4 commits
f37c6e6
4cf0e1b
1218e2a
b69b26d
7a399f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,23 +21,6 @@ | |
from payu.fsops import mkdir_p | ||
|
||
|
||
def deep_update(d_1, d_2):
    """Recursively merge the patch mapping ``d_2`` into ``d_1`` in place.

    For every key in ``d_2``: when both ``d_1[key]`` and the patch value are
    dictionaries, the merge recurses so that sibling keys already present in
    ``d_1[key]`` are kept. In every other case the patch value replaces (or
    creates) ``d_1[key]``.

    Args:
        d_1: Master namelist dictionary; modified in place.
        d_2: Patch dictionary whose values take precedence.

    Returns:
        None. ``d_1`` is updated in place.
    """
    for key, value in d_2.items():
        if isinstance(value, dict) and isinstance(d_1.get(key), dict):
            # Both sides are nested structs: merge them key by key.
            deep_update(d_1[key], value)
        else:
            # Plain value, a new key, or a scalar being replaced by a struct:
            # recursing into a non-dict would raise, so just override.
            d_1[key] = value
|
||
|
||
class StagedCable(Model): | ||
"""A driver for running staged CABLE spin-up configurations.""" | ||
|
||
|
@@ -47,30 +30,45 @@ def __init__(self, expt, name, config): | |
self.model_type = 'staged_cable' | ||
self.default_exec = 'cable' | ||
|
||
# We want people to be able to use payu during testing, which | ||
# often means additions of new namelists due to new science | ||
# modules. I would like to set | ||
# optional_config_files = glob.glob("*.nml") | ||
# but this feels like a bit of an abuse of feature. | ||
self.config_files = ['stage_config.yaml'] | ||
self.optional_config_files = ['cable.nml', 'cru.nml', | ||
'luc.nml', 'met_names.nml'] | ||
'luc.nml', 'met_names.nml', | ||
'bios.nml'] | ||
|
||
def setup(self): | ||
super(StagedCable, self).setup() | ||
|
||
# Initialise the configuration log | ||
self.configuration_log = {} | ||
|
||
if not os.path.isfile('configuration_log.yaml'): | ||
conf_log_p = os.path.join(self.control_path, 'configuration_log.yaml') | ||
if not os.path.isfile(conf_log_p): | ||
# Build a new configuration log | ||
self._build_new_configuration_log() | ||
else: | ||
# Read the current configuration log | ||
self._read_configuration_log() | ||
|
||
# Now set the number of runs using the configuration_log | ||
remaining_stages = len(self.configuration_log['queued_stages']) | ||
print("Overriding the remaining number of runs according to the " + | ||
"number of queued stages in the configuration log.") | ||
os.environ['PAYU_N_RUNS'] = str(remaining_stages) | ||
# Prepare the namelists for the stage | ||
stage_name = self._get_stage_name() | ||
self._apply_stage_namelists(stage_name) | ||
|
||
# Make the logging directory | ||
mkdir_p(os.path.join(self.work_path, "logs")) | ||
|
||
self._set_current_stage() | ||
|
||
def _build_new_configuration_log(self): | ||
"""Build a new configuration log for the first stage of the run.""" | ||
|
||
stage_conf_p = os.path.join(self.control_path, 'stage_config.yaml') | ||
# Read the stage_config.yaml file | ||
with open('stage_config.yaml', 'r') as stage_conf_f: | ||
with open(stage_conf_p, 'r') as stage_conf_f: | ||
self.stage_config = yaml.safe_load(stage_conf_f) | ||
|
||
# On the first run, we need to read the 'stage_config.yaml' file. | ||
|
@@ -85,9 +83,12 @@ def _build_new_configuration_log(self): | |
|
||
def _read_configuration_log(self): | ||
"""Read the existing configuration log.""" | ||
with open('configuration_log.yaml') as conf_log_file: | ||
conf_log_p = os.path.join(self.control_path, 'configuration_log.yaml') | ||
with open(conf_log_p, 'r') as conf_log_file: | ||
self.configuration_log = yaml.safe_load(conf_log_file) | ||
|
||
print(f"After reading configuration_log: {self.configuration_log}") | ||
|
||
def _prepare_configuration(self): | ||
"""Prepare the stages in the CABLE configuration.""" | ||
|
||
|
@@ -127,55 +128,6 @@ def _prepare_configuration(self): | |
# Finish handling of single step stage | ||
return cable_stages | ||
|
||
def setup(self):
    """Run the base model setup, then prepare the stage about to run.

    After the parent ``setup`` this applies the namelists of the current
    stage, creates the ``logs`` directory inside the work path, registers
    restarts from older restart directories and finally updates the
    configuration log.
    """
    super(StagedCable, self).setup()

    # Write out the namelists belonging to this stage
    self._apply_stage_namelists(self._get_stage_name())

    # The logging directory lives inside the work path
    log_dir = os.path.join(self.work_path, "logs")
    mkdir_p(log_dir)

    # Pull in restarts that come from older restart directories
    self._get_further_restarts()

    # Record in the configuration log that the stage has been set up
    self._handle_configuration_log_setup()
|
||
def _get_further_restarts(self):
    """Register restart files from older restart directories.

    Some science modules take their inputs from a restart written by a stage
    that is not the most recent one, so the restart directories of earlier
    completed stages are scanned as well. The parent
    ``get_prior_restart_files`` is not overridden for this because the files
    it returns are prefixed with ``self.prior_restart_path``, which is not
    wanted here.

    Directories are visited from the newest eligible one down to
    ``restart000``. A file is only added to the ``restart`` manifest when its
    link path is not already in it, so a newer version of a restart is never
    overwritten by an older one.
    """
    n_completed = len(self.configuration_log['completed_stages'])

    # Indices n_completed - 2, ..., 1, 0 (empty when fewer than two stages
    # have completed)
    for stage_idx in range(n_completed - 2, -1, -1):
        restart_dir = os.path.join(
            self.expt.archive_path,
            f'restart{stage_idx:03d}'
        )
        for entry in os.listdir(restart_dir):
            # Only regular files are candidates
            if not os.path.isfile(os.path.join(restart_dir, entry)):
                continue

            source = os.path.join(restart_dir, entry)
            target = os.path.join(self.work_init_path_local, entry)

            # Already registered, i.e. a newer version exists: leave it alone
            if target in self.expt.manifest.manifests['restart']:
                continue

            self.expt.manifest.add_filepath(
                'restart',
                target,
                source,
                self.copy_restarts
            )
|
||
def set_model_pathnames(self): | ||
super(StagedCable, self).set_model_pathnames() | ||
|
||
|
@@ -238,52 +190,85 @@ def _apply_stage_namelists(self, stage_name): | |
# Instance where there is only a stage namelist | ||
shutil.copy(stage_nml, write_target) | ||
|
||
def _handle_configuration_log_setup(self): | ||
"""Make appropriate adjustments to the configuration log to reflect | ||
that the setup of the stage is complete.""" | ||
def _set_current_stage(self): | ||
"""Move the stage at the front of the queue into the current stage | ||
slot, then copy the configuration log to the working directory.""" | ||
|
||
if self.configuration_log['current_stage'] != '': | ||
# If the current stage is a non-empty string, it means we exited | ||
# during the running of the previous stage- leave as is | ||
stage_name = self.configuration_log['current_stage'] | ||
else: | ||
# Normal case where we just archived a successful stage. | ||
self.configuration_log['current_stage'] = \ | ||
self.configuration_log['queued_stages'].pop(0) | ||
self.configuration_log['current_stage'] = \ | ||
self.configuration_log['queued_stages'].pop(0) | ||
|
||
self._save_configuration_log() | ||
|
||
# Copy the log to the work directory | ||
shutil.copy('configuration_log.yaml', self.work_input_path) | ||
conf_log_p = os.path.join(self.control_path, 'configuration_log.yaml') | ||
shutil.copy(conf_log_p, self.work_path) | ||
|
||
def archive(self): | ||
"""Store model output to laboratory archive and update the | ||
configuration log.""" | ||
|
||
# Move files from the restart directory within work to the archive | ||
# restart directory. | ||
# Retrieve all the restarts required for the next stage | ||
self._collect_restarts() | ||
|
||
# Update the configuration log and save it to the working directory | ||
self._read_configuration_log() | ||
self._archive_current_stage() | ||
|
||
# Now set the number of runs using the configuration_log | ||
remaining_stages = len(self.configuration_log['queued_stages']) | ||
print("Overriding the remaining number of runs according to the " + | ||
"number of queued stages in the configuration log.") | ||
self.expt.n_runs = remaining_stages | ||
|
||
conf_log_p = os.path.join(self.control_path, 'configuration_log.yaml') | ||
if self.expt.n_runs == 0: | ||
# Configuration successfully completed | ||
os.remove(conf_log_p) | ||
|
||
super(StagedCable, self).archive() | ||
|
||
def _collect_restarts(self): | ||
"""Collect all restart files required for the next stage. This is a | ||
merge of the files in work_path/restart and in prior_restart_path, with | ||
the files in work_path/restart taking precedence.""" | ||
|
||
# First, collect restarts which do not have a newer version (when the | ||
# counter is greater than 0) | ||
if self.expt.counter > 0: | ||
prior_restart_dir = 'restart{0:03}'.format(self.expt.counter - 1) | ||
prior_restart_path = os.path.join(self.expt.archive_path, | ||
prior_restart_dir) | ||
|
||
# For each restart, check if newer version was created. If not, | ||
# copy into the work restart path. | ||
generated_restarts = os.listdir(self.work_restart_path) | ||
|
||
for f in os.listdir(prior_restart_path): | ||
if f not in generated_restarts: | ||
shutil.copy(os.path.join(prior_restart_path, f), | ||
self.work_restart_path) | ||
|
||
Comment on lines +243 to +245

Review comment: I think it should be possible to just make links, which will resolve to their original location IIRC.

Reply: What about the scenario where we want to share a restart directory with someone else, or perhaps even publicly somewhere, so that they can use it as a start point for simulations? I think it's likely enough that the original run would be somewhere that others won't have read permissions to, or even potentially on other machines.

Reply: I haven't looked into the code changes, but I don't see how making a copy solves problems of read permissions. You need read access to be able to copy files, so links would do just as well.

Reply: I'm thinking of a situation where someone (e.g. ACCESS-NRI) has run an expensive spin-up configuration and wants to provide the restarts as a starting point for others in the community. The configuration may have 20+ stages, but the full suite of restarts may contain a restart from the 1st stage, one from the 5th stage and the rest from the most recent stage. A supported […] A scenario where I run a configuration to create a starting point for community science. I'll likely save the data to my non-permanent scratch space. I want to take the most recent […]

Reply: You can use […]
||
# Move the files in work_path/restart first | ||
for f in os.listdir(self.work_restart_path): | ||
shutil.move(os.path.join(self.work_restart_path, f), | ||
self.restart_path) | ||
os.rmdir(self.work_restart_path) | ||
|
||
# Update the configuration log and save it to the working directory | ||
completed_stage = self.configuration_log['current_stage'] | ||
self.configuration_log['current_stage'] = '' | ||
self.configuration_log['completed_stages'].append(completed_stage) | ||
def _archive_current_stage(self): | ||
"""Move the current stage to the list of completed stages.""" | ||
self.configuration_log['completed_stages'].append( | ||
self.configuration_log['current_stage']) | ||
|
||
self.configuration_log['current_stage'] = '' | ||
self._save_configuration_log() | ||
|
||
if len(self.configuration_log["queued_stages"]) == 0: | ||
# Configuration successfully completed | ||
os.remove('configuration_log.yaml') | ||
|
||
super(StagedCable, self).archive() | ||
# Copy the configuration log to the restart directory for shareability | ||
conf_log_p = os.path.join(self.control_path, 'configuration_log.yaml') | ||
shutil.copy(conf_log_p, self.restart_path) | ||
|
||
def collate(self):
    """Do nothing; this driver has no collation step."""
    return None
|
||
def _save_configuration_log(self): | ||
"""Write the updated configuration log back to the staging area.""" | ||
with open('configuration_log.yaml', 'w+') as config_log_f: | ||
conf_log_p = os.path.join(self.control_path, 'configuration_log.yaml') | ||
with open(conf_log_p, 'w+') as config_log_f: | ||
yaml.dump(self.configuration_log, config_log_f) |
Uh oh!
There was an error while loading. Please reload this page.