Skip to content

Commit ba18d25

Browse files
authored
Merge pull request #137 from dathere/fixing-issues
Fixing issues
2 parents f9b0294 + 79de9bd commit ba18d25

File tree

5 files changed

+45
-26
lines changed

5 files changed

+45
-26
lines changed

ckanext/datapusher_plus/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class DataPusherPlusConfig(MutableMapping):
4747
STDERR: bool = True
4848
KEEP_JOBS_AGE: int = 60
4949

50-
MAX_CONTENT_LENGTH: str = "25600000"
50+
MAX_CONTENT_LENGTH: str = "1256000000000"
5151
IGNORE_FILE_HASH: bool = True
5252
CHUNK_SIZE: str = "16384"
5353
DOWNLOAD_TIMEOUT: int = 300

ckanext/datapusher_plus/jobs.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import time
1313
import decimal
1414
from urllib.parse import urlsplit
15+
from urllib.parse import urlparse
1516
import logging
1617

1718
# Third-party imports
@@ -445,6 +446,16 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
445446
# If this is an uploaded file to CKAN, authenticate the request,
446447
# otherwise we won't get file from private resources
447448
headers["Authorization"] = api_key
449+
450+
# If the ckan_url differs from this url, rewrite this url to the ckan
451+
# url. This can be useful if ckan is behind a firewall.
452+
if not resource_url.startswith(ckan_url):
453+
new_url = urlparse(resource_url)
454+
rewrite_url = urlparse(ckan_url)
455+
new_url = new_url._replace(scheme=rewrite_url.scheme, netloc=rewrite_url.netloc)
456+
resource_url = new_url.geturl()
457+
logger.info('Rewrote resource url to: {0}'.format(resource_url))
458+
448459
try:
449460
kwargs = {
450461
"headers": headers,

ckanext/datapusher_plus/logic/action.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,9 @@ def datapusher_submit(context, data_dict: dict[str, Any]):
173173
'original_url': resource_dict.get('url'),
174174
}
175175
}
176-
timeout = tk.config.get('ckan.datapusher.timeout', 30)
176+
dp_timeout = tk.config.get('ckan.datapusher.timeout', 3000)
177177
try:
178-
job = tk.enqueue_job(jobs.datapusher_plus_to_datastore, [data], rq_kwargs=dict(timeout=timeout))
178+
job = tk.enqueue_job(jobs.datapusher_plus_to_datastore, [data], rq_kwargs=dict(timeout=dp_timeout))
179179
except Exception as e:
180180
log.error("Error submitting job to DataPusher: %s", e)
181181
return False

ckanext/datapusher_plus/migration/datapusher_plus/versions/01_e9c4a88839c8_upgrade_jobs_table.py

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,45 @@
55
Create Date: 2023-09-22 22:14:35.137116
66
77
"""
8+
89
from alembic import op
910
import sqlalchemy as sa
1011

1112

1213
# revision identifiers, used by Alembic.
13-
revision = 'e9c4a88839c8'
14+
revision = "e9c4a88839c8"
1415
down_revision = None
1516
branch_labels = None
1617
depends_on = None
1718

1819

1920
def upgrade():
20-
#upgrade jobs table if it not exists
21-
22-
op.add_column(
23-
u'jobs',
24-
sa.Column(
25-
'aps_job_id',
26-
sa.UnicodeText),
27-
)
28-
29-
#upgrade logs table
30-
op.add_column(
31-
'logs',
32-
sa.Column(
33-
'id',
34-
sa.Integer,
35-
primary_key=True,
36-
autoincrement=True),
37-
)
38-
39-
21+
# upgrade jobs table if the column does not exist
22+
if not _check_column_exists("jobs", "aps_job_id"):
23+
op.add_column(
24+
"jobs",
25+
sa.Column("aps_job_id", sa.UnicodeText),
26+
)
27+
# upgrade logs table
28+
if not _check_column_exists("logs", "id"):
29+
op.add_column(
30+
"logs",
31+
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
32+
)
33+
34+
4035
def downgrade():
41-
pass
36+
# downgrade jobs table
37+
if _check_column_exists("jobs", "aps_job_id"):
38+
op.drop_column("jobs", "aps_job_id")
39+
40+
# downgrade logs table
41+
if _check_column_exists("logs", "id"):
42+
op.drop_column("logs", "id")
43+
44+
45+
def _check_column_exists(table_name, column_name):
46+
bind = op.get_bind()
47+
insp = sa.engine.reflection.Inspector.from_engine(bind)
48+
columns = insp.get_columns(table_name)
49+
return column_name in [column["name"] for column in columns]

dot-env.template

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ SQLALCHEMY_DATABASE_URI = 'postgresql://datapusher_jobs:YOURPASSWORD@localhost/d
1515

1616
# =============== DOWNLOAD SETTINGS ==============
1717
# max download size (~1.25gb); this is ignored if PREVIEW_ROWS > 0
18-
MAX_CONTENT_LENGTH = 25600000
18+
MAX_CONTENT_LENGTH = 1256000000
1919

2020
# A Datapusher+ job is triggered automatically every time a resource is modified (even just its metadata)
2121
# if its mimetype is one of the supported datapusher.formats.

0 commit comments

Comments
 (0)