|
1 | 1 | import boto3
|
2 | 2 | import json
|
| 3 | +import hashlib |
| 4 | +import hmac |
3 | 5 | import logging
|
| 6 | +import os |
4 | 7 |
|
5 | 8 | import os.path as op
|
6 | 9 |
|
7 | 10 | from botocore.exceptions import ClientError
|
8 | 11 | from datetime import datetime, timedelta
|
9 |
| -from dateutil.parser import parse |
10 | 12 | from gzip import GzipFile
|
11 | 13 | from io import BytesIO
|
12 | 14 | from os import makedirs, getenv
|
@@ -85,18 +87,22 @@ def download(uri, path=''):
|
85 | 87 | return fout
|
86 | 88 |
|
87 | 89 |
|
88 |
| -def read_json(url): |
89 |
| - """ |
90 |
| - Download object from S3 as JSON |
91 |
| - """ |
| 90 | +def read(url): |
| 91 | + """ Read object from s3 """ |
92 | 92 | parts = urlparse(url)
|
93 | 93 | response = s3.get_object(Bucket=parts['bucket'], Key=parts['key'])
|
94 | 94 | body = response['Body'].read()
|
95 | 95 | if op.splitext(parts['key'])[1] == '.gz':
|
96 | 96 | body = GzipFile(None, 'rb', fileobj=BytesIO(body)).read()
|
97 |
| - return json.loads(body.decode('utf-8')) |
| 97 | + return body.decode('utf-8') |
98 | 98 |
|
99 | 99 |
|
| 100 | +def read_json(url): |
| 101 | + """ Download object from S3 as JSON """ |
| 102 | + return json.loads(read(url)) |
| 103 | + |
| 104 | + |
| 105 | +# function derived from https://alexwlchan.net/2018/01/listing-s3-keys-redux/ |
100 | 106 | def find(url, suffix=''):
|
101 | 107 | """
|
102 | 108 | Generate objects in an S3 bucket.
|
@@ -132,3 +138,125 @@ def find(url, suffix=''):
|
132 | 138 | kwargs['ContinuationToken'] = resp['NextContinuationToken']
|
133 | 139 | except KeyError:
|
134 | 140 | break
|
| 141 | + |
| 142 | + |
| 143 | +def latest_inventory(url, prefix=None, suffix=None, start_date=None, end_date=None, datetime_key='LastModifiedDate'): |
| 144 | + """ Return generator function for objects in Bucket with suffix (all files if suffix=None) """ |
| 145 | + parts = urlparse(url) |
| 146 | + # get latest manifest file |
| 147 | + today = datetime.now() |
| 148 | + manifest_key = None |
| 149 | + for dt in [today, today - timedelta(1)]: |
| 150 | + _key = op.join(parts['key'], dt.strftime('%Y-%m-%d')) |
| 151 | + _url = 's3://%s/%s' % (parts['bucket'], _key) |
| 152 | + keys = [k for k in find(_url, suffix='manifest.json')] |
| 153 | + if len(keys) == 1: |
| 154 | + manifest_key = keys[0] |
| 155 | + break |
| 156 | + # read through latest manifest looking for matches |
| 157 | + if manifest_key: |
| 158 | + _url = 's3://%s/%s' % (parts['bucket'], manifest_key) |
| 159 | + manifest = read_json(_url) |
| 160 | + # get file schema |
| 161 | + keys = [str(key).strip() for key in manifest['fileSchema'].split(',')] |
| 162 | + |
| 163 | + logger.info('Getting latest inventory from %s' % url) |
| 164 | + counter = 0 |
| 165 | + for f in manifest.get('files', []): |
| 166 | + _url = 's3://%s/%s' % (parts['bucket'], f['key']) |
| 167 | + inv = read(_url).split('\n') |
| 168 | + for line in inv: |
| 169 | + counter += 1 |
| 170 | + if counter % 100000 == 0: |
| 171 | + logger.debug('%s: Scanned %s records' % (datetime.now(), str(counter))) |
| 172 | + info = {keys[i]: v for i, v in enumerate(line.replace('"', '').split(','))} |
| 173 | + if 'Key' not in info: |
| 174 | + continue |
| 175 | + # skip to next if last modified date not between start_date and end_date |
| 176 | + dt = datetime.strptime(info[datetime_key], "%Y-%m-%dT%H:%M:%S.%fZ").date() |
| 177 | + if (start_date is not None and dt < start_date) or (end_date is not None and dt > end_date): |
| 178 | + continue |
| 179 | + if prefix is not None: |
| 180 | + # if path doesn't match provided prefix skip to next record |
| 181 | + if info['Key'][:len(prefix)] != prefix: |
| 182 | + continue |
| 183 | + if suffix is None or info['Key'].endswith(suffix): |
| 184 | + if 'Bucket' in keys and 'Key' in keys: |
| 185 | + info['url'] = 's3://%s/%s' % (info['Bucket'], info['Key']) |
| 186 | + yield info |
| 187 | + |
| 188 | + |
| 189 | +def get_presigned_url(url, aws_region=None, |
| 190 | + rtype='GET', public=False, requester_pays=False, content_type=None): |
| 191 | + """ Get presigned URL """ |
| 192 | + access_key = os.environ.get('AWS_BUCKET_ACCESS_KEY_ID', os.environ.get('AWS_ACCESS_KEY_ID')) |
| 193 | + secret_key = os.environ.get('AWS_BUCKET_SECRET_ACCESS_KEY', os.environ.get('AWS_SECRET_ACCESS_KEY')) |
| 194 | + region = os.environ.get('AWS_BUCKET_REGION', os.environ.get('AWS_REGION', 'eu-central-1')) |
| 195 | + if aws_region is not None: |
| 196 | + region = aws_region |
| 197 | + if access_key is None or secret_key is None: |
| 198 | + # if credentials not provided, just try to download without signed URL |
| 199 | + logger.debug('Not using signed URL for %s' % url) |
| 200 | + return url, None |
| 201 | + |
| 202 | + parts = urlparse(url) |
| 203 | + bucket = parts['bucket'] |
| 204 | + key = parts['key'] |
| 205 | + |
| 206 | + service = 's3' |
| 207 | + host = '%s.%s.amazonaws.com' % (bucket, service) |
| 208 | + request_parameters = '' |
| 209 | + |
| 210 | + # Key derivation functions. See: |
| 211 | + # http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python |
| 212 | + def sign(key, msg): |
| 213 | + return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() |
| 214 | + |
| 215 | + def getSignatureKey(key, dateStamp, regionName, serviceName): |
| 216 | + kDate = sign(('AWS4' + key).encode('utf-8'), dateStamp) |
| 217 | + kRegion = sign(kDate, regionName) |
| 218 | + kService = sign(kRegion, serviceName) |
| 219 | + kSigning = sign(kService, 'aws4_request') |
| 220 | + return kSigning |
| 221 | + |
| 222 | + # Create a date for headers and the credential string |
| 223 | + t = datetime.utcnow() |
| 224 | + amzdate = t.strftime('%Y%m%dT%H%M%SZ') |
| 225 | + datestamp = t.strftime('%Y%m%d') # Date w/o time, used in credential scope |
| 226 | + |
| 227 | + # create signed request and headers |
| 228 | + canonical_uri = '/' + key |
| 229 | + canonical_querystring = request_parameters |
| 230 | + |
| 231 | + payload_hash = 'UNSIGNED-PAYLOAD' |
| 232 | + headers = { |
| 233 | + 'host': host, |
| 234 | + 'x-amz-content-sha256': payload_hash, |
| 235 | + 'x-amz-date': amzdate |
| 236 | + } |
| 237 | + |
| 238 | + if requester_pays: |
| 239 | + headers['x-amz-request-payer'] = 'requester' |
| 240 | + if public: |
| 241 | + headers['x-amz-acl'] = 'public-read' |
| 242 | + if os.environ.get('AWS_SESSION_TOKEN') and 'AWS_BUCKET_ACCESS_KEY_ID' not in os.environ: |
| 243 | + headers['x-amz-security-token'] = os.environ.get('AWS_SESSION_TOKEN') |
| 244 | + canonical_headers = '\n'.join('%s:%s' % (key, headers[key]) for key in sorted(headers)) + '\n' |
| 245 | + signed_headers = ';'.join(sorted(headers.keys())) |
| 246 | + |
| 247 | + canonical_request = '%s\n%s\n%s\n%s\n%s\n%s' % ( |
| 248 | + rtype, canonical_uri, canonical_querystring, canonical_headers, signed_headers, payload_hash |
| 249 | + ) |
| 250 | + algorithm = 'AWS4-HMAC-SHA256' |
| 251 | + credential_scope = datestamp + '/' + region + '/' + service + '/' + 'aws4_request' |
| 252 | + string_to_sign = algorithm + '\n' + amzdate + '\n' + credential_scope + '\n' + hashlib.sha256(canonical_request.encode('utf-8')).hexdigest() |
| 253 | + signing_key = getSignatureKey(secret_key, datestamp, region, service) |
| 254 | + signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest() |
| 255 | + authorization_header = algorithm + ' ' + 'Credential=' + access_key + '/' + credential_scope + ', ' \ |
| 256 | + + 'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature |
| 257 | + |
| 258 | + request_url = 'https://%s%s' % (host, canonical_uri) |
| 259 | + headers['Authorization'] = authorization_header |
| 260 | + if content_type is not None: |
| 261 | + headers['content-type'] = content_type |
| 262 | + return request_url, headers |
0 commit comments