25
25
26
26
import re
27
27
28
+ from tornado .ioloop import PeriodicCallback
29
+
28
30
# constant used as suffix to deal with directory objects
29
31
EMPTY_DIR_SUFFIX = '/.jupyter_drives_fix_dir'
30
32
33
+ # 15 minutes
34
+ CREDENTIALS_REFRESH = 15 * 60 * 1000
35
+
31
36
class JupyterDrivesManager ():
32
37
"""
33
38
Jupyter-drives manager class.
@@ -46,21 +51,12 @@ def __init__(self, config: traitlets.config.Config) -> None:
46
51
self ._client = httpx .AsyncClient ()
47
52
self ._content_managers = {}
48
53
self ._max_files_listed = 1025
54
+ self ._drives = None
49
55
50
56
# instate fsspec file system
51
57
self ._file_system = fsspec .filesystem (self ._config .provider , asynchronous = True )
52
58
53
- # initiate aiobotocore session if we are dealing with S3 drives
54
- if self ._config .provider == 's3' :
55
- if self ._config .access_key_id and self ._config .secret_access_key :
56
- self ._s3_clients = {}
57
- self ._s3_session = get_session ()
58
- self ._file_system = s3fs .S3FileSystem (anon = False , asynchronous = True , key = self ._config .access_key_id , secret = self ._config .secret_access_key , token = self ._config .session_token )
59
- else :
60
- raise tornado .web .HTTPError (
61
- status_code = httpx .codes .BAD_REQUEST ,
62
- reason = "No credentials specified. Please set them in your user jupyter_server_config file." ,
63
- )
59
+ self ._initialize_credentials_refresh ()
64
60
65
61
@property
66
62
def base_api_url (self ) -> str :
@@ -81,6 +77,83 @@ def per_page_argument(self) -> Optional[Tuple[str, int]]:
81
77
"""
82
78
return ("per_page" , 100 )
83
79
80
+ def _initialize_credentials_refresh (self ):
81
+ self ._drives_refresh_callback ()
82
+ if not self ._config .credentials_already_set :
83
+ self ._drives_refresh_timer = PeriodicCallback (
84
+ self ._drives_refresh_callback , CREDENTIALS_REFRESH
85
+ )
86
+ self ._drives_refresh_timer .start ()
87
+
88
+ def _drives_refresh_callback (self ):
89
+ self ._config .load_credentials ()
90
+ self ._initialize_s3_file_system ()
91
+ self ._initialize_drives ()
92
+ self ._initialize_content_managers ()
93
+
94
+ def _initialize_s3_file_system (self ):
95
+ # initiate aiobotocore session if we are dealing with S3 drives
96
+ if self ._config .provider == 's3' :
97
+ if self ._config .access_key_id and self ._config .secret_access_key :
98
+ self ._s3_session = get_session ()
99
+ self ._file_system = s3fs .S3FileSystem (
100
+ anon = False ,
101
+ asynchronous = True ,
102
+ key = self ._config .access_key_id ,
103
+ secret = self ._config .secret_access_key ,
104
+ token = self ._config .session_token ,
105
+ )
106
+ else :
107
+ raise tornado .web .HTTPError (
108
+ status_code = httpx .codes .BAD_REQUEST ,
109
+ reason = "No credentials specified. Please set them in your user jupyter_server_config file." ,
110
+ )
111
+
112
+ def _initialize_drives (self ):
113
+ if self ._config .provider == "s3" :
114
+ S3Drive = get_driver (Provider .S3 )
115
+ self ._drives = [S3Drive (self ._config .access_key_id , self ._config .secret_access_key , True , None , None , None , self ._config .session_token )]
116
+ elif self ._config .provider == 'gcs' :
117
+ GCSDrive = get_driver (Provider .GOOGLE_STORAGE )
118
+ self ._drives = [GCSDrive (self ._config .access_key_id , self ._config .secret_access_key )] # verfiy credentials needed
119
+
120
+ def _initialize_content_managers (self ):
121
+ for drive_name , content_manager in self ._content_managers .items ():
122
+ self ._initialize_content_manager (drive_name , content_manager ["provider" ], content_manager ["location" ])
123
+
124
+ def _initialize_content_manager (self , drive_name , provider , region = None ):
125
+ try :
126
+ if provider == 's3' :
127
+ if self ._config .session_token is None :
128
+ configuration = {
129
+ "aws_access_key_id" : self ._config .access_key_id ,
130
+ "aws_secret_access_key" : self ._config .secret_access_key ,
131
+ "aws_region" : region ,
132
+ }
133
+ else :
134
+ configuration = {
135
+ "aws_access_key_id" : self ._config .access_key_id ,
136
+ "aws_secret_access_key" : self ._config .secret_access_key ,
137
+ "aws_session_token" : self ._config .session_token ,
138
+ "aws_region" : region ,
139
+ }
140
+ store = obs .store .S3Store .from_url ("s3://" + drive_name + "/" , config = configuration )
141
+ elif provider == 'gcs' :
142
+ store = obs .store .GCSStore .from_url ("gs://" + drive_name + "/" , config = {}) # add gcs config
143
+ elif provider == 'http' :
144
+ store = obs .store .HTTPStore .from_url (drive_name , client_options = {}) # add http client config
145
+
146
+ self ._content_managers [drive_name ] = {
147
+ "store" : store ,
148
+ "location" : region ,
149
+ "provider" : provider ,
150
+ }
151
+ except Exception as e :
152
+ raise tornado .web .HTTPError (
153
+ status_code = httpx .codes .BAD_REQUEST ,
154
+ reason = f"The following error occured when initializing the content manager: { e } " ,
155
+ )
156
+
84
157
def set_listing_limit (self , new_limit ):
85
158
"""Set new limit for listing.
86
159
@@ -105,23 +178,21 @@ async def list_drives(self):
105
178
"""
106
179
data = []
107
180
if self ._config .access_key_id and self ._config .secret_access_key :
108
- if self ._config .provider == "s3" :
109
- S3Drive = get_driver (Provider .S3 )
110
- drives = [S3Drive (self ._config .access_key_id , self ._config .secret_access_key , True , None , None , None , self ._config .session_token )]
111
-
112
- elif self ._config .provider == 'gcs' :
113
- GCSDrive = get_driver (Provider .GOOGLE_STORAGE )
114
- drives = [GCSDrive (self ._config .access_key_id , self ._config .secret_access_key )] # verfiy credentials needed
115
-
116
- else :
181
+ if self ._drives is None :
117
182
raise tornado .web .HTTPError (
118
183
status_code = httpx .codes .NOT_IMPLEMENTED ,
119
184
reason = "Listing drives not supported for given provider." ,
120
185
)
121
186
122
187
results = []
123
- for drive in drives :
124
- results += drive .list_containers ()
188
+ for drive in self ._drives :
189
+ try :
190
+ results += drive .list_containers ()
191
+ except Exception as e :
192
+ raise tornado .web .HTTPError (
193
+ status_code = httpx .codes .BAD_REQUEST ,
194
+ reason = f"The following error occured when listing drives: { e } " ,
195
+ )
125
196
126
197
for result in results :
127
198
data .append (
@@ -150,42 +221,10 @@ async def mount_drive(self, drive_name, provider):
150
221
Args:
151
222
drive_name: name of drive to mount
152
223
"""
153
- try :
154
- # check if content manager doesn't already exist
155
- if drive_name not in self ._content_managers or self ._content_managers [drive_name ] is None :
156
- if provider == 's3' :
157
- # get region of drive
158
- region = await self ._get_drive_location (drive_name )
159
- if self ._config .session_token is None :
160
- configuration = {
161
- "aws_access_key_id" : self ._config .access_key_id ,
162
- "aws_secret_access_key" : self ._config .secret_access_key ,
163
- "aws_region" : region
164
- }
165
- else :
166
- configuration = {
167
- "aws_access_key_id" : self ._config .access_key_id ,
168
- "aws_secret_access_key" : self ._config .secret_access_key ,
169
- "aws_session_token" : self ._config .session_token ,
170
- "aws_region" : region
171
- }
172
- store = obs .store .S3Store .from_url ("s3://" + drive_name + "/" , config = configuration )
173
- elif provider == 'gcs' :
174
- store = obs .store .GCSStore .from_url ("gs://" + drive_name + "/" , config = {}) # add gcs config
175
- elif provider == 'http' :
176
- store = obs .store .HTTPStore .from_url (drive_name , client_options = {}) # add http client config
177
-
178
- self ._content_managers [drive_name ] = {
179
- "store" : store ,
180
- "location" : region
181
- }
182
-
183
- else :
184
- raise tornado .web .HTTPError (
185
- status_code = httpx .codes .CONFLICT ,
186
- reason = "Drive already mounted."
187
- )
188
-
224
+ try :
225
+ if provider == 's3' :
226
+ region = await self ._get_drive_location (drive_name )
227
+ self ._initialize_content_manager (drive_name , provider , region )
189
228
except Exception as e :
190
229
raise tornado .web .HTTPError (
191
230
status_code = httpx .codes .BAD_REQUEST ,
0 commit comments