logger = logging.getLogger(__name__)

- # s3 client
- s3 = boto3.client('s3')
+ class s3(object):

- def urlparse(url):
-     """ Split S3 URL into bucket, key, filename """
-     if url[0:5] != 's3://':
-         raise Exception('Invalid S3 url %s' % url)
+     def __init__(self, session=None):
+         if session is None:
+             self.s3 = boto3.client('s3')
+         else:
+             self.s3 = session.client('s3')
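
Wrapping the client in a class means credentials can now be injected: with no arguments the default boto3 credential chain is used, otherwise any boto3.Session supplies the client. A minimal usage sketch (import path and profile name are hypothetical):

    import boto3
    from utils.s3 import s3   # hypothetical import path

    client = s3()                                             # default credential chain
    client = s3(session=boto3.Session(profile_name='prod'))   # explicit profile
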

-     url_obj = url.replace('s3://', '').split('/')
+     @classmethod
+     def urlparse(cls, url):
+         """ Split S3 URL into bucket, key, filename """
+         if url[0:5] != 's3://':
+             raise Exception('Invalid S3 url %s' % url)

-     # remove empty items
-     url_obj = list(filter(lambda x: x, url_obj))
-
-     return {
-         'bucket': url_obj[0],
-         'key': '/'.join(url_obj[1:]),
-         'filename': url_obj[-1] if len(url_obj) > 1 else ''
-     }
-
-
- def s3_to_https(url, region=getenv('AWS_REGION', getenv('AWS_DEFAULT_REGION', 'us-east-1'))):
-     """ Convert an s3 URL to an s3 https URL """
-     parts = urlparse(url)
-     return 'https://%s.s3.%s.amazonaws.com/%s' % (parts['bucket'], region, parts['key'])
+         url_obj = url.replace('s3://', '').split('/')

+         # remove empty items
+         url_obj = list(filter(lambda x: x, url_obj))

- def exists(url):
-     """ Check if this URL exists on S3 """
-     parts = urlparse(url)
-     try:
-         s3.head_object(Bucket=parts['bucket'], Key=parts['key'])
-         return True
-     except ClientError as exc:
-         if exc.response['Error']['Code'] != '404':
-             raise
-         return False
+         return {
+             'bucket': url_obj[0],
+             'key': '/'.join(url_obj[1:]),
+             'filename': url_obj[-1] if len(url_obj) > 1 else ''
+         }
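
Since urlparse is now a classmethod it stays callable without constructing a client. Expected behavior, with a hypothetical bucket:

    s3.urlparse('s3://my-bucket/path/to/scene.json')
    # {'bucket': 'my-bucket', 'key': 'path/to/scene.json', 'filename': 'scene.json'}
    s3.urlparse('s3://my-bucket')
    # {'bucket': 'my-bucket', 'key': '', 'filename': ''}
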

+     @classmethod
+     def s3_to_https(cls, url, region=getenv('AWS_REGION', getenv('AWS_DEFAULT_REGION', 'us-east-1'))):
+         """ Convert an s3 URL to an s3 https URL """
+         parts = cls.urlparse(url)
+         return 'https://%s.s3.%s.amazonaws.com/%s' % (parts['bucket'], region, parts['key'])
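
The region argument still defaults from AWS_REGION / AWS_DEFAULT_REGION (falling back to us-east-1); note that, as a default argument, it is evaluated once at import time. For example:

    s3.s3_to_https('s3://my-bucket/path/to/scene.json', region='us-west-2')
    # 'https://my-bucket.s3.us-west-2.amazonaws.com/path/to/scene.json'
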

- def upload(filename, uri, public=False, extra={}):
-     """ Upload object to S3 uri (bucket + prefix), keeping same base filename """
-     logger.debug("Uploading %s to %s" % (filename, uri))
-     s3_uri = urlparse(uri)
-     uri_out = 's3://%s' % op.join(s3_uri['bucket'], s3_uri['key'])
-     if public:
-         extra['ACL'] = 'public-read'
-     with open(filename, 'rb') as data:
-         s3.upload_fileobj(data, s3_uri['bucket'], s3_uri['key'], ExtraArgs=extra)
-     return uri_out
-
-
- def download(uri, path=''):
-     """
-     Download object from S3
-     :param uri: URI of object to download
-     :param path: Output path
-     """
-     s3_uri = urlparse(uri)
-     fout = op.join(path, s3_uri['filename'])
-     logger.debug("Downloading %s as %s" % (uri, fout))
-     if path != '':
-         makedirs(path, exist_ok=True)
-
-     with open(fout, 'wb') as f:
-         s3.download_fileobj(
-             Bucket=s3_uri['bucket'],
-             Key=s3_uri['key'],
-             Fileobj=f
-         )
-     return fout
-
-
- def read(url):
-     """ Read object from s3 """
-     parts = urlparse(url)
-     response = s3.get_object(Bucket=parts['bucket'], Key=parts['key'])
-     body = response['Body'].read()
-     if op.splitext(parts['key'])[1] == '.gz':
-         body = GzipFile(None, 'rb', fileobj=BytesIO(body)).read()
-     return body.decode('utf-8')
-
-
- def read_json(url):
-     """ Download object from S3 as JSON """
-     return json.loads(read(url))
-
-
- # function derived from https://alexwlchan.net/2018/01/listing-s3-keys-redux/
- def find(url, suffix=''):
-     """
-     Generate objects in an S3 bucket.
-     :param url: The beginning part of the URL to match (bucket + optional prefix)
-     :param suffix: Only fetch objects whose keys end with this suffix.
-     """
-     parts = urlparse(url)
-     kwargs = {'Bucket': parts['bucket']}
-
-     # If the prefix is a single string (not a tuple of strings), we can
-     # do the filtering directly in the S3 API.
-     if isinstance(parts['key'], str):
-         kwargs['Prefix'] = parts['key']
-
-     while True:
-         # The S3 API response is a large blob of metadata.
-         # 'Contents' contains information about the listed objects.
-         resp = s3.list_objects_v2(**kwargs)
-         try:
-             contents = resp['Contents']
-         except KeyError:
-             return
-
-         for obj in contents:
-             key = obj['Key']
-             if key.startswith(parts['key']) and key.endswith(suffix):
-                 yield obj['Key']
-
-         # The S3 API is paginated, returning up to 1000 keys at a time.
-         # Pass the continuation token into the next response, until we
-         # reach the final page (when this field is missing).
+     def exists(self, url):
+         """ Check if this URL exists on S3 """
+         parts = self.urlparse(url)
        try:
-             kwargs['ContinuationToken'] = resp['NextContinuationToken']
-         except KeyError:
-             break
-
-
- def latest_inventory(url, prefix=None, suffix=None, start_date=None, end_date=None, datetime_key='LastModifiedDate'):
-     """ Return generator function for objects in Bucket with suffix (all files if suffix=None) """
-     parts = urlparse(url)
-     # get latest manifest file
-     today = datetime.now()
-     manifest_key = None
-     for dt in [today, today - timedelta(1)]:
-         _key = op.join(parts['key'], dt.strftime('%Y-%m-%d'))
-         _url = 's3://%s/%s' % (parts['bucket'], _key)
-         keys = [k for k in find(_url, suffix='manifest.json')]
-         if len(keys) == 1:
-             manifest_key = keys[0]
-             break
-     # read through latest manifest looking for matches
-     if manifest_key:
-         _url = 's3://%s/%s' % (parts['bucket'], manifest_key)
-         manifest = read_json(_url)
-         # get file schema
-         keys = [str(key).strip() for key in manifest['fileSchema'].split(',')]
-
-         logger.info('Getting latest inventory from %s' % url)
-         counter = 0
-         for f in manifest.get('files', []):
-             _url = 's3://%s/%s' % (parts['bucket'], f['key'])
-             inv = read(_url).split('\n')
-             for line in inv:
-                 counter += 1
-                 if counter % 100000 == 0:
-                     logger.debug('%s: Scanned %s records' % (datetime.now(), str(counter)))
-                 info = {keys[i]: v for i, v in enumerate(line.replace('"', '').split(','))}
-                 if 'Key' not in info:
-                     continue
-                 # skip to next if last modified date not between start_date and end_date
-                 dt = datetime.strptime(info[datetime_key], "%Y-%m-%dT%H:%M:%S.%fZ").date()
-                 if (start_date is not None and dt < start_date) or (end_date is not None and dt > end_date):
-                     continue
-                 if prefix is not None:
-                     # if path doesn't match provided prefix skip to next record
-                     if info['Key'][:len(prefix)] != prefix:
+             self.s3.head_object(Bucket=parts['bucket'], Key=parts['key'])
+             return True
+         except ClientError as exc:
+             if exc.response['Error']['Code'] != '404':
+                 raise
+             return False
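
exists uses head_object, so presence is checked without downloading the body, and only a 404 is treated as absent; other errors such as 403 still propagate. Sketch (bucket hypothetical):

    client = s3()
    if not client.exists('s3://my-bucket/path/to/scene.json'):
        raise RuntimeError('missing input file')
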
+
+     def upload(self, filename, url, public=False, extra=None, http_url=False):
+         """ Upload local file to the given S3 url (bucket + key) """
+         logger.debug("Uploading %s to %s" % (filename, url))
+         parts = self.urlparse(url)
+         url_out = 's3://%s' % op.join(parts['bucket'], parts['key'])
+         # use a fresh dict so a shared mutable default is never modified
+         extra = {} if extra is None else extra
+         if public:
+             extra['ACL'] = 'public-read'
+         with open(filename, 'rb') as data:
+             self.s3.upload_fileobj(data, parts['bucket'], parts['key'], ExtraArgs=extra)
+         if http_url:
+             region = self.s3.get_bucket_location(Bucket=parts['bucket'])['LocationConstraint']
+             return self.s3_to_https(url_out, region)
+         else:
+             return url_out
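
With http_url=True the bucket's region is looked up via get_bucket_location so the returned link is browsable. One caveat worth noting: for buckets in us-east-1, boto3 reports a LocationConstraint of None, which would be interpolated literally into the URL. Sketch (names hypothetical):

    client = s3()
    client.upload('scene.json', 's3://my-bucket/scenes/scene.json')
    # 's3://my-bucket/scenes/scene.json'
    client.upload('scene.json', 's3://my-bucket/scenes/scene.json', public=True, http_url=True)
    # 'https://my-bucket.s3.us-west-2.amazonaws.com/scenes/scene.json' (region from the bucket)
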
+
+     def download(self, uri, path=''):
+         """
+         Download object from S3
+         :param uri: URI of object to download
+         :param path: Output path
+         """
+         s3_uri = self.urlparse(uri)
+         fout = op.join(path, s3_uri['filename'])
+         logger.debug("Downloading %s as %s" % (uri, fout))
+         if path != '':
+             makedirs(path, exist_ok=True)
+
+         with open(fout, 'wb') as f:
+             self.s3.download_fileobj(
+                 Bucket=s3_uri['bucket'],
+                 Key=s3_uri['key'],
+                 Fileobj=f
+             )
+         return fout
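
The local filename comes from the last path segment of the URL, and the target directory is created if it does not yet exist. Sketch:

    client = s3()
    local = client.download('s3://my-bucket/scenes/scene.json', path='downloads')
    # local == 'downloads/scene.json'
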
+
+     def read(self, url):
+         """ Read object from s3 """
+         parts = self.urlparse(url)
+         response = self.s3.get_object(Bucket=parts['bucket'], Key=parts['key'])
+         body = response['Body'].read()
+         if op.splitext(parts['key'])[1] == '.gz':
+             body = GzipFile(None, 'rb', fileobj=BytesIO(body)).read()
+         return body.decode('utf-8')
+
+     def read_json(self, url):
+         """ Download object from S3 as JSON """
+         return json.loads(self.read(url))
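
read transparently gunzips keys ending in .gz before decoding to UTF-8, and read_json simply layers json.loads on top, so gzipped inventories and plain JSON share one code path. Sketch:

    client = s3()
    text = client.read('s3://my-bucket/inventory/data.csv.gz')  # decompressed text
    data = client.read_json('s3://my-bucket/catalog.json')      # parsed dict
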
+
+     # function derived from https://alexwlchan.net/2018/01/listing-s3-keys-redux/
+     def find(self, url, suffix=''):
+         """
+         Generate objects in an S3 bucket.
+         :param url: The beginning part of the URL to match (bucket + optional prefix)
+         :param suffix: Only fetch objects whose keys end with this suffix.
+         """
+         parts = self.urlparse(url)
+         kwargs = {'Bucket': parts['bucket']}
+
+         # If the prefix is a single string (not a tuple of strings), we can
+         # do the filtering directly in the S3 API.
+         if isinstance(parts['key'], str):
+             kwargs['Prefix'] = parts['key']
+
+         while True:
+             # The S3 API response is a large blob of metadata.
+             # 'Contents' contains information about the listed objects.
+             resp = self.s3.list_objects_v2(**kwargs)
+             try:
+                 contents = resp['Contents']
+             except KeyError:
+                 return
+
+             for obj in contents:
+                 key = obj['Key']
+                 if key.startswith(parts['key']) and key.endswith(suffix):
+                     yield f"s3://{parts['bucket']}/{obj['Key']}"
+
+             # The S3 API is paginated, returning up to 1000 keys at a time.
+             # Pass the continuation token into the next response, until we
+             # reach the final page (when this field is missing).
+             try:
+                 kwargs['ContinuationToken'] = resp['NextContinuationToken']
+             except KeyError:
+                 break
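
list_objects_v2 returns at most 1000 keys per call, so the generator follows NextContinuationToken until the final page. Note the behavior change in this commit: find now yields full s3:// URLs rather than bare keys. Sketch:

    client = s3()
    for url in client.find('s3://my-bucket/scenes/', suffix='.json'):
        print(url)   # e.g. s3://my-bucket/scenes/001/scene.json
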
+
+     def latest_inventory(self, url, prefix=None, suffix=None, start_date=None, end_date=None, datetime_key='LastModifiedDate'):
+         """ Generate objects from the latest S3 Inventory of this url, optionally filtered by prefix, suffix and date range """
+         parts = self.urlparse(url)
+         # get latest manifest file
+         today = datetime.now()
+         manifest_url = None
+         for dt in [today, today - timedelta(1)]:
+             _key = op.join(parts['key'], dt.strftime('%Y-%m-%d'))
+             _url = 's3://%s/%s' % (parts['bucket'], _key)
+             manifests = [k for k in self.find(_url, suffix='manifest.json')]
+             if len(manifests) == 1:
+                 manifest_url = manifests[0]
+                 break
+         # read through latest manifest looking for matches
+         if manifest_url:
+             manifest = self.read_json(manifest_url)
+             # get file schema
+             keys = [str(key).strip() for key in manifest['fileSchema'].split(',')]
+
+             logger.info('Getting latest inventory from %s' % url)
+             counter = 0
+             for f in manifest.get('files', []):
+                 _url = 's3://%s/%s' % (parts['bucket'], f['key'])
+                 inv = self.read(_url).split('\n')
+                 for line in inv:
+                     counter += 1
+                     if counter % 100000 == 0:
+                         logger.debug('%s: Scanned %s records' % (datetime.now(), str(counter)))
+                     info = {keys[i]: v for i, v in enumerate(line.replace('"', '').split(','))}
+                     if 'Key' not in info:
+                         continue
+                     # skip to next if last modified date not between start_date and end_date
+                     dt = datetime.strptime(info[datetime_key], "%Y-%m-%dT%H:%M:%S.%fZ").date()
+                     if (start_date is not None and dt < start_date) or (end_date is not None and dt > end_date):
                        continue
-                 if suffix is None or info['Key'].endswith(suffix):
-                     if 'Bucket' in keys and 'Key' in keys:
-                         info['url'] = 's3://%s/%s' % (info['Bucket'], info['Key'])
-                     yield info
+                     if prefix is not None:
+                         # if path doesn't match provided prefix skip to next record
+                         if info['Key'][:len(prefix)] != prefix:
+                             continue
+                     if suffix is None or info['Key'].endswith(suffix):
+                         if 'Bucket' in keys and 'Key' in keys:
+                             info['url'] = 's3://%s/%s' % (info['Bucket'], info['Key'])
+                         yield info
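
latest_inventory locates the most recent S3 Inventory manifest.json (dated today or yesterday), reads every inventory file it lists, and yields one dict per record keyed by the manifest's fileSchema, adding a 'url' field when Bucket and Key columns are present. Sketch (inventory location hypothetical):

    from datetime import date

    client = s3()
    inventory_url = 's3://inventory-bucket/my-bucket/all-objects'
    for info in client.latest_inventory(inventory_url, prefix='scenes/',
                                        suffix='.json', start_date=date(2018, 6, 1)):
        print(info['Key'])
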

@@ -199,7 +202,7 @@ def get_presigned_url(url, aws_region=None,
        logger.debug('Not using signed URL for %s' % url)
        return url, None

-     parts = urlparse(url)
+     parts = s3.urlparse(url)
     bucket = parts['bucket']
     key = parts['key']