
Commit cd323d4

Merge pull request #64 from odissei-data/incremental-harvester
Incremental harvester
2 parents 8f9261d + 99d0d74 commit cd323d4

15 files changed: +257 -52 lines changed

Dockerfile.server

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-FROM python:3.12-slim
+FROM python:3.11-slim
 
 ENV PYTHONPATH="${PYTHONPATH}:/app/scripts/" \
     PYTHONDONTWRITEBYTECODE=1

Dockerfile.worker

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-FROM python:3.12-slim
+FROM python:3.11-slim
 
 ENV PYTHONPATH="${PYTHONPATH}:/app/scripts/" \
     PYTHONDONTWRITEBYTECODE=1

pyproject.toml

Lines changed: 3 additions & 3 deletions
@@ -5,10 +5,10 @@ description = "Workflow for metadata ingestion into Dataverse"
 authors = ["Fjodor van Rijsselberg"]
 
 [tool.poetry.dependencies]
-python = "^3.12.5"
-prefect = "^3.0.8"
+python = "^3.11.0"
+prefect = "^3.0.10"
 python-dotenv = "^1.0.1"
-boto3 = "^1.35.39"
+boto3 = "^1.35.44"
 dynaconf = "^3.2.6"
 jmespath = "^1.0.1"
 pydantic = "^2.9.2"

scripts/configuration/odissei_settings.toml

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ CBS_TEMPLATE_FILE_PATH="/app/resources/templates/cbs_dataverse_template.json"
 CBS_MAPPING_FILE_PATH="/app/resources/mappings/cbs-mapping.json"
 
 LISS_TEMPLATE_FILE_PATH="/app/resources/templates/liss_dataverse_template.json"
-LISS_MAPPING_FILE_PATH="/resources/mappings/liss-mapping.json"
+LISS_MAPPING_FILE_PATH="/app/resources/mappings/liss-mapping.json"
 
 CID_TEMPLATE_FILE_PATH="/app/resources/templates/cid_dataverse_template.json"
 CID_MAPPING_FILE_PATH="/app/resources/mappings/cid-mapping.json"

scripts/flows/dataset_workflows/cbs_ingestion.py

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 import jmespath
 
 from prefect import flow
-from prefect.server.schemas.states import Completed, Failed
+from prefect.states import Completed, Failed
 from queries import DIST_DATE_QUERY, CBS_ID_QUERY
 from tasks.base_tasks import xml2json, dataverse_mapper, \
     dataverse_import, update_publication_date, add_workflow_versioning_url, \
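This and the flow modules that follow all make the same import swap: in Prefect 3, prefect.server.schemas.states is server-side code, while the client-facing state constructors live in prefect.states. Returning one of these state objects from a flow sets that run's final state and message. A minimal standalone illustration (the check_positive flow is invented for demonstration):

from prefect import flow
from prefect.states import Completed, Failed


@flow
def check_positive(value: int):
    # Returning a State object from a flow sets the run's final state
    # and message directly, instead of letting Prefect infer them.
    if value > 0:
        return Completed(message=f'{value} is positive.')
    return Failed(message=f'{value} is not positive.')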

scripts/flows/dataset_workflows/cid_ingestion.py

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 import jmespath
 
 from prefect import flow, get_run_logger
-from prefect.server.schemas.states import Completed, Failed
+from prefect.states import Completed, Failed
 from queries import DIST_DATE_QUERY
 from tasks.base_tasks import dataverse_mapper, \
     dataverse_import, update_publication_date, add_workflow_versioning_url, \
scripts/flows/dataset_workflows/dataverse_deletion.py

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+from prefect import flow
+from prefect.states import Failed, Completed
+
+from tasks.base_tasks import dataverse_dataset_check_status, \
+    delete_dataset
+from utils import generate_dv_flow_run_name, failed_dataverse_deletion_hook
+
+
+@flow(name="Deleting Dataverse metadata", flow_run_name=generate_dv_flow_run_name,
+      on_failure=[failed_dataverse_deletion_hook])
+def dataverse_metadata_deletion(pid, settings_dict):
+    """
+    Deletion flow for the Dataverse to Dataverse ingestion.
+
+    :param pid: pid of the dataset.
+    :param settings_dict: dict, contains settings for the current workflow.
+    :return: prefect.states Failed or Completed.
+    """
+    dv_response_status = dataverse_dataset_check_status(pid, settings_dict)
+    if not dv_response_status:
+        return Failed(message=f'No response from {pid}.')
+
+    if dv_response_status in (404, 403):
+        response = delete_dataset(pid, settings_dict)
+        if not response:
+            return Failed(message=f'Unable to delete dataset: {pid}.')
+
+        return Completed(message=f'{pid} deleted successfully.')
+
+    # If the dataset is active (200), it will not be deleted.
+    return Failed(message=f'{pid} will not be deleted. Dataset is active.')
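The flow above leans on dataverse_dataset_check_status, which is not shown in this diff. Judging from the call sites, it returns the HTTP status code of a dataset lookup, or a falsy value when the request fails; note that the two flows in this commit pass different second arguments (the whole settings_dict here, just the destination URL in the ingestion flow), so the real task presumably accepts both. A hypothetical sketch following this flow's call, assuming settings_dict exposes DESTINATION_DATAVERSE_URL and DESTINATION_DATAVERSE_API_KEY:

import requests
from prefect import task


@task
def dataverse_dataset_check_status(pid, settings_dict):
    # Hypothetical sketch: look the dataset up via Dataverse's native API
    # and report the raw status code (200 = active, 404 = not found).
    try:
        response = requests.get(
            f'{settings_dict.DESTINATION_DATAVERSE_URL}'
            f'/api/datasets/:persistentId/?persistentId={pid}',
            headers={'X-Dataverse-key':
                     settings_dict.DESTINATION_DATAVERSE_API_KEY},
            timeout=30,
        )
    except requests.RequestException:
        return None  # treated as 'no response' by the calling flows
    return response.status_code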

scripts/flows/dataset_workflows/dataverse_ingestion.py

Lines changed: 17 additions & 2 deletions
@@ -1,9 +1,10 @@
 from prefect import flow
-from prefect.server.schemas.states import Failed, Completed
+from prefect.states import Failed, Completed
 
 from tasks.base_tasks import dataverse_metadata_fetcher, dataverse_import, \
     update_publication_date, add_workflow_versioning_url, refine_metadata, \
-    enrich_metadata, dataverse_mapper
+    enrich_metadata, dataverse_mapper, dataverse_dataset_check_status, \
+    delete_dataset
 from utils import generate_dv_flow_run_name, failed_dataverse_ingestion_hook
 
 
@@ -45,6 +46,20 @@ def dataverse_metadata_ingestion(pid, version, settings_dict):
     if not dataverse_json:
         return Failed(message='Unable to enrich metadata using ELSST.')
 
+    # The result will be 200 (Dataset exists) or 404 (Dataset does not exist)
+    dv_response_status = dataverse_dataset_check_status(
+        pid,
+        settings_dict.DESTINATION_DATAVERSE_URL
+    )
+
+    if not dv_response_status:
+        return Failed(message=f'No response from {pid}.')
+    if dv_response_status == 200:
+        # 200 means that the dataset must be deleted and reingested.
+        deleted_response = delete_dataset(pid, settings_dict)
+        if not deleted_response:
+            return Failed(message=f'Unable to delete dataset: {pid}.')
+
     import_response = dataverse_import(dataverse_json, settings_dict, pid)
     if not import_response:
         return Failed(message='Unable to import dataset into Dataverse')
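delete_dataset is likewise only imported here. Assuming it wraps Dataverse's native destroy endpoint (DELETE /api/datasets/:persistentId/destroy, which removes even published datasets and requires a superuser API key), a hypothetical sketch might be:

import requests
from prefect import task


@task
def delete_dataset(pid, settings_dict):
    # Hypothetical sketch: destroy the dataset via the native API so the
    # ingestion flow can re-import a fresh copy afterwards.
    response = requests.delete(
        f'{settings_dict.DESTINATION_DATAVERSE_URL}'
        f'/api/datasets/:persistentId/destroy/?persistentId={pid}',
        headers={'X-Dataverse-key':
                 settings_dict.DESTINATION_DATAVERSE_API_KEY},
        timeout=30,
    )
    return response if response.ok else None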

scripts/flows/dataset_workflows/liss_ingestion.py

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 import jmespath
 
 from prefect import flow
-from prefect.server.schemas.states import Completed, Failed
+from prefect.states import Completed, Failed
 from queries import DIST_DATE_QUERY
 from tasks.base_tasks import dataverse_mapper, \
     dataverse_import, update_publication_date, get_doi_from_dv_json, \
Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+from prefect import flow
+
+import utils
+from configuration.config import settings
+from flows.dataset_workflows.dataverse_deletion import dataverse_metadata_deletion
+from tasks.harvest_tasks import oai_harvest_metadata
+
+
+@flow(name="Dataverse Deleted Pipeline")
+def dataverse_deletion_pipeline(settings_dict_name: str,
+                                target_url: str = None,
+                                target_key: str = None,
+                                do_harvest: bool = True
+                                ):
+    """ Deletion pipeline dedicated to the Dataverse to Dataverse workflow.
+
+    :param settings_dict_name: string, name of the settings you wish to use.
+    :param target_url: Optional target dataverse URL.
+    :param target_key: API key of the optional target dataverse.
+    :param do_harvest: Boolean stating if the dataset metadata should be
+        harvested before deletion.
+    """
+    settings_dict = getattr(settings, settings_dict_name)
+
+    if target_url:
+        settings_dict.DESTINATION_DATAVERSE_URL = target_url
+
+    if target_key:
+        settings_dict.DESTINATION_DATAVERSE_API_KEY = target_key
+
+    minio_client = utils.create_s3_client()
+
+    if hasattr(settings_dict, 'OAI_SET') \
+            and settings_dict.OAI_SET and do_harvest:
+        oai_harvest_metadata(
+            settings.METADATA_PREFIX,
+            f'{settings_dict.SOURCE_DATAVERSE_URL}/oai',
+            settings_dict.BUCKET_NAME,
+            'ListIdentifiers',
+            'start_harvest',
+            settings_dict.OAI_SET,
+            settings_dict.FROM
+        )
+
+    elif do_harvest:
+        oai_harvest_metadata(
+            settings.METADATA_PREFIX,
+            f'{settings_dict.SOURCE_DATAVERSE_URL}/oai',
+            settings_dict.BUCKET_NAME,
+            'ListIdentifiers',
+            'start_harvest'
+        )
+
+    utils.identifier_list_workflow_executor(
+        dataverse_metadata_deletion,
+        settings_dict,
+        minio_client,
+        "identifiers-deleted.json",
+    )
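For reference, a pipeline like this is typically invoked with the name of a dynaconf settings entry; the entry name and target values below are purely illustrative:

if __name__ == "__main__":
    # 'LISS' is a hypothetical settings entry name; target_url and
    # target_key override the configured destination Dataverse.
    dataverse_deletion_pipeline(
        "LISS",
        target_url="https://demo.dataverse.example",
        target_key="xxxx-xxxx",
        do_harvest=True,
    )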
