4
4
# --- BEGIN_HEADER ---
5
5
#
6
6
# transferfunctions - data transfer helper functions
7
- # Copyright (C) 2003-2023 The MiG Project lead by Brian Vinter
7
+ # Copyright (C) 2003-2024 The MiG Project lead by Brian Vinter
8
8
#
9
9
# This file is part of MiG.
10
10
#
38
38
from mig .shared .base import client_id_dir , mask_creds
39
39
from mig .shared .defaults import datatransfers_filename , user_keys_dir , \
40
40
transfer_output_dir
41
- from mig .shared .fileio import makedirs_rec , delete_file
41
+ from mig .shared .fileio import makedirs_rec , delete_file , acquire_file_lock , \
42
+ release_file_lock , remove_rec
42
43
from mig .shared .safeeval import subprocess_popen , subprocess_pipe
43
44
from mig .shared .serial import load , dump
44
45
45
46
default_key_type = 'rsa'
46
47
default_key_bits = 2048
47
48
48
49
50
+ def get_transfers_path (configuration , client_id ):
51
+ """Build the default transfers file path for client_id"""
52
+ return os .path .join (configuration .user_settings , client_id_dir (client_id ),
53
+ datatransfers_filename )
54
+
55
+
49
56
def get_status_dir (configuration , client_id , transfer_id = '' ):
50
57
"""Lookup the status directory for transfers on behalf of client_id.
51
58
The optional transfer_id is used to get the explicit status dir for that
@@ -65,7 +72,7 @@ def blind_pw(transfer_dict):
65
72
for target in ('lftp_src' , 'lftp_dst' ):
66
73
if transfer_dict .get (target , '' ):
67
74
replace_map [target ] = (
68
- r'(.*://[^:]*):[^@]+@(.*)' , r'\1:%s@\2' % hide_pw )
75
+ r'(.*://[^:]*):[^@]+@(.*)' , r'\1:%s@\2' % hide_pw )
69
76
blinded = mask_creds (
70
77
transfer_dict , masked_value = hide_pw , subst_map = replace_map )
71
78
return blinded
@@ -114,32 +121,58 @@ def build_keyitem_object(configuration, key_dict):
114
121
return key_obj
115
122
116
123
117
- def load_data_transfers (configuration , client_id ):
118
- """Find all data transfers owned by user"""
124
+ def lock_data_transfers (transfers_path , exclusive = True , blocking = True ):
125
+ """Lock per-user transfers index"""
126
+ transfers_lock_path = '%s.lock' % transfers_path
127
+ return acquire_file_lock (transfers_lock_path , exclusive = exclusive ,
128
+ blocking = blocking )
129
+
130
+
131
+ def unlock_data_transfers (transfers_lock ):
132
+ """Unlock per-user transfers index"""
133
+ return release_file_lock (transfers_lock )
134
+
135
+
136
+ def load_data_transfers (configuration , client_id , do_lock = True , blocking = True ):
137
+ """Find all data transfers owned by user with optional locking support
138
+ for synchronized access.
139
+ """
119
140
logger = configuration .logger
120
141
logger .debug ("load transfers for %s" % client_id )
142
+ transfers_path = get_transfers_path (configuration , client_id )
143
+ if do_lock :
144
+ flock = lock_data_transfers (transfers_path , exclusive = False ,
145
+ blocking = blocking )
146
+ if not blocking and not flock :
147
+ return (False , "could not lock+load saved data transfers for %s" %
148
+ client_id )
121
149
try :
122
- transfers_path = os .path .join (configuration .user_settings ,
123
- client_id_dir (client_id ),
124
- datatransfers_filename )
125
150
logger .debug ("load transfers from %s" % transfers_path )
126
151
if os .path .isfile (transfers_path ):
127
152
transfers = load (transfers_path )
128
153
else :
129
154
transfers = {}
130
155
except Exception as exc :
156
+ if do_lock :
157
+ unlock_data_transfers (flock )
131
158
return (False , "could not load saved data transfers: %s" % exc )
159
+ if do_lock :
160
+ unlock_data_transfers (flock )
132
161
return (True , transfers )
133
162
134
163
135
- def get_data_transfer (configuration , client_id , transfer_id , transfers = None ):
164
+ def get_data_transfer (configuration , client_id , transfer_id , transfers = None ,
165
+ do_lock = True , blocking = True ):
136
166
"""Helper to extract all details for a data transfer. The optional
137
167
transfers argument can be used to pass an already loaded dictionary of
138
- saved transfers to avoid reloading.
168
+ saved transfers to avoid reloading. In that case the caller might want to
169
+ hold the corresponding lock during the handling here to avoid races.
170
+ Locking is also generally supported for synchronized access.
139
171
"""
140
172
if transfers is None :
141
173
(load_status , transfers ) = load_data_transfers (configuration ,
142
- client_id )
174
+ client_id , do_lock ,
175
+ blocking )
143
176
if not load_status :
144
177
return (load_status , transfers )
145
178
transfer_dict = transfers .get (transfer_id , None )
@@ -150,18 +183,34 @@ def get_data_transfer(configuration, client_id, transfer_id, transfers=None):
150
183
151
184
152
185
def modify_data_transfers (configuration , client_id , transfer_dict , action ,
153
- transfers = None ):
186
+ transfers = None , do_lock = True , blocking = True ):
154
187
"""Modify data transfers with given action and transfer_dict for client_id.
155
188
In practice this a shared helper to add or remove transfers from the saved
156
189
data transfers. The optional transfers argument can be used to pass an
157
- already loaded dictionary of saved transfers to avoid reloading.
190
+ already loaded dictionary of saved transfers to avoid reloading. In that
191
+ case the caller might want to hold the corresponding lock during the
192
+ handling here to avoid races. Locking is also generally supported for
193
+ synchronized access.
158
194
"""
159
195
logger = configuration .logger
160
196
transfer_id = transfer_dict ['transfer_id' ]
197
+ transfers_path = get_transfers_path (configuration , client_id )
198
+ # Lock during entire load and save
199
+ if do_lock :
200
+ flock = lock_data_transfers (transfers_path , exclusive = True ,
201
+ blocking = blocking )
202
+ if not blocking and not flock :
203
+ return (False , "could not lock+update data transfers for %s" %
204
+ client_id )
205
+
161
206
if transfers is None :
207
+ # Load without repeated lock
162
208
(load_status , transfers ) = load_data_transfers (configuration ,
163
- client_id )
209
+ client_id ,
210
+ do_lock = False )
164
211
if not load_status :
212
+ if do_lock :
213
+ unlock_data_transfers (flock )
165
214
logger .error ("modify_data_transfers failed in load: %s" %
166
215
transfers )
167
216
return (load_status , transfers )
@@ -180,49 +229,61 @@ def modify_data_transfers(configuration, client_id, transfer_dict, action,
180
229
elif action == "delete" :
181
230
del transfers [transfer_id ]
182
231
else :
183
- return (False , "Invalid action %s on data transfers" % action )
232
+ if do_lock :
233
+ unlock_data_transfers (flock )
234
+ return (False , "Invalid action %s on data transfer %s" % (action ,
235
+ transfer_id ))
184
236
185
237
try :
186
- transfers_path = os .path .join (configuration .user_settings ,
187
- client_id_dir (client_id ),
188
- datatransfers_filename )
189
238
dump (transfers , transfers_path )
190
239
res_dir = get_status_dir (configuration , client_id , transfer_id )
191
240
makedirs_rec (res_dir , configuration )
192
241
except Exception as err :
242
+ if do_lock :
243
+ unlock_data_transfers (flock )
193
244
logger .error ("modify_data_transfers failed: %s" % err )
194
245
return (False , 'Error updating data transfers: %s' % err )
246
+ if do_lock :
247
+ unlock_data_transfers (flock )
195
248
return (True , transfer_id )
196
249
197
250
198
251
def create_data_transfer (configuration , client_id , transfer_dict ,
199
- transfers = None ):
252
+ transfers = None , do_lock = True , blocking = True ):
200
253
"""Create a new data transfer for client_id. The optional
201
254
transfers argument can be used to pass an already loaded dictionary of
202
- saved transfers to avoid reloading.
255
+ saved transfers to avoid reloading. In that case the caller might want to
256
+ hold the corresponding lock during the handling here to avoid races.
257
+ Locking is also generally supported for synchronized access.
203
258
"""
204
259
return modify_data_transfers (configuration , client_id , transfer_dict ,
205
- "create" , transfers )
260
+ "create" , transfers , do_lock , blocking )
206
261
207
262
208
263
def update_data_transfer (configuration , client_id , transfer_dict ,
209
- transfers = None ):
264
+ transfers = None , do_lock = True , blocking = True ):
210
265
"""Update existing data transfer for client_id. The optional transfers
211
266
argument can be used to pass an already loaded dictionary of saved
212
- transfers to avoid reloading.
267
+ transfers to avoid reloading. In that case the caller might want to
268
+ hold the corresponding lock during the handling here to avoid races.
269
+ Locking is also generally supported for synchronized access.
213
270
"""
214
271
return modify_data_transfers (configuration , client_id , transfer_dict ,
215
- "modify" , transfers )
272
+ "modify" , transfers , do_lock , blocking )
216
273
217
274
218
275
def delete_data_transfer (configuration , client_id , transfer_id ,
219
- transfers = None ):
276
+ transfers = None , do_lock = True , blocking = True ):
220
277
"""Delete an existing data transfer without checking ownership. The
221
278
optional transfers argument can be used to pass an already loaded
222
- dictionary of saved transfers to avoid reloading. """
279
+ dictionary of saved transfers to avoid reloading. In that case the caller
280
+ might want to hold the corresponding lock during the handling here to avoid
281
+ races.
282
+ Locking is also generally supported for synchronized access.
283
+ """
223
284
transfer_dict = {'transfer_id' : transfer_id }
224
285
return modify_data_transfers (configuration , client_id , transfer_dict ,
225
- "delete" , transfers )
286
+ "delete" , transfers , do_lock , blocking )
226
287
227
288
228
289
def load_user_keys (configuration , client_id ):
@@ -434,11 +495,93 @@ def kill_sub_pid(configuration, client_id, transfer_id, sub_pid, sig=9):
434
495
from mig .shared .conf import get_configuration_object
435
496
conf = get_configuration_object ()
436
497
print ("Unit testing transfer functions" )
498
+ # NOTE: use /tmp for testing
499
+ orig_user_settings = conf .user_settings
500
+ client , transfer = "testuser" , "testtransfer"
501
+ conf .user_settings = '/tmp/transferstest'
502
+ dummy_transfers_dir = os .path .join (conf .user_settings , client )
503
+ dummy_transfers_file = os .path .join (dummy_transfers_dir ,
504
+ datatransfers_filename )
505
+ makedirs_rec (dummy_transfers_dir , conf )
506
+ transfer_dict = {'transfer_id' : transfer }
507
+ dummypw = 'NotSoSecretDummy'
508
+ transfer_dict .update (
509
+ {'password' : dummypw ,
510
+ 'lftp_src' : 'sftp://john.doe:%s@nowhere.org/README' % dummypw ,
511
+ 'lftp_dst' : 'https://john.doe:%s@outerspace.org/' % dummypw ,
512
+ })
513
+ print ("=== user transfers dict mangling ===" )
514
+ (status , transfers ) = load_data_transfers (conf , client )
515
+ print ("initial transfers before create : %s" % transfers )
516
+ create_data_transfer (conf , client , transfer_dict )
517
+ (status , transfers ) = load_data_transfers (conf , client )
518
+ print ("transfers after create: %s" % transfers )
519
+ transfer_dict ['password' ] += "-UPDATED-NOW"
520
+ update_data_transfer (conf , client , transfer_dict )
521
+ (status , transfers ) = load_data_transfers (conf , client )
522
+ print ("transfers after update: %s" % transfers )
523
+ delete_data_transfer (conf , client , transfer )
524
+ (status , transfers ) = load_data_transfers (conf , client )
525
+ print ("transfers after delete: %s" % transfers )
526
+
527
+ print ("lock transfers file for testing prevented create access" )
528
+ dummy_lock = lock_data_transfers (dummy_transfers_file , exclusive = True )
529
+ for i in range (3 ):
530
+ print ("try creating transfer while locked (%d)" % i )
531
+ transfer_dict ['transfer_id' ] = '%s-%s' % (transfer , i )
532
+ (create_status , created_id ) = \
533
+ create_data_transfer (conf , client , transfer_dict ,
534
+ blocking = False )
535
+ print ("create transfer while locked status: %s" % create_status )
536
+ time .sleep (1 )
537
+ print ("unlock transfers file for testing restored create access" )
538
+ unlock_data_transfers (dummy_lock )
539
+ (status , transfers ) = load_data_transfers (conf , client )
540
+ print ("transfers after locked create attempts: %s" % transfers )
541
+ for i in range (3 ):
542
+ print ("try creating transfer while unlocked (%d)" % i )
543
+ transfer_dict ['transfer_id' ] = '%s-%s' % (transfer , i )
544
+ (create_status , created_id ) = \
545
+ create_data_transfer (conf , client , transfer_dict ,
546
+ blocking = False )
547
+ print ("create transfer while unlocked status: %s" % create_status )
548
+ time .sleep (1 )
549
+
550
+ (status , transfers ) = load_data_transfers (conf , client )
551
+ print ("transfers after unlocked create attempts: %s" % transfers )
552
+ print ("lock transfers file for testing prevented delete access" )
553
+ dummy_lock = lock_data_transfers (dummy_transfers_file , exclusive = True )
554
+ for i in range (3 ):
555
+ print ("try deleting transfer while locked (%d)" % i )
556
+ transfer_id = transfer_dict ['transfer_id' ] = '%s-%s' % (transfer , i )
557
+ (delete_status , delete_id ) = \
558
+ delete_data_transfer (conf , client , transfer_id ,
559
+ blocking = False )
560
+ print ("delete transfer while locked status: %s" % delete_status )
561
+ time .sleep (1 )
562
+
563
+ print ("unlock transfers file for testing restored delete access" )
564
+ unlock_data_transfers (dummy_lock )
565
+ (status , transfers ) = load_data_transfers (conf , client )
566
+ print ("transfers after locked delete attempts: %s" % transfers )
567
+ for i in range (3 ):
568
+ print ("try deleting transfer while unlocked (%d)" % i )
569
+ transfer_id = transfer_dict ['transfer_id' ] = '%s-%s' % (transfer , i )
570
+ (delete_status , delete_id ) = \
571
+ delete_data_transfer (conf , client , transfer_id ,
572
+ blocking = False )
573
+ print ("delete transfer while unlocked status: %s" % delete_status )
574
+ time .sleep (1 )
575
+
576
+ (status , transfers ) = load_data_transfers (conf , client )
577
+ print ("transfers after unlocked delete attempts: %s" % transfers )
578
+
579
+ remove_rec (dummy_transfers_dir , conf )
580
+
437
581
print ("=== sub pid functions ===" )
438
582
import multiprocessing
439
583
manager = multiprocessing .Manager ()
440
584
sub_procs_map = manager .dict ()
441
- client , transfer = "testuser" , "testtransfer"
442
585
sub_procs = sub_pid_list (conf , sub_procs_map , client , transfer )
443
586
print ("initial sub pids: %s" % sub_procs )
444
587
for pid in xrange (3 ):
@@ -476,13 +619,6 @@ def kill_sub_pid(configuration, client_id, transfer_id, sub_pid, sig=9):
476
619
print ("verify transfer worker is no longer found: %s" % verify_worker )
477
620
transfer_workers = all_worker_transfers (conf , workers_map )
478
621
print ("final transfer workers: %s" % transfer_workers )
479
- transfer_dict = {}
480
- dummypw = 'NotSoSecretDummy'
481
- transfer_dict .update (
482
- {'password' : dummypw ,
483
- 'lftp_src' : 'sftp://john.doe:%s@nowhere.org/README' % dummypw ,
484
- 'lftp_dst' : 'https://john.doe:%s@outerspace.org/' % dummypw ,
485
- })
486
622
487
623
print ("raw transfer dict:\n %s\n is blinded into:\n %s" %
488
624
(transfer_dict , blind_pw (transfer_dict )))
0 commit comments