
Commit 51fdbd7

Add KMS encrypt on S3 and bumping version to 0b32
1 parent 33b3a20 commit 51fdbd7

17 files changed (+186 / -60 lines changed)

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -67,6 +67,7 @@ instance/
 
 # Sphinx documentation
 docs/_build/
+docs/source/api/
 
 # PyBuilder
 target/

README.md

Lines changed: 13 additions & 0 deletions
@@ -46,6 +46,19 @@ session.pandas.to_parquet(
 
 If a Glue Database name is passed, all the metadata will be created in the Glue Catalog. If not, only the s3 data write will be done.
 
+### Writing Pandas Dataframe to S3 as Parquet encrypting with a KMS key
+
+```py3
+extra_args = {
+    "ServerSideEncryption": "aws:kms",
+    "SSEKMSKeyId": "YOUR_KMS_KEY_ARN"
+}
+session = awswrangler.Session(s3_additional_kwargs=extra_args)
+session.pandas.to_parquet(
+    path="s3://..."
+)
+```
+
 ### Reading from AWS Athena to Pandas
 
 ```py3
awswrangler/__version__.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 __title__ = "awswrangler"
 __description__ = "Utility belt to handle data on AWS."
-__version__ = "0.0b31"
+__version__ = "0.0b32"
 __license__ = "Apache License 2.0"

awswrangler/athena.py

Lines changed: 2 additions & 0 deletions
@@ -54,6 +54,7 @@ def get_query_dtype(self, query_execution_id):
     def create_athena_bucket(self):
         """
         Creates the default Athena bucket if not exists
+
         :return: Bucket s3 path (E.g. s3://aws-athena-query-results-ACCOUNT-REGION/)
         """
         account_id = (self._session.boto3_session.client(
@@ -69,6 +70,7 @@ def create_athena_bucket(self):
     def run_query(self, query, database, s3_output=None):
         """
         Run a SQL Query against AWS Athena
+
         :param query: SQL query
         :param database: AWS Glue/Athena database name
         :param s3_output: AWS S3 path

awswrangler/glue.py

Lines changed: 3 additions & 1 deletion
@@ -17,6 +17,7 @@ def __init__(self, session):
     def get_table_athena_types(self, database, table):
         """
         Get all columns names and the related data types
+
         :param database: Glue database's name
         :param table: Glue table's name
         :return: A dictionary as {"col name": "col dtype"}
@@ -34,6 +35,7 @@ def get_table_athena_types(self, database, table):
     def get_table_python_types(self, database, table):
         """
         Get all columns names and the related python types
+
         :param database: Glue database's name
         :param table: Glue table's name
         :return: A dictionary as {"col name": "col python type"}
@@ -178,7 +180,7 @@ def _build_schema(dataframe,
                       partition_cols,
                       preserve_index,
                       cast_columns=None):
-        print(f"dataframe.dtypes:\n{dataframe.dtypes}")
+        logger.debug(f"dataframe.dtypes:\n{dataframe.dtypes}")
         if not partition_cols:
             partition_cols = []
         schema_built = []
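Since `_build_schema` now reports the dataframe dtypes through `logger.debug` instead of `print`, that output only appears when debug logging is enabled. A minimal sketch, assuming the module logger follows the standard `awswrangler.*` naming:

```py3
import logging

# Route log records to stderr and lower the threshold so the
# dataframe.dtypes debug message from _build_schema becomes visible.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("awswrangler").setLevel(logging.DEBUG)
```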

awswrangler/pandas.py

Lines changed: 14 additions & 9 deletions
@@ -54,6 +54,7 @@ def read_csv(
         Try to mimic as most as possible pandas.read_csv()
         https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
         P.S. max_result_size != None tries to mimic the chunksize behaviour in pandas.read_sql()
+
         :param path: AWS S3 path (E.g. S3://BUCKET_NAME/KEY_NAME)
         :param max_result_size: Max number of bytes on each request to S3
         :param header: Same as pandas.read_csv()
@@ -131,6 +132,7 @@ def _read_csv_iterator(
         Read CSV file from AWS S3 using optimized strategies.
         Try to mimic as most as possible pandas.read_csv()
         https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
+
         :param client_s3: Boto3 S3 client object
         :param bucket_name: S3 bucket name
         :param key_path: S3 key path (W/o bucket)
@@ -235,6 +237,7 @@ def _extract_terminator_profile(body, sep, quotechar, lineterminator,
                                     last_index):
         """
         Backward parser for quoted CSV lines
+
         :param body: String
         :param sep: Same as pandas.read_csv()
         :param quotechar: Same as pandas.read_csv()
@@ -290,6 +293,7 @@ def _extract_terminator_profile(body, sep, quotechar, lineterminator,
     def _find_terminator(body, sep, quoting, quotechar, lineterminator):
         """
         Find for any suspicious of line terminator (From end to start)
+
         :param body: String
         :param sep: Same as pandas.read_csv()
         :param quoting: Same as pandas.read_csv()
@@ -345,6 +349,7 @@ def _read_csv_once(
         Read CSV file from AWS S3 using optimized strategies.
         Try to mimic as most as possible pandas.read_csv()
         https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
+
         :param client_s3: Boto3 S3 client object
         :param bucket_name: S3 bucket name
         :param key_path: S3 key path (W/o bucket)
@@ -391,6 +396,7 @@ def read_sql_athena(self,
         """
         Executes any SQL query on AWS Athena and return a Dataframe of the result.
         P.S. If max_result_size is passed, then a iterator of Dataframes is returned.
+
         :param sql: SQL Query
         :param database: Glue/Athena Database
         :param s3_output: AWS S3 path
@@ -436,6 +442,7 @@ def to_csv(
         """
         Write a Pandas Dataframe as CSV files on S3
         Optionally writes metadata on AWS Glue.
+
         :param dataframe: Pandas Dataframe
         :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/
         :param database: AWS Glue Database name
@@ -474,6 +481,7 @@ def to_parquet(self,
         """
         Write a Pandas Dataframe as parquet files on S3
         Optionally writes metadata on AWS Glue.
+
         :param dataframe: Pandas Dataframe
         :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/
         :param database: AWS Glue Database name
@@ -483,8 +491,7 @@ def to_parquet(self,
         :param mode: "append", "overwrite", "overwrite_partitions"
         :param procs_cpu_bound: Number of cores used for CPU bound tasks
         :param procs_io_bound: Number of cores used for I/O bound tasks
-        :param cast_columns: Dictionary of columns names and Arrow types to be casted.
-                             E.g. {"col name": "int64", "col2 name": "int32"}
+        :param cast_columns: Dictionary of columns names and Arrow types to be casted. (E.g. {"col name": "int64", "col2 name": "int32"})
         :return: List of objects written on S3
         """
         return self.to_s3(dataframe=dataframe,
@@ -514,6 +521,7 @@ def to_s3(self,
         """
         Write a Pandas Dataframe on S3
         Optionally writes metadata on AWS Glue.
+
         :param dataframe: Pandas Dataframe
         :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/
         :param file_format: "csv" or "parquet"
@@ -524,9 +532,7 @@ def to_s3(self,
         :param mode: "append", "overwrite", "overwrite_partitions"
         :param procs_cpu_bound: Number of cores used for CPU bound tasks
         :param procs_io_bound: Number of cores used for I/O bound tasks
-        :param cast_columns: Dictionary of columns indexes and Arrow types to be casted.
-                             E.g. {2: "int64", 5: "int32"}
-                             Only for "parquet" file_format
+        :param cast_columns: Dictionary of columns indexes and Arrow types to be casted. (E.g. {2: "int64", 5: "int32"}) (Only for "parquet" file_format)
         :return: List of objects written on S3
         """
         if not partition_cols:
@@ -769,17 +775,16 @@ def to_redshift(
     ):
         """
         Load Pandas Dataframe as a Table on Amazon Redshift
+
         :param dataframe: Pandas Dataframe
         :param path: S3 path to write temporary files (E.g. s3://BUCKET_NAME/ANY_NAME/)
         :param connection: A PEP 249 compatible connection (Can be generated with Redshift.generate_connection())
         :param schema: The Redshift Schema for the table
         :param table: The name of the desired Redshift table
         :param iam_role: AWS IAM role with the related permissions
-        :param diststyle: Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"]
-                          https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html
+        :param diststyle: Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"] (https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html)
         :param distkey: Specifies a column name or positional number for the distribution key
-        :param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED"
-                          https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html
+        :param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED" (https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html)
         :param sortkey: List of columns to be sorted
         :param preserve_index: Should we preserve the Dataframe index?
         :param mode: append or overwrite

awswrangler/redshift.py

Lines changed: 6 additions & 8 deletions
@@ -113,6 +113,7 @@ def load_table(
         """
         Load Parquet files into a Redshift table using a manifest file.
         Creates the table if necessary.
+
         :param dataframe: Pandas or Spark Dataframe
         :param dataframe_type: "pandas" or "spark"
         :param manifest_path: S3 path for manifest file (E.g. S3://...)
@@ -121,11 +122,9 @@ def load_table(
         :param redshift_conn: A PEP 249 compatible connection (Can be generated with Redshift.generate_connection())
         :param num_files: Number of files to be loaded
         :param iam_role: AWS IAM role with the related permissions
-        :param diststyle: Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"]
-                          https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html
+        :param diststyle: Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"] (https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html)
         :param distkey: Specifies a column name or positional number for the distribution key
-        :param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED"
-                          https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html
+        :param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED" (https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html)
         :param sortkey: List of columns to be sorted
         :param mode: append or overwrite
         :param preserve_index: Should we preserve the Dataframe index? (ONLY for Pandas Dataframe)
@@ -184,16 +183,15 @@ def _create_table(
     ):
         """
         Creates Redshift table.
+
         :param cursor: A PEP 249 compatible cursor
         :param dataframe: Pandas or Spark Dataframe
         :param dataframe_type: "pandas" or "spark"
         :param schema_name: Redshift schema
         :param table_name: Redshift table name
-        :param diststyle: Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"]
-                          https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html
+        :param diststyle: Redshift distribution styles. Must be in ["AUTO", "EVEN", "ALL", "KEY"] (https://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html)
         :param distkey: Specifies a column name or positional number for the distribution key
-        :param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED"
-                          https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html
+        :param sortstyle: Sorting can be "COMPOUND" or "INTERLEAVED" (https://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html)
         :param sortkey: List of columns to be sorted
         :param preserve_index: Should we preserve the Dataframe index? (ONLY for Pandas Dataframe)
         :return: None

awswrangler/s3.py

Lines changed: 25 additions & 19 deletions
@@ -19,30 +19,36 @@ def mkdir_if_not_exists(fs, path):
     assert fs.exists(path)
 
 
-def get_fs(session_primitives):
-    aws_access_key_id, aws_secret_access_key, profile_name, config = None, None, None, None
+def get_fs(session_primitives=None):
+    aws_access_key_id, aws_secret_access_key, profile_name, config, s3_additional_kwargs = None, None, None, None, None
     if session_primitives:
-        aws_access_key_id = (session_primitives.aws_access_key_id
-                             if session_primitives.aws_access_key_id else None)
-        aws_secret_access_key = (session_primitives.aws_secret_access_key
-                                 if session_primitives.aws_secret_access_key
-                                 else None)
-        profile_name = (session_primitives.profile_name
-                        if session_primitives.profile_name else None)
-        config = {
-            "retries": {
-                "max_attempts": session_primitives.botocore_max_retries
+        if session_primitives.aws_access_key_id:
+            aws_access_key_id = session_primitives.aws_access_key_id
+        if session_primitives.aws_secret_access_key:
+            aws_secret_access_key = session_primitives.aws_secret_access_key
+        if session_primitives.profile_name:
+            profile_name = session_primitives.profile_name
+        if session_primitives.botocore_max_retries:
+            config = {
+                "retries": {
+                    "max_attempts": session_primitives.botocore_max_retries
+                }
             }
-        }
+        if session_primitives.s3_additional_kwargs:
+            s3_additional_kwargs = session_primitives.s3_additional_kwargs
     if profile_name:
-        return s3fs.S3FileSystem(profile_name=profile_name,
-                                 config_kwargs=config)
+        fs = s3fs.S3FileSystem(profile_name=profile_name,
+                               config_kwargs=config,
+                               s3_additional_kwargs=s3_additional_kwargs)
     elif aws_access_key_id and aws_secret_access_key:
-        return s3fs.S3FileSystem(key=aws_access_key_id,
-                                 secret=aws_secret_access_key,
-                                 config_kwargs=config)
+        fs = s3fs.S3FileSystem(key=aws_access_key_id,
+                               secret=aws_secret_access_key,
+                               config_kwargs=config,
+                               s3_additional_kwargs=s3_additional_kwargs)
     else:
-        return s3fs.S3FileSystem(config_kwargs=config)
+        fs = s3fs.S3FileSystem(config_kwargs=config,
+                               s3_additional_kwargs=s3_additional_kwargs)
+    return fs
 
 
 class S3:
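The rewritten `get_fs()` boils down to building an `s3fs.S3FileSystem` that carries the extra kwargs on every request. A minimal sketch of the equivalent call; the bucket and key below are placeholders, not part of this commit:

```py3
import s3fs

fs = s3fs.S3FileSystem(
    s3_additional_kwargs={
        "ServerSideEncryption": "aws:kms",
        "SSEKMSKeyId": "YOUR_KMS_KEY_ARN",
    }
)
# Every PUT issued through this filesystem now carries the KMS headers.
with fs.open("my-bucket/my-prefix/data.parquet", "wb") as f:
    f.write(b"...")
```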

awswrangler/session.py

Lines changed: 18 additions & 2 deletions
@@ -37,23 +37,25 @@ def __init__(
             aws_session_token=None,
             region_name=None,
             botocore_max_retries=40,
+            s3_additional_kwargs=None,
             spark_context=None,
             spark_session=None,
             procs_cpu_bound=os.cpu_count(),
             procs_io_bound=os.cpu_count() * PROCS_IO_BOUND_FACTOR,
     ):
         """
-        Most parameters inherit from Boto3 ou Pyspark.
+        Most parameters inherit from Boto3 or Pyspark.
         https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
         https://spark.apache.org/docs/latest/api/python/index.html
 
-        :param boto3_session: Boto3.Session (Overwrite the others Boto3 parameters)
+        :param boto3_session: Boto3.Session (Overwrite others Boto3 parameters)
         :param profile_name: Boto3 profile_name
         :param aws_access_key_id: Boto3 aws_access_key_id
         :param aws_secret_access_key: Boto3 aws_secret_access_key
         :param aws_session_token: Boto3 aws_session_token
         :param region_name: Boto3 region_name
         :param botocore_max_retries: Botocore max retries
+        :param s3_additional_kwargs: Passed on to s3fs (https://s3fs.readthedocs.io/en/latest/#serverside-encryption)
         :param spark_context: Spark Context (pyspark.SparkContext)
         :param spark_session: Spark Session (pyspark.sql.SparkSession)
         :param procs_cpu_bound: number of processes that can be used in single
@@ -73,6 +75,7 @@ def __init__(
             retries={"max_attempts": self._botocore_max_retries})
         self._aws_session_token = aws_session_token
         self._region_name = boto3_session.region_name if boto3_session else region_name
+        self._s3_additional_kwargs = s3_additional_kwargs
         self._spark_context = spark_context
         self._spark_session = spark_session
         self._procs_cpu_bound = procs_cpu_bound
@@ -125,6 +128,7 @@ def _load_new_primitives(self):
             aws_session_token=self._aws_session_token,
             region_name=self._region_name,
             botocore_max_retries=self._botocore_max_retries,
+            s3_additional_kwargs=self._s3_additional_kwargs,
             botocore_config=self._botocore_config,
             procs_cpu_bound=self._procs_cpu_bound,
             procs_io_bound=self._procs_io_bound,
@@ -158,6 +162,10 @@ def botocore_max_retries(self):
     def botocore_config(self):
         return self._botocore_config
 
+    @property
+    def s3_additional_kwargs(self):
+        return self._s3_additional_kwargs
+
     @property
     def spark_context(self):
         return self._spark_context
@@ -235,6 +243,7 @@ def __init__(
             aws_session_token=None,
             region_name=None,
             botocore_max_retries=None,
+            s3_additional_kwargs=None,
             botocore_config=None,
             procs_cpu_bound=None,
             procs_io_bound=None,
@@ -249,6 +258,7 @@ def __init__(
         :param aws_session_token: Boto3 aws_session_token
         :param region_name: Boto3 region_name
         :param botocore_max_retries: Botocore max retries
+        :param s3_additional_kwargs: Passed on to s3fs (https://s3fs.readthedocs.io/en/latest/#serverside-encryption)
         :param botocore_config: Botocore configurations
         :param procs_cpu_bound: number of processes that can be used in single
                                 node applications for CPU bound case (Default: os.cpu_count())
@@ -261,6 +271,7 @@ def __init__(
         self._aws_session_token = aws_session_token
         self._region_name = region_name
         self._botocore_max_retries = botocore_max_retries
+        self._s3_additional_kwargs = s3_additional_kwargs
         self._botocore_config = botocore_config
         self._procs_cpu_bound = procs_cpu_bound
         self._procs_io_bound = procs_io_bound
@@ -293,6 +304,7 @@ def botocore_max_retries(self):
     def botocore_config(self):
        return self._botocore_config
 
+    @property
+    def s3_additional_kwargs(self):
+        return self._s3_additional_kwargs
+
     @property
     def procs_cpu_bound(self):
         return self._procs_cpu_bound
@@ -314,6 +329,7 @@ def session(self):
             aws_session_token=self._aws_session_token,
             region_name=self._region_name,
             botocore_max_retries=self._botocore_max_retries,
+            s3_additional_kwargs=self._s3_additional_kwargs,
             procs_cpu_bound=self._procs_cpu_bound,
             procs_io_bound=self._procs_io_bound,
         )
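Putting the Session changes together: the new keyword argument is stored on the Session, exposed through the new `s3_additional_kwargs` property, and copied into `SessionPrimitives` by `_load_new_primitives()` so that `s3.get_fs()` can see it. A minimal usage sketch based only on what this diff adds:

```py3
import awswrangler

extra_args = {"ServerSideEncryption": "aws:kms"}
session = awswrangler.Session(s3_additional_kwargs=extra_args)

# The kwargs round-trip through the new property added in this commit.
print(session.s3_additional_kwargs)  # {'ServerSideEncryption': 'aws:kms'}
```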

0 commit comments