
Commit 6e28304

Fjodor van Rijsselberg authored and committed
Fixed harvesting with timestamp. Added deployment setup for all dataverse ingestion workflows.
1 parent 013afbf commit 6e28304

4 files changed: +279 -81 lines changed


scripts/deployment/prefect.yaml

Lines changed: 193 additions & 0 deletions
@@ -0,0 +1,193 @@
+prefect-version: 3.0.10
+name: dataverse-deployments
+
+pull:
+  - prefect.deployments.steps.set_working_directory:
+      directory: /app/scripts/
+
+definitions:
+  work_pools:
+    default_workpool: &default_workpool
+      name: default
+      work_queue_name: default
+  schedules:
+    every_day: &every_day
+      cron: "0 0 * * *"
+      timezone: "Europe/Amsterdam"
+
+deployments:
+  - name: hsn-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "hsn" ]
+    description: "Ingests metadata from IISG into the HSN subverse."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "HSN"
+
+    work_pool: *default_workpool
+
+  - name: twente-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from twente from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "TWENTE"
+
+    work_pool: *default_workpool
+
+  - name: delft-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "DELFT"
+
+    work_pool: *default_workpool
+
+  - name: avans-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "AVANS"
+
+    work_pool: *default_workpool
+
+  - name: fontys-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from fontys from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "FONTYS"
+
+    work_pool: *default_workpool
+
+  - name: groningen-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "GRONINGEN"
+
+    work_pool: *default_workpool
+
+  - name: hanze-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "HANZE"
+
+    work_pool: *default_workpool
+
+  - name: hr-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "HR"
+
+    work_pool: *default_workpool
+
+  - name: leiden-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from leiden from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "LEIDEN"
+
+    work_pool: *default_workpool
+
+  - name: maastricht-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from maastricht from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "MAASTRICHT"
+
+    work_pool: *default_workpool
+
+  - name: tilburg-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from tilburg from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "TILBURG"
+
+    work_pool: *default_workpool
+
+  - name: trimbos-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from trimbos from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "TRIMBOS"
+
+    work_pool: *default_workpool
+
+  - name: umcu-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from umcu from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "UMCU"
+
+    work_pool: *default_workpool
+
+  - name: utrecht-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from utrecht from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "UTRECHT"
+
+    work_pool: *default_workpool
+
+  - name: vu-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from vu from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "VU"
+
+    work_pool: *default_workpool
+
+  - name: dans-ingest
+    entrypoint: flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline
+    tags: [ "d2d", "dataversenl" ]
+    description: "Ingests metadata from dans from dataversenl."
+    schedule: *every_day
+
+    parameters:
+      settings_dict_name: "DANS"
+
+    work_pool: *default_workpool
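All sixteen deployments point at the same flow entrypoint and differ only in name, tags, description, and the settings_dict_name parameter. As a rough sketch only (not part of this commit), the hsn-ingest entry could equivalently be registered through Prefect 3's Python deployment API; the source path below is a placeholder mirroring the pull step above:

# Hypothetical equivalent of the "hsn-ingest" entry; the repository uses the
# prefect.yaml above. Assumes Prefect 3.x and a locally available flow source.
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="/app/scripts",  # placeholder; matches the pull step's working directory
        entrypoint="flows/entry_workflows/main_dataverse_ingestion.py:dataverse_ingestion_pipeline",
    ).deploy(
        name="hsn-ingest",
        work_pool_name="default",
        cron="0 0 * * *",  # the YAML also pins Europe/Amsterdam; omitted in this sketch
        parameters={"settings_dict_name": "HSN"},
        tags=["d2d", "hsn"],
        description="Ingests metadata from IISG into the HSN subverse.",
    )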

scripts/flows/entry_workflows/main_dataverse_ingestion.py

Lines changed: 20 additions & 21 deletions
@@ -6,7 +6,8 @@
     dataverse_metadata_ingestion
 from flows.workflow_versioning.workflow_versioner import \
     create_ingestion_workflow_versioning
-from tasks.harvest_tasks import oai_harvest_metadata
+from tasks.harvest_tasks import oai_harvest_metadata, \
+    get_most_recent_publication_date
 
 
 @flow(name="Dataverse Ingestion Pipeline")
@@ -42,31 +43,29 @@ def dataverse_ingestion_pipeline(settings_dict_name: str,
 
     minio_client = utils.create_s3_client()
 
-    if hasattr(settings_dict,
-               'OAI_SET') and settings_dict.OAI_SET and do_harvest:
-        oai_harvest_metadata(
-            settings.METADATA_PREFIX,
-            f'{settings_dict.SOURCE_DATAVERSE_URL}/oai',
-            settings_dict.BUCKET_NAME,
-            'ListIdentifiers',
-            'start_harvest',
-            settings_dict.OAI_SET,
-            settings_dict.FROM
-        )
+    if do_harvest:
+        timestamp = get_most_recent_publication_date(settings_dict)
 
-    elif do_harvest:
-        oai_harvest_metadata(
-            settings.METADATA_PREFIX,
-            f'{settings_dict.SOURCE_DATAVERSE_URL}/oai',
-            settings_dict.BUCKET_NAME,
-            'ListIdentifiers',
-            'start_harvest'
-        )
+        harvest_params = {
+            'metadata_prefix': settings.METADATA_PREFIX,
+            'oai_endpoint': f'{settings_dict.SOURCE_DATAVERSE_URL}/oai',
+            'bucket_name': settings_dict.BUCKET_NAME,
+            'verb': 'ListIdentifiers',
+            'harvester_endpoint': 'start_harvest'
+        }
+
+        if hasattr(settings_dict, 'OAI_SET') and settings_dict.OAI_SET:
+            harvest_params['oai_set'] = settings_dict.OAI_SET
+
+        if timestamp:
+            harvest_params['timestamp'] = timestamp
+
+        oai_harvest_metadata(**harvest_params)
 
     utils.identifier_list_workflow_executor(
         dataverse_metadata_ingestion,
         settings_dict,
         minio_client,
         "identifiers.json",
         version
-        )
+    )
scripts/tasks/base_tasks.py

Lines changed: 25 additions & 57 deletions
@@ -80,7 +80,7 @@ def dataverse_mapper(json_metadata, mapping_file_path, template_file_path,
 
 
 @task(timeout_seconds=300, retries=1, cache_expiration=timedelta(minutes=10))
-def dataverse_import(mapped_metadata, settings_dict, doi=None):
+def dataverse_import(mapped_metadata, settings_dict, doi):
     """ Sends a request to the import service to import the given metadata.
 
     The dataverse_information field in the data takes three fields:
@@ -96,27 +96,15 @@ def dataverse_import(mapped_metadata, settings_dict, doi=None):
     logger = get_run_logger()
 
     headers = {
-        'accept': 'application/json',
-        'Content-Type': 'application/json'
+        "X-Dataverse-key": settings_dict.DESTINATION_DATAVERSE_API_KEY,
+        "Content-type": "application/json"
     }
 
-    data = {
-        "metadata": mapped_metadata,
-        "dataverse_information": {
-            "base_url": settings_dict.DESTINATION_DATAVERSE_URL,
-            "dt_alias": settings_dict.ALIAS,
-            "api_token": settings_dict.DESTINATION_DATAVERSE_API_KEY
-        }}
+    url = f"{settings_dict.DESTINATION_DATAVERSE_URL}/api/dataverses/" \
+          f"{settings_dict.ALIAS}/datasets/:import?pid={doi}&release=no"
 
-    if doi:
-        data['doi'] = doi
+    response = requests.post(url, headers=headers, json=mapped_metadata)
 
-    url = f"{settings.DATAVERSE_IMPORTER_URL}/importer"
-    response = requests.post(
-        url,
-        headers=headers,
-        data=json.dumps(data)
-    )
     if not response.ok:
         logger.info(response.text)
         return None
@@ -125,11 +113,10 @@ def dataverse_import(mapped_metadata, settings_dict, doi=None):
 
 @task(timeout_seconds=300, retries=1, cache_expiration=timedelta(minutes=10))
 def update_publication_date(publication_date, pid, settings_dict):
-    """ Sends a request to the publication date updater to update the pub date.
+    """ Sends a request to the dataverse target to update the publication date.
 
-    The dataverse_information field in the data takes two fields:
-    base_url: The Dataverse instance URL.
-    api_token: The token specific to this DV instance to allow use of the API.
+    This task updates the publication date of a given pid in the destination
+    dataverse. It uses the experimental dataverse API to achieve this.
 
     :param publication_date: The original date of publication.
     :param pid: The DOI of the dataset in question.
@@ -139,25 +126,19 @@ def update_publication_date(publication_date, pid, settings_dict):
     logger = get_run_logger()
 
     headers = {
-        'accept': 'application/json',
-        'Content-Type': 'application/json'
-    }
+        "X-Dataverse-key": settings_dict.DESTINATION_DATAVERSE_API_KEY,
+        'Content-Type': 'application/ld+json'}
 
-    data = {
-        'pid': pid,
-        'publication_date': publication_date,
-        "dataverse_information": {
-            "base_url": settings_dict.DESTINATION_DATAVERSE_URL,
-            "api_token": settings_dict.DESTINATION_DATAVERSE_API_KEY
-        }
-    }
+    url = f'{settings_dict.DESTINATION_DATAVERSE_URL}/api/datasets/' \
+          f':persistentId/actions/:releasemigrated?persistentId={pid}'
+
+    publication_date = {
+        "schema:datePublished": f'{publication_date}',
+        "@context": {"schema": "http://schema.org/"}}
+
+    response = requests.post(url, data=json.dumps(publication_date),
+                             headers=headers)
 
-    url = f"{settings.PUBLICATION_DATA_UPDATER_URL}/publication-date-updater"
-    response = requests.post(
-        url,
-        headers=headers,
-        data=json.dumps(data)
-    )
     if not response.ok:
         logger.info(response.text)
         return None
@@ -179,23 +160,10 @@ def dataverse_metadata_fetcher(metadata_format, doi, settings_dict):
     """
     logger = get_run_logger()
 
-    headers = {
-        'accept': 'application/json',
-        'Content-Type': 'application/json'
-    }
+    url = f'{settings_dict.SOURCE_DATAVERSE_URL}/api/datasets/export?' \
+          f'exporter={metadata_format}&persistentId={doi}'
 
-    data = {
-        'doi': doi,
-        'metadata_format': metadata_format,
-        "base_url": settings_dict.SOURCE_DATAVERSE_URL,
-    }
-
-    url = f"{settings.METADATA_FETCHER_URL}/dataverse-metadata-fetcher"
-    response = requests.post(
-        url,
-        headers=headers,
-        data=json.dumps(data)
-    )
+    response = requests.get(url)
 
     if not response.ok:
         logger.info(response.text)
@@ -223,7 +191,7 @@ def dataverse_dataset_check_status(doi, dataverse_url):
     logger = get_run_logger()
 
     url = f"{dataverse_url}/api/datasets/export?exporter=dcterms&" \
-        f"persistentId={doi}"
+          f"persistentId={doi}"
     response = requests.get(url)
 
     if response.status_code in (200, 403, 404):
@@ -253,7 +221,7 @@ def delete_dataset(pid, settings_dict):
 
     logger = get_run_logger()
     url = f"{settings_dict.DESTINATION_DATAVERSE_URL}/api/datasets/" \
-        f":persistentId/destroy/?persistentId={pid}"
+          f":persistentId/destroy/?persistentId={pid}"
     response = requests.delete(url, headers=headers)
 
     if response and response.status_code == 200:
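Taken together, the rewritten tasks above now call the Dataverse native API directly (dataset export, :import into a collection, and the experimental :releasemigrated action) instead of the separate metadata-fetcher, importer, and publication-date-updater services. A rough usage sketch for a single DOI, illustrative only; in this repository the chaining happens inside dataverse_metadata_ingestion, which also runs the fetched metadata through dataverse_mapper before importing:

# Illustrative chaining only -- not the repository's actual sub-flow.
# Assumes settings_dict exposes the SOURCE_/DESTINATION_ fields used above and
# that dataverse_metadata_fetcher returns the dataverse_json export as a dict.
doi = "doi:10.1234/EXAMPLE"  # placeholder identifier

json_metadata = dataverse_metadata_fetcher("dataverse_json", doi, settings_dict)
if json_metadata:
    dataverse_import(json_metadata, settings_dict, doi)
    publication_date = json_metadata.get("publicationDate")
    if publication_date:
        update_publication_date(publication_date, doi, settings_dict)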
