1
- import boto3
2
1
import logging
3
- import typing
4
2
import time
3
+ import typing
5
4
import webbrowser
6
5
6
+ import boto3
7
7
from botocore .exceptions import ClientError
8
+
8
9
from redshift_connector .error import InterfaceError
9
- from redshift_connector .plugin .common_credentials_provider import CommonCredentialsProvider
10
+ from redshift_connector .plugin .common_credentials_provider import (
11
+ CommonCredentialsProvider ,
12
+ )
10
13
from redshift_connector .redshift_property import RedshiftProperty
11
14
12
15
logging .getLogger (__name__ ).addHandler (logging .NullHandler ())
@@ -18,10 +21,10 @@ class BrowserIdcAuthPlugin(CommonCredentialsProvider):
18
21
Class to get IdC Token using SSO OIDC APIs
19
22
"""
20
23
21
- DEFAULT_IDC_CLIENT_DISPLAY_NAME = ' Amazon Redshift Python connector'
22
- CLIENT_TYPE = ' public'
23
- GRANT_TYPE = ' urn:ietf:params:oauth:grant-type:device_code'
24
- IDC_SCOPE = ' redshift:connect'
24
+ DEFAULT_IDC_CLIENT_DISPLAY_NAME = " Amazon Redshift Python connector"
25
+ CLIENT_TYPE = " public"
26
+ GRANT_TYPE = " urn:ietf:params:oauth:grant-type:device_code"
27
+ IDC_SCOPE = " redshift:connect"
25
28
DEFAULT_BROWSER_AUTH_VERIFY_TIMEOUT_IN_SEC = 120
26
29
DEFAULT_CREATE_TOKEN_INTERVAL_IN_SEC = 1
27
30
@@ -32,11 +35,11 @@ def __init__(self: "BrowserIdcAuthPlugin") -> None:
32
35
self .register_client_cache : typing .Dict [str , dict ] = {}
33
36
self .start_url : typing .Optional [str ] = None
34
37
self .idc_region : typing .Optional [str ] = None
35
- self .sso_oidc_client : "SSOOIDC.Client" = None
38
+ self .sso_oidc_client : boto3 . client = None
36
39
37
40
def add_parameter (
38
- self : "BrowserIdcAuthPlugin" ,
39
- info : RedshiftProperty ,
41
+ self : "BrowserIdcAuthPlugin" ,
42
+ info : RedshiftProperty ,
40
43
) -> None :
41
44
"""
42
45
Adds parameters to the BrowserIdcAuthPlugin
@@ -65,11 +68,13 @@ def check_required_parameters(self: "BrowserIdcAuthPlugin") -> None:
65
68
if not self .start_url :
66
69
_logger .error ("IdC authentication failed: start_url needs to be provided in connection params" )
67
70
raise InterfaceError (
68
- "IdC authentication failed: The start URL must be included in the connection parameters." )
71
+ "IdC authentication failed: The start URL must be included in the connection parameters."
72
+ )
69
73
if not self .idc_region :
70
74
_logger .error ("IdC authentication failed: idc_region needs to be provided in connection params" )
71
75
raise InterfaceError (
72
- "IdC authentication failed: The IdC region must be included in the connection parameters." )
76
+ "IdC authentication failed: The IdC region must be included in the connection parameters."
77
+ )
73
78
74
79
def get_cache_key (self : "BrowserIdcAuthPlugin" ) -> str :
75
80
"""
@@ -93,16 +98,21 @@ def get_idc_token(self: "BrowserIdcAuthPlugin") -> str:
93
98
try :
94
99
self .check_required_parameters ()
95
100
96
- self .sso_oidc_client = boto3 .client (' sso-oidc' , region_name = self .idc_region )
101
+ self .sso_oidc_client = boto3 .client (" sso-oidc" , region_name = self .idc_region )
97
102
register_client_cache_key : str = f"{ self .idc_client_display_name } :{ self .idc_region } "
98
103
99
- register_client_result : typing .Dict [str , typing .Any ] = self .register_client (register_client_cache_key ,
100
- self .idc_client_display_name ,
101
- self .CLIENT_TYPE ,
102
- self .IDC_SCOPE )
104
+ register_client_result : typing .Dict [str , typing .Any ] = self .register_client (
105
+ register_client_cache_key ,
106
+ self .idc_client_display_name ,
107
+ typing .cast (str , self .CLIENT_TYPE ),
108
+ self .IDC_SCOPE ,
109
+ )
103
110
start_device_auth_result : typing .Dict [str , typing .Any ] = self .start_device_authorization (
104
- register_client_result ['clientId' ], register_client_result ['clientSecret' ], self .start_url )
105
- self .open_browser (start_device_auth_result ['verificationUriComplete' ])
111
+ register_client_result ["clientId" ],
112
+ register_client_result ["clientSecret" ],
113
+ typing .cast (str , self .start_url ),
114
+ )
115
+ self .open_browser (start_device_auth_result ["verificationUriComplete" ])
106
116
107
117
return self .poll_for_create_token (register_client_result , start_device_auth_result , self .GRANT_TYPE )
108
118
except InterfaceError as e :
@@ -111,8 +121,9 @@ def get_idc_token(self: "BrowserIdcAuthPlugin") -> str:
111
121
_logger .debug ("An error occurred while trying to obtain an IdC token : {}" .format (str (e )))
112
122
raise InterfaceError ("There was an error during authentication." )
113
123
114
- def register_client (self : "BrowserIdcAuthPlugin" , register_client_cache_key : str , client_name : str ,
115
- client_type : str , scope : str ) -> typing .Dict [str , typing .Any ]:
124
+ def register_client ( # type: ignore
125
+ self : "BrowserIdcAuthPlugin" , register_client_cache_key : str , client_name : str , client_type : str , scope : str
126
+ ) -> typing .Dict [str , typing .Any ]:
116
127
"""
117
128
Registers the client with IdC.
118
129
:param register_client_cache_key: str
@@ -126,21 +137,25 @@ def register_client(self: "BrowserIdcAuthPlugin", register_client_cache_key: str
126
137
:return: dict
127
138
The register client result from IdC
128
139
"""
129
- if register_client_cache_key in self .register_client_cache and \
130
- self .register_client_cache [register_client_cache_key ]['clientSecretExpiresAt' ] > time .time ():
140
+ if (
141
+ register_client_cache_key in self .register_client_cache
142
+ and self .register_client_cache [register_client_cache_key ]["clientSecretExpiresAt" ] > time .time ()
143
+ ):
131
144
_logger .debug ("Valid registerClient result found from cache" )
132
145
return self .register_client_cache [register_client_cache_key ]
133
146
134
147
try :
135
148
register_client_result : typing .Dict [str , typing .Any ] = self .sso_oidc_client .register_client (
136
- clientName = client_name , clientType = client_type , scopes = [scope ])
149
+ clientName = client_name , clientType = client_type , scopes = [scope ]
150
+ )
137
151
self .register_client_cache [register_client_cache_key ] = register_client_result
138
152
return register_client_result
139
153
except ClientError as e :
140
154
self .handle_error (e , "registering client with IdC" )
141
155
142
- def start_device_authorization (self : "BrowserIdcAuthPlugin" , client_id : str , client_secret : str ,
143
- start_url : str ) -> typing .Dict [str , typing .Any ]:
156
+ def start_device_authorization ( # type: ignore
157
+ self : "BrowserIdcAuthPlugin" , client_id : str , client_secret : str , start_url : str
158
+ ) -> typing .Dict [str , typing .Any ]:
144
159
"""
145
160
Starts device authorization flow with IdC.
146
161
:param client_id: str
@@ -154,9 +169,7 @@ def start_device_authorization(self: "BrowserIdcAuthPlugin", client_id: str, cli
154
169
"""
155
170
try :
156
171
response : typing .Dict [str , typing .Any ] = self .sso_oidc_client .start_device_authorization (
157
- clientId = client_id ,
158
- clientSecret = client_secret ,
159
- startUrl = start_url
172
+ clientId = client_id , clientSecret = client_secret , startUrl = start_url
160
173
)
161
174
return response
162
175
except ClientError as e :
@@ -173,9 +186,12 @@ def open_browser(self: "BrowserIdcAuthPlugin", url: str) -> None:
173
186
self .validate_url (url )
174
187
webbrowser .open (url )
175
188
176
- def poll_for_create_token (self : "BrowserIdcAuthPlugin" ,
177
- register_client_result : typing .Dict [str , typing .Any ],
178
- start_device_auth_result : typing .Dict [str , typing .Any ], grant_type : str ) -> str :
189
+ def poll_for_create_token (
190
+ self : "BrowserIdcAuthPlugin" ,
191
+ register_client_result : typing .Dict [str , typing .Any ],
192
+ start_device_auth_result : typing .Dict [str , typing .Any ],
193
+ grant_type : str ,
194
+ ) -> str :
179
195
"""
180
196
Polls for IdC access token using SSO OIDC APIs.
181
197
:param register_client_result: dict
@@ -191,22 +207,22 @@ def poll_for_create_token(self: "BrowserIdcAuthPlugin",
191
207
polling_end_time : float = time .time () + self .idc_response_timeout
192
208
193
209
polling_interval_in_sec : int = self .DEFAULT_CREATE_TOKEN_INTERVAL_IN_SEC
194
- if start_device_auth_result [' interval' ]:
195
- polling_interval_in_sec = start_device_auth_result [' interval' ]
210
+ if start_device_auth_result [" interval" ]:
211
+ polling_interval_in_sec = start_device_auth_result [" interval" ]
196
212
197
213
while time .time () < polling_end_time :
198
214
try :
199
215
response : typing .Dict [str , typing .Any ] = self .sso_oidc_client .create_token (
200
- clientId = register_client_result [' clientId' ],
201
- clientSecret = register_client_result [' clientSecret' ],
216
+ clientId = register_client_result [" clientId" ],
217
+ clientSecret = register_client_result [" clientSecret" ],
202
218
grantType = grant_type ,
203
- deviceCode = start_device_auth_result [' deviceCode' ]
219
+ deviceCode = start_device_auth_result [" deviceCode" ],
204
220
)
205
- if not response [' accessToken' ]:
221
+ if not response [" accessToken" ]:
206
222
raise InterfaceError ("IdC authentication failed : The credential token couldn't be created." )
207
- return response [' accessToken' ]
223
+ return response [" accessToken" ]
208
224
except ClientError as e :
209
- if e .response [' Error' ][ ' Code' ] == ' AuthorizationPendingException' :
225
+ if e .response [" Error" ][ " Code" ] == " AuthorizationPendingException" :
210
226
_logger .debug ("Browser authorization pending from user" )
211
227
time .sleep (polling_interval_in_sec )
212
228
else :
@@ -225,11 +241,11 @@ def handle_error(self: "BrowserIdcAuthPlugin", e: ClientError, operation: str) -
225
241
:raises InterfaceError: A client error to be returned to the user with appropriate error message
226
242
"""
227
243
_logger .debug ("Error response = {} " .format (e .response ))
228
- error_message = e .response [' Error' ][ ' Message' ]
244
+ error_message = e .response [" Error" ][ " Message" ]
229
245
if not error_message :
230
- error_message = e . response [ 'error_description' ] if e . response [
231
- ' error_description' ] else "Something unexpected happened"
232
- error_code = e . response [ 'Error' ][ 'Code' ]
233
- _logger . debug (
234
- "An error occurred while {}: ClientError = {} - {}" .format (operation , error_code , error_message ))
246
+ error_message = (
247
+ e . response [ " error_description" ] if e . response [ "error_description" ] else "Something unexpected happened"
248
+ )
249
+ error_code = e . response [ "Error" ][ "Code" ]
250
+ _logger . debug ( "An error occurred while {}: ClientError = {} - {}" .format (operation , error_code , error_message ))
235
251
raise InterfaceError ("IdC authentication failed : An error occurred during the request." )
0 commit comments