Skip to content

Commit 99d0d74

Browse files
Fjodor van Rijsselberg
authored and committed
The import path for the Prefect state classes changed; updated all uses.
1 parent b22564e commit 99d0d74

File tree

8 files changed

+20
-15
lines changed

8 files changed

+20
-15
lines changed

scripts/configuration/odissei_settings.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ CBS_TEMPLATE_FILE_PATH="/app/resources/templates/cbs_dataverse_template.json"
7272
CBS_MAPPING_FILE_PATH="/app/resources/mappings/cbs-mapping.json"
7373

7474
LISS_TEMPLATE_FILE_PATH="/app/resources/templates/liss_dataverse_template.json"
75-
LISS_MAPPING_FILE_PATH="/resources/mappings/liss-mapping.json"
75+
LISS_MAPPING_FILE_PATH="/app/resources/mappings/liss-mapping.json"
7676

7777
CID_TEMPLATE_FILE_PATH="/app/resources/templates/cid_dataverse_template.json"
7878
CID_MAPPING_FILE_PATH="/app/resources/mappings/cid-mapping.json"

scripts/flows/dataset_workflows/cbs_ingestion.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import jmespath
22

33
from prefect import flow
4-
from prefect.server.schemas.states import Completed, Failed
4+
from prefect.states import Completed, Failed
55
from queries import DIST_DATE_QUERY, CBS_ID_QUERY
66
from tasks.base_tasks import xml2json, dataverse_mapper, \
77
dataverse_import, update_publication_date, add_workflow_versioning_url, \

scripts/flows/dataset_workflows/cid_ingestion.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import jmespath
33

44
from prefect import flow, get_run_logger
5-
from prefect.server.schemas.states import Completed, Failed
5+
from prefect.states import Completed, Failed
66
from queries import DIST_DATE_QUERY
77
from tasks.base_tasks import dataverse_mapper, \
88
dataverse_import, update_publication_date, add_workflow_versioning_url, \

scripts/flows/dataset_workflows/dataverse_deletion.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from prefect import flow
2-
from prefect.server.schemas.states import Failed, Completed
2+
from prefect.states import Failed, Completed
33

44
from tasks.base_tasks import dataverse_dataset_check_status, \
55
delete_dataset

scripts/flows/dataset_workflows/dataverse_ingestion.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from prefect import flow
2-
from prefect.server.schemas.states import Failed, Completed
2+
from prefect.states import Failed, Completed
33

44
from tasks.base_tasks import dataverse_metadata_fetcher, dataverse_import, \
55
update_publication_date, add_workflow_versioning_url, refine_metadata, \

scripts/flows/dataset_workflows/liss_ingestion.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import jmespath
33

44
from prefect import flow
5-
from prefect.server.schemas.states import Completed, Failed
5+
from prefect.states import Completed, Failed
66
from queries import DIST_DATE_QUERY
77
from tasks.base_tasks import dataverse_mapper, \
88
dataverse_import, update_publication_date, get_doi_from_dv_json, \

scripts/flows/entry_workflows/main_dataverse_deletion.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,4 @@ def dataverse_deletion_pipeline(settings_dict_name: str,
5656
settings_dict,
5757
minio_client,
5858
"identifiers-deleted.json",
59-
)
59+
)

scripts/utils.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def identifier_list_workflow_executor(
113113
settings_dict,
114114
s3_client,
115115
object_name,
116-
version = None,
116+
version=None,
117117
):
118118
"""
119119
Grabs a file called identifiers.json from the bucket
@@ -122,17 +122,20 @@ def identifier_list_workflow_executor(
122122
If not return FAILED, if it does execute the data_provider_workflow
123123
for every pid in the list.
124124
125-
:param data_provider_workflow: A function representing the data provider workflow.
125+
:param object_name: Can be identifiers.json or identifiers-deleted.json
126+
:param data_provider_workflow: A function representing the workflow.
126127
:param version: The version of the workflow to be executed.
127-
:param settings_dict: A dictionary containing the settings including the BUCKET_NAME.
128-
:param s3_client: An object representing the Boto3 S3 client to access the bucket.
129-
:return: 'SUCCESS' if the workflow is executed successfully, 'FAILED' otherwise.
128+
:param settings_dict: The settings including the BUCKET_NAME.
129+
:param s3_client: An object representing the Boto3 S3 client.
130+
:return: 'SUCCESS' if the workflow succeeds, 'FAILED' otherwise.
130131
"""
131132
bucket_name = settings_dict.BUCKET_NAME
132-
identifiers_dict = retrieve_identifiers_from_bucket(s3_client, bucket_name, object_name)
133+
identifiers_dict = retrieve_identifiers_from_bucket(s3_client, bucket_name,
134+
object_name)
133135
for pid in identifiers_dict['pids']:
134136
if version:
135-
data_provider_workflow(pid, settings_dict, version, return_state=True)
137+
data_provider_workflow(pid, version, settings_dict,
138+
return_state=True)
136139
else:
137140
data_provider_workflow(pid, settings_dict, return_state=True)
138141

@@ -244,6 +247,7 @@ def failed_dataverse_ingestion_hook(flow, flow_run, state):
244247

245248
update_identifiers_json(bucket_name, "identifiers.json", pid)
246249

250+
247251
def failed_dataverse_deletion_hook(flow, flow_run, state):
248252
logger = get_run_logger()
249253
settings_dict = flow_run.parameters["settings_dict"]
@@ -261,7 +265,8 @@ def failed_dataverse_deletion_hook(flow, flow_run, state):
261265
def update_identifiers_json(bucket_name, object_name, failed_pid):
262266
s3_client = create_s3_client()
263267
create_identifiers_json(s3_client, bucket_name, object_name)
264-
identifiers_dict = retrieve_identifiers_from_bucket(s3_client, bucket_name, object_name)
268+
identifiers_dict = retrieve_identifiers_from_bucket(s3_client, bucket_name,
269+
object_name)
265270
identifiers_dict['pids'].append(failed_pid)
266271

267272
updated_data = json.dumps(identifiers_dict).encode('utf-8')

0 commit comments

Comments
 (0)