@@ -44,6 +44,10 @@
 import boto3  # type: ignore
 from botocore.exceptions import ClientError, NoCredentialsError, ParamValidationError, WaiterError  # type: ignore

+from dataclasses_json import dataclass_json
+from dataclasses import dataclass, field
+from copy import deepcopy
+
 from .util import convert_labels_to_aws_tags, convert_disk_size_to_gb
 from .util import convert_memory_to_mb, UserReportError
 from .util import ElbSupportedPrograms, get_usage_reporting, sanitize_aws_batch_job_name
@@ -110,6 +114,37 @@ def check_cluster(cfg: ElasticBlastConfig) -> bool:
    return False


+@dataclass_json
+@dataclass
+class JobIds:
+    """Serializable store of AWS Batch job ids for query splitting, cloud job
+    submission and BLAST searches"""
+    query_splitting: str = ''
+    job_submission: str = ''
+    search: List[str] = field(default_factory=list)
+
+    def __bool__(self):
+        """Boolean value of the object: True if at least one job id is set"""
+        return bool(self.query_splitting) or bool(self.job_submission) or bool(self.search)
+
+    def merge(self, obj):
+        """Merge another JobIds object into self"""
+        if not self.query_splitting and obj.query_splitting:
+            self.query_splitting = obj.query_splitting
+        if not self.job_submission and obj.job_submission:
+            self.job_submission = obj.job_submission
+        self.search = list(set(self.search + obj.search))
+
+    def to_list(self) -> List[str]:
+        """Return all job ids as a list"""
+        id_list = [job for job in self.search]
+        if self.query_splitting:
+            id_list.append(self.query_splitting)
+        if self.job_submission:
+            id_list.append(self.job_submission)
+        return id_list
+
+
class ElasticBlastAws(ElasticBlast):
    """ Implementation of core ElasticBLAST functionality in AWS.
    Uses a CloudFormation template and AWS Batch for its main operation.
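A minimal sketch of how the new JobIds store behaves. The import path and the ids are made up for illustration; to_json/from_json are the (de)serialization methods that the dataclass_json decorator provides.

# Hypothetical import path for this sketch; in the patch JobIds lives in this module.
from elastic_blast.aws import JobIds

local = JobIds(query_splitting='qs-1', search=['job-1'])
remote = JobIds(job_submission='sub-1', search=['job-1', 'job-2'])

# merge() fills empty slots and unions the search job ids
local.merge(remote)
print(sorted(local.to_list()))   # ['job-1', 'job-2', 'qs-1', 'sub-1']

# dataclass_json supplies the round trip used for the S3 metadata file
blob = local.to_json()
assert JobIds.from_json(blob) == local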
@@ -146,8 +181,7 @@ def _init(self, cfg: ElasticBlastConfig, create: bool):
        self.subnets = None
        self._provide_subnets()
        self.cf_stack = None
-        self.job_ids: List[str] = []
-        self.qs_job_id = None
+        self.job_ids = JobIds()

        initialized = True
@@ -639,7 +673,7 @@ def cloud_query_split(self, query_files: List[str]) -> None:
                                        jobName=jname,
                                        parameters=parameters,
                                        containerOverrides=overrides)
-            self.qs_job_id = job['jobId']
+            self.job_ids.query_splitting = job['jobId']
            logging.info(f"Submitted AWS Batch job {job['jobId']} to split query {query_files[0]}")
            self.upload_job_ids()
        else:
@@ -653,15 +687,15 @@ def wait_for_cloud_query_split(self) -> None:
        """
        if self.dry_run:
            return
-        if not self.qs_job_id:
+        if not self.job_ids.query_splitting:
            msg = 'Query splitting job was not submitted!'
            logging.fatal(msg)
            raise RuntimeError(msg)

        while True:
-            job_batch = self.batch.describe_jobs(jobs=[self.qs_job_id])['jobs']
+            job_batch = self.batch.describe_jobs(jobs=[self.job_ids.query_splitting])['jobs']
            job_status = job_batch[0]['status']
-            logging.debug(f'Query splitting job status {job_status} for {self.qs_job_id}')
+            logging.debug(f'Query splitting job status {job_status} for {self.job_ids.query_splitting}')
            if job_status == 'SUCCEEDED':
                break
            if job_status == 'FAILED':
@@ -674,7 +708,7 @@ def wait_for_cloud_query_split(self) -> None:
                for k in ['exitCode', 'reason']:
                    if k in container:
                        failure_details += f'Container{k[0].upper()+k[1:]}: {container[k]} '
-                msg = f'Query splitting on the cloud failed (jobId={self.qs_job_id})'
+                msg = f'Query splitting on the cloud failed (jobId={self.job_ids.query_splitting})'
                if failure_details: msg += failure_details
                logging.fatal(msg)
                raise UserReportError(CLUSTER_ERROR, msg)
@@ -736,11 +770,11 @@ def _cloud_submit(self) -> None:
            "parameters": parameters,
            "containerOverrides": overrides
        }
-        if self.qs_job_id:
-            submit_job_args["dependsOn"] = [{'jobId': self.qs_job_id}]
+        if self.job_ids.query_splitting:
+            submit_job_args["dependsOn"] = [{'jobId': self.job_ids.query_splitting}]
        job = self.batch.submit_job(**submit_job_args)
        logging.info(f'Submitted AWS Batch job {job["jobId"]} to submit search jobs')
-        self.job_ids.append(job['jobId'])
+        self.job_ids.job_submission = job['jobId']
        self.upload_job_ids()

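The dependsOn argument above is AWS Batch's standard job-chaining mechanism: a dependent job stays pending until the listed job reaches SUCCEEDED. A standalone sketch of the same pattern; the queue and job definition names are made up.

import boto3

batch = boto3.client('batch')

# First job: split the query (names below are placeholders)
splitter = batch.submit_job(jobName='split-query',
                            jobQueue='elb-queue',
                            jobDefinition='elb-job-def')

# Second job: runs only after the splitter job SUCCEEDED
batch.submit_job(jobName='submit-searches',
                 jobQueue='elb-queue',
                 jobDefinition='elb-job-def',
                 dependsOn=[{'jobId': splitter['jobId']}])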
@@ -751,8 +785,6 @@ def client_submit(self, query_batches: List[str], one_stage_cloud_query_split: bool) -> None:
            query_batches - list of bucket names of queries to submit
            one_stage_cloud_query_split - do the query split in the cloud as a part
            of executing a regular job """
-        self.job_ids = []
-
        prog = self.cfg.blast.program

        if self.cfg.cluster.db_source != DBSource.AWS:
@@ -829,10 +861,10 @@ def is_int(value: str):
                    "parameters": parameters,
                    "containerOverrides": overrides
                }
-                if self.qs_job_id:
-                    submit_job_args["dependsOn"] = [{'jobId': self.qs_job_id}]
+                if self.job_ids.query_splitting:
+                    submit_job_args["dependsOn"] = [{'jobId': self.job_ids.query_splitting}]
                job = self.batch.submit_job(**submit_job_args)
-                self.job_ids.append(job['jobId'])
+                self.job_ids.search.append(job['jobId'])
                logging.debug(f"Job definition parameters for job {job['jobId']} {parameters}")
                logging.info(f"Submitted AWS Batch job {job['jobId']} with query {q}")
            else:
@@ -873,15 +905,23 @@ def get_job_ids(self) -> List[str]:
    def upload_job_ids(self) -> None:
        """Save AWS Batch job ids in a metadata file in S3; if the metadata
        file is present, append job ids"""
+        current_job_ids = deepcopy(self.job_ids)
        self._load_job_ids_from_aws()
+        current_job_ids.merge(self.job_ids)
+        self.job_ids = current_job_ids
+
        bucket_name, key = parse_bucket_name_key(f'{self.results_bucket}/{ELB_METADATA_DIR}/{ELB_AWS_JOB_IDS}')
        bucket = self.s3.Bucket(bucket_name)
-        job_ids = self.job_ids
-        if self.qs_job_id:
-            job_ids.append(self.qs_job_id)
-        job_ids = list(set(job_ids))
-        bucket.put_object(Body=json.dumps(job_ids).encode(), Key=key)
-        logging.debug(f'Uploaded {len(job_ids)} job IDs to {self.results_bucket}/{ELB_METADATA_DIR}/{ELB_AWS_JOB_IDS}')
+        bucket.put_object(Body=self.job_ids.to_json().encode(), Key=key)  # type: ignore
+        logging.debug(f'Uploaded job IDs to {self.results_bucket}/{ELB_METADATA_DIR}/{ELB_AWS_JOB_IDS}')
+
+        # This code is needed for backward compatibility with pre-0.2.4
+        # janitors and can be removed when the ElasticBLAST janitor is upgraded to version 0.2.4.
+        ELB_AWS_OLD_JOB_IDS = 'job-ids.json'
+        bucket_name, key = parse_bucket_name_key(f'{self.results_bucket}/{ELB_METADATA_DIR}/{ELB_AWS_OLD_JOB_IDS}')
+        bucket = self.s3.Bucket(bucket_name)
+        bucket.put_object(Body=json.dumps(self.job_ids.to_list()).encode(), Key=key)
+        logging.debug(f'Uploaded job IDs to {self.results_bucket}/{ELB_METADATA_DIR}/{ELB_AWS_OLD_JOB_IDS}')


    def upload_query_length(self, query_length: int) -> None:
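With this change two metadata objects are written under ELB_METADATA_DIR: the file named by ELB_AWS_JOB_IDS holds the serialized JobIds object, while the legacy job-ids.json keeps the flat list that pre-0.2.4 janitors expect. Illustrated with made-up ids:

# {results_bucket}/{ELB_METADATA_DIR}/{ELB_AWS_JOB_IDS} -- new format
{"query_splitting": "qs-1", "job_submission": "sub-1", "search": ["job-1", "job-2"]}

# {results_bucket}/{ELB_METADATA_DIR}/job-ids.json -- legacy flat list
["job-1", "job-2", "qs-1", "sub-1"]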
@@ -920,6 +960,8 @@ def check_status(self, extended=False) -> Tuple[ElbStatus, Dict[str, int], str]:
        elif njobs == 0:
            # This is likely the case when dry-run is set to True
            retval = ElbStatus.UNKNOWN
+        elif (self.job_ids.query_splitting or self.job_ids.job_submission) and not self.job_ids.search:
+            retval = ElbStatus.SUBMITTING
        elif running > 0 or pending > 0:
            retval = ElbStatus.RUNNING
        elif (pending + running + failed) == 0 and succeeded == njobs:
@@ -941,8 +983,8 @@ def _load_job_ids_from_aws(self):
            try:
                bucket.download_file(key, tmp.name)
                with open(tmp.name) as f_ids:
-                    self.job_ids += json.load(f_ids)
-                    self.job_ids = list(set(self.job_ids))
+                    new_job_ids = JobIds.from_json(f_ids.read())
+                    self.job_ids.merge(new_job_ids)
            except ClientError as err:
                err_code = err.response['Error']['Code']
                fnx_name = inspect.stack()[0].function
@@ -965,11 +1007,12 @@ def _check_status(self, extended) -> Tuple[Dict[str, int], str]:

        if not self.job_ids:
            self._load_job_ids_from_aws()
+        job_ids = self.job_ids.to_list()

        # check status of jobs in batches of JOB_BATCH_NUM
        JOB_BATCH_NUM = 100
-        for i in range(0, len(self.job_ids), JOB_BATCH_NUM):
-            job_batch = self.batch.describe_jobs(jobs=self.job_ids[i:i + JOB_BATCH_NUM])['jobs']
+        for i in range(0, len(job_ids), JOB_BATCH_NUM):
+            job_batch = self.batch.describe_jobs(jobs=job_ids[i:i + JOB_BATCH_NUM])['jobs']
            # get number for AWS Batch job states
            for st in AWS_BATCH_JOB_STATES:
                counts[st] += sum([j['status'] == st for j in job_batch])
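The JOB_BATCH_NUM chunking exists because AWS Batch's describe_jobs accepts at most 100 job ids per call. A self-contained sketch of the same idea; the function name is illustrative.

import boto3
from collections import Counter

def describe_all_jobs(job_ids, chunk_size=100):
    """Yield AWS Batch job descriptions, chunk_size ids per describe_jobs call."""
    batch = boto3.client('batch')
    for i in range(0, len(job_ids), chunk_size):
        for job in batch.describe_jobs(jobs=job_ids[i:i + chunk_size])['jobs']:
            yield job

# Tally job states for an arbitrary list of real job ids:
# counts = Counter(job['status'] for job in describe_all_jobs(job_id_list))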