Skip to content

Commit 90a4868

Browse files
authored (author name lost in page extraction)
Merge pull request #138 from dathere/sync-with-master — "Sync with master"
2 parents ba18d25 + 6041af4 commit 90a4868

File tree

3 files changed

+59
-4
lines changed

3 files changed

+59
-4
lines changed

ckanext/datapusher_plus/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class DataPusherPlusConfig(MutableMapping):
3535
# ckan_service_provider settings
3636
SQLALCHEMY_DATABASE_URI: str = _DATABASE_URI
3737
WRITE_ENGINE_URL: str = _WRITE_ENGINE_URL
38+
COPY_READBUFFER_SIZE: int = 1048576
3839
DEBUG: bool = False
3940
TESTING: bool = False
4041
SECRET_KEY: str = str(uuid.uuid4())

ckanext/datapusher_plus/jobs.py

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,55 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
659659
if resource_format.upper() == "CSV":
660660
logger.info("Normalizing/UTF-8 transcoding {}...".format(resource_format))
661661
else:
662-
logger.info("Normalizing/UTF-8 transcoding {} to CSV...".format(format))
662+
# if not CSV (e.g. TSV, TAB, etc.) we need to normalize to CSV
663+
logger.info(
664+
"Normalizing/UTF-8 transcoding {} to CSV...".format(resource_format)
665+
)
666+
667+
qsv_input_utf_8_encoded_csv = os.path.join(temp_dir, 'qsv_input_utf_8_encoded.csv')
668+
669+
# using uchardet to determine encoding
670+
file_encoding = subprocess.run(
671+
[
672+
"uchardet",
673+
tmp
674+
],
675+
check=True,
676+
capture_output=True,
677+
text=True,
678+
)
679+
logger.info("Identified encoding of the file: {}".format(file_encoding.stdout))
680+
681+
# trim the encoding string
682+
file_encoding.stdout = file_encoding.stdout.strip()
683+
684+
# using iconv to re-encode in UTF-8
685+
if file_encoding.stdout != "UTF-8":
686+
logger.info("File is not UTF-8 encoded. Re-encoding from {} to UTF-8".format(
687+
file_encoding.stdout)
688+
)
689+
try:
690+
subprocess.run(
691+
[
692+
"iconv",
693+
"-f",
694+
file_encoding.stdout,
695+
"-t",
696+
"UTF-8",
697+
tmp,
698+
"--output",
699+
qsv_input_utf_8_encoded_csv,
700+
],
701+
check=True,
702+
)
703+
except subprocess.CalledProcessError as e:
704+
# return as we can't push a non UTF-8 CSV
705+
logger.error(
706+
"Job aborted as the file cannot be re-encoded to UTF-8: {}.".format(e)
707+
)
708+
return
709+
else:
710+
qsv_input_utf_8_encoded_csv = tmp
663711
try:
664712
qsv_input = subprocess.run(
665713
[
@@ -693,7 +741,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
693741
)
694742
except subprocess.CalledProcessError as e:
695743
# return as we can't push an invalid CSV file
696-
validate_error_msg = qsv_validate.stderr
744+
validate_error_msg = e.stderr
697745
logger.error("Invalid CSV! Job aborted: {}.".format(validate_error_msg))
698746
return
699747
logger.info("Well-formed, valid CSV file confirmed...")
@@ -1359,6 +1407,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
13591407
except psycopg2.Error as e:
13601408
raise utils.JobError("Could not connect to the Datastore: {}".format(e))
13611409
else:
1410+
copy_readbuffer_size = config.get("COPY_READBUFFER_SIZE")
13621411
cur = raw_connection.cursor()
13631412
"""
13641413
truncate table to use copy freeze option and further increase
@@ -1383,9 +1432,10 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
13831432
sql.Identifier(resource_id),
13841433
column_names,
13851434
)
1386-
with open(tmp, "rb") as f:
1435+
# specify a 1MB buffer size for COPY read from disk
1436+
with open(tmp, "rb", copy_readbuffer_size) as f:
13871437
try:
1388-
cur.copy_expert(copy_sql, f)
1438+
cur.copy_expert(copy_sql, f, size=copy_readbuffer_size)
13891439
except psycopg2.Error as e:
13901440
raise utils.JobError("Postgres COPY failed: {}".format(e))
13911441
else:

dot-env.template

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ WRITE_ENGINE_URL = 'postgresql://datapusher:YOURPASSWORD@localhost/datastore_def
1313
# The connect string of the Datapusher+ Job database
1414
SQLALCHEMY_DATABASE_URI = 'postgresql://datapusher_jobs:YOURPASSWORD@localhost/datapusher_jobs'
1515

16+
# READ BUFFER SIZE IN BYTES WHEN READING CSV FILE WHEN USING POSTGRES COPY
17+
# default 64k = 65536
18+
COPY_READBUFFER_SIZE = 65536
19+
1620
# =============== DOWNLOAD SETTINGS ==============
1721
# 25mb, this is ignored if either PREVIEW_ROWS > 0
1822
MAX_CONTENT_LENGTH = 1256000000

0 commit comments

Comments (0)