8
8
# Case 5: arn:partition:service:region:account-id:resourcetype:resource
9
9
# Case 6: arn:partition:service:region:account-id:resourcetype:resource:qualifier
10
10
# Source: https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#genref-arns
11
+ from __future__ import annotations
12
+
11
13
import logging
12
14
import re
13
15
16
+ ARN_SEPARATOR_PATTERN = re .compile (r"[:/]" )
17
+ # Note: Each service can only have one of these, so these are definitely exceptions
18
+ EXCLUSION_LIST = {
19
+ "${ObjectName}" ,
20
+ "${RepositoryName}" ,
21
+ "${BucketName}" ,
22
+ "table/${TableName}" ,
23
+ "${BucketName}/${ObjectName}" ,
24
+ }
25
+
14
26
logger = logging .getLogger (__name__ )
15
27
16
28
17
29
# pylint: disable=too-many-instance-attributes
18
30
class ARN :
19
31
"""Class that helps to match ARN resource type formats neatly"""
20
32
21
- def __init__ (self , provided_arn ) :
33
+ def __init__ (self , provided_arn : str ) -> None :
22
34
self .arn = provided_arn
23
35
follows_arn_format = re .search (
24
36
r"^arn:([^:]*):([^:]*):([^:]*):([^:]*):(.+)$" , provided_arn
@@ -34,18 +46,20 @@ def __init__(self, provided_arn):
34
46
self .account = elements [4 ]
35
47
self .resource = elements [5 ]
36
48
except IndexError as error :
37
- raise Exception (f"The provided ARN is invalid. IndexError: { error } . Please provide a valid ARN." ) from error
49
+ raise Exception (
50
+ f"The provided ARN is invalid. IndexError: { error } . Please provide a valid ARN."
51
+ ) from error
38
52
if "/" in self .resource :
39
53
self .resource , self .resource_path = self .resource .split ("/" , 1 )
40
54
elif ":" in self .resource :
41
55
self .resource , self .resource_path = self .resource .split (":" , 1 )
42
56
self .resource_string = self ._resource_string ()
43
57
44
- def __repr__ (self ):
58
+ def __repr__ (self ) -> str :
45
59
return self .arn
46
60
47
61
# pylint: disable=too-many-return-statements
48
- def _resource_string (self ):
62
+ def _resource_string (self ) -> str :
49
63
"""
50
64
Given an ARN, return the string after the account ID, no matter the ARN format.
51
65
Return:
@@ -62,7 +76,7 @@ def _resource_string(self):
62
76
resource_string = ":" .join (split_arn [5 :])
63
77
return resource_string
64
78
65
- def same_resource_type (self , arn_in_database ) :
79
+ def same_resource_type (self , arn_in_database : str ) -> bool :
66
80
"""Given an arn, see if it has the same resource type"""
67
81
68
82
# 1. If the RAW ARN in the database is *, then it doesn't have a resource type
@@ -80,14 +94,16 @@ def same_resource_type(self, arn_in_database):
80
94
# Previously, this would fail and return empty results.
81
95
# Now it correctly returns the full list of matching ARNs and corresponding actions.
82
96
resource_type_arn_to_test = parse_arn_for_resource_type (self .arn )
83
- if resource_type_arn_to_test == '*' :
97
+ if resource_type_arn_to_test == "*" :
84
98
return True
85
99
86
100
# 4. Match patterns for complicated resource strings, leveraging the standardized format of the Raw ARN format
87
101
# table/${TableName} should not match `table/${TableName}/backup/${BackupName}`
88
102
resource_string_arn_in_database = get_resource_string (arn_in_database )
89
103
90
- split_resource_string_in_database = re .split (':|/' , resource_string_arn_in_database )
104
+ split_resource_string_in_database = re .split (
105
+ ARN_SEPARATOR_PATTERN , resource_string_arn_in_database
106
+ )
91
107
# logger.debug(str(split_resource_string_in_database))
92
108
arn_format_list = []
93
109
for elem in split_resource_string_in_database :
@@ -97,47 +113,47 @@ def same_resource_type(self, arn_in_database):
97
113
# If an element says something like ${TableName}, normalize it to an empty string
98
114
arn_format_list .append ("" )
99
115
100
- split_resource_string_to_test = re .split (':|/' , self .resource_string )
116
+ split_resource_string_to_test = re .split (
117
+ ARN_SEPARATOR_PATTERN , self .resource_string
118
+ )
101
119
# 4b: If we have a confusing resource string, the length of the split resource string list
102
120
# should at least be the same
103
121
# Again, table/${TableName} (len of 2) should not match `table/${TableName}/backup/${BackupName}` (len of 4)
104
122
# if len(split_resource_string_to_test) != len(arn_format_list):
105
123
# return False
106
124
107
- non_empty_arn_format_list = []
108
- for i in arn_format_list :
109
- if i != "" :
110
- non_empty_arn_format_list .append (i )
111
-
112
- lower_resource_string = list (map (lambda x :x .lower (),split_resource_string_to_test ))
113
- for i in non_empty_arn_format_list :
114
- if i .lower () not in lower_resource_string :
125
+ lower_resource_string = [x .lower () for x in split_resource_string_to_test ]
126
+ for elem in arn_format_list :
127
+ if elem and elem .lower () not in lower_resource_string :
115
128
return False
116
129
117
130
# 4c: See if the non-normalized fields match
118
- for i in range ( len ( arn_format_list ) ):
131
+ for idx , elem in enumerate ( arn_format_list ):
119
132
# If the field is not normalized to empty string, then make sure the resource type segments match
120
133
# So, using table/${TableName}/backup/${BackupName} as an example:
121
134
# table should match, backup should match,
122
135
# and length of the arn_format_list should be the same as split_resource_string_to_test
123
136
# If all conditions match, then the ARN format is the same.
124
- if arn_format_list [ i ] != "" :
125
- if arn_format_list [ i ] == split_resource_string_to_test [i ]:
137
+ if elem :
138
+ if elem == split_resource_string_to_test [idx ]:
126
139
pass
127
- elif split_resource_string_to_test [i ] == "*" :
140
+ elif split_resource_string_to_test [idx ] == "*" :
128
141
pass
129
142
else :
130
143
return False
131
144
132
145
# 4. Special type for S3 bucket objects and CodeCommit repos
133
- # Note: Each service can only have one of these, so these are definitely exceptions
134
- exclusion_list = ["${ObjectName}" , "${RepositoryName}" , "${BucketName}" , "table/${TableName}" , "${BucketName}/${ObjectName}" ]
135
146
resource_path_arn_in_database = elements [5 ]
136
- if resource_path_arn_in_database in exclusion_list :
137
- logger .debug ("Special type: %s" , resource_path_arn_in_database )
147
+ if resource_path_arn_in_database in EXCLUSION_LIST :
148
+ logger .debug (f "Special type: { resource_path_arn_in_database } " )
138
149
# handling special case table/${TableName}
139
- if resource_string_arn_in_database in ["table/${TableName}" , "${BucketName}" ]:
140
- return len (self .resource_string .split ('/' )) == len (elements [5 ].split ('/' ))
150
+ if resource_string_arn_in_database in (
151
+ "table/${TableName}" ,
152
+ "${BucketName}" ,
153
+ ):
154
+ return len (self .resource_string .split ("/" )) == len (
155
+ elements [5 ].split ("/" )
156
+ )
141
157
# If we've made it this far, then it is a special type
142
158
# return True
143
159
# Presence of / would mean it's an object in both so it matches
@@ -154,7 +170,7 @@ def same_resource_type(self, arn_in_database):
154
170
return True
155
171
156
172
157
- def parse_arn (arn ) :
173
+ def parse_arn (arn : str ) -> dict [ str , str ] :
158
174
"""
159
175
Given an ARN, split up the ARN into the ARN namespacing schema dictated by the AWS docs.
160
176
"""
@@ -167,53 +183,52 @@ def parse_arn(arn):
167
183
"region" : elements [3 ],
168
184
"account" : elements [4 ],
169
185
"resource" : elements [5 ],
170
- "resource_path" : None ,
186
+ "resource_path" : "" ,
171
187
}
172
188
except IndexError as error :
173
- raise Exception (f"IndexError: The provided ARN '{ arn } ' is invalid. Please provide a valid ARN." ) from error
189
+ raise Exception (
190
+ f"IndexError: The provided ARN '{ arn } ' is invalid. Please provide a valid ARN."
191
+ ) from error
174
192
if "/" in result ["resource" ]:
175
193
result ["resource" ], result ["resource_path" ] = result ["resource" ].split ("/" , 1 )
176
194
elif ":" in result ["resource" ]:
177
195
result ["resource" ], result ["resource_path" ] = result ["resource" ].split (":" , 1 )
178
196
return result
179
197
180
198
181
- def get_service_from_arn (arn ) :
182
- """Given an ARN string, return the service """
199
+ def get_service_from_arn (arn : str ) -> str :
200
+ """Given an ARN string, return the service"""
183
201
result = parse_arn (arn )
184
202
return result ["service" ]
185
203
186
204
187
- def get_region_from_arn (arn ) :
205
+ def get_region_from_arn (arn : str ) -> str :
188
206
"""Given an ARN, return the region in the ARN, if it is available. In certain cases like S3 it is not"""
189
207
result = parse_arn (arn )
190
208
# Support S3 buckets with no values under region
191
209
if result ["region" ] is None :
192
- result = ""
193
- else :
194
- result = result ["region" ]
195
- return result
210
+ return ""
211
+ return result ["region" ]
196
212
197
213
198
- def get_account_from_arn (arn ) :
214
+ def get_account_from_arn (arn : str ) -> str :
199
215
"""Given an ARN, return the account ID in the ARN, if it is available. In certain cases like S3 it is not"""
200
216
result = parse_arn (arn )
201
217
# Support S3 buckets with no values under account
202
218
if result ["account" ] is None :
203
- result = ""
204
- else :
205
- result = result ["account" ]
206
- return result
219
+ return ""
220
+ return result ["account" ]
207
221
208
222
209
- def get_resource_path_from_arn (arn ) :
223
+ def get_resource_path_from_arn (arn : str ) -> str | None :
210
224
"""Given an ARN, parse it according to ARN namespacing and return the resource path. See
211
- http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html for more details on ARN namespacing."""
225
+ http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html for more details on ARN namespacing.
226
+ """
212
227
result = parse_arn (arn )
213
228
return result ["resource_path" ]
214
229
215
230
216
- def get_resource_string (arn ) :
231
+ def get_resource_string (arn : str ) -> str :
217
232
"""
218
233
Given an ARN, return the string after the account ID, no matter the ARN format.
219
234
@@ -229,7 +244,7 @@ def get_resource_string(arn):
229
244
230
245
# In the meantime, we have to skip this pylint check (consider this as tech debt)
231
246
# pylint: disable=inconsistent-return-statements
232
- def parse_arn_for_resource_type (arn ) :
247
+ def parse_arn_for_resource_type (arn : str ) -> str | None :
233
248
"""
234
249
Parses the resource string (resourcetype/resource and other variants) and grab the resource type.
235
250
@@ -240,15 +255,17 @@ def parse_arn_for_resource_type(arn):
240
255
"""
241
256
split_arn = arn .split (":" )
242
257
resource_string = ":" .join (split_arn [5 :])
243
- split_resource = re .split ("/|:" , resource_string )
258
+ split_resource = re .split (ARN_SEPARATOR_PATTERN , resource_string )
244
259
if len (split_resource ) == 1 :
245
260
# logger.debug(f"split_resource length is 1: {str(split_resource)}")
246
261
pass
247
262
elif len (split_resource ) > 1 :
248
263
return split_resource [0 ]
249
264
265
+ return None
266
+
250
267
251
- def does_arn_match (arn_to_test , arn_in_database ) :
268
+ def does_arn_match (arn_to_test : str , arn_in_database : str ) -> bool :
252
269
"""
253
270
Given two ARNs, determine if they have the same resource type.
254
271
0 commit comments