@@ -518,7 +518,7 @@ class YAS3FS(LoggingMixIn, Operations):
     def __init__(self, options):
         # Some constants
         ### self.http_listen_path_length = 30
-        self.download_running = True
+        self.running = True
         self.check_status_interval = 5.0 # Seconds, no need to configure that

         # Initialization
@@ -741,7 +741,7 @@ class YAS3FS(LoggingMixIn, Operations):
         # Cleanup for unmount
         logger.info('File system unmount...')

-        self.download_running = False
+        self.running = False

         if self.http_listen_thread:
             self.httpd.shutdown() # To stop HTTP listen thread
@@ -924,7 +924,7 @@ class YAS3FS(LoggingMixIn, Operations):
             logger.info("num_entries, mem_size, disk_size, download_queue, prefetch_queue: %i, %i, %i, %i, %i"
                         % (num_entries, mem_size, disk_size, self.download_queue.qsize(), self.prefetch_queue.qsize()))

-            if self.download_running:
+            if self.running:
                 for i in self.download_threads.keys():
                     if not self.download_threads[i].is_alive():
                         logger.debug("Download thread restarted!")
@@ -992,7 +992,8 @@ class YAS3FS(LoggingMixIn, Operations):
         (parent_path, dir) = os.path.split(path)
         logger.debug("parent_path '%s'" % (parent_path))
         with self.cache.get_lock(path):
-            dirs = self.cache.get(parent_path, 'readdir')
+            # dirs = self.cache.get(parent_path, 'readdir')
+            dirs = self.readdir(parent_path)
             if dirs != None and dirs.count(dir) > 0:
                 dirs.remove(dir)
@@ -1017,16 +1018,27 @@ class YAS3FS(LoggingMixIn, Operations):
         if key:
             logger.debug("get_key from cache '%s'" % (path))
             return key
-        logger.debug("get_key from S3 #1 '%s'" % (path))
-        key = self.s3_bucket.get_key(self.join_prefix(path))
-        if not key and path != '/':
-            full_path = path + '/'
-            logger.debug("get_key from S3 #2 '%s' '%s'" % (path, full_path))
-            key = self.s3_bucket.get_key(self.join_prefix(full_path))
-        if key:
-            logger.debug("get_key to cache '%s'" % (path))
-            self.cache.set(path, 'key', key)
+        look_on_S3 = False
+        if path == '/':
+            look_on_S3 = True
         else:
+            (parent_path, file) = os.path.split(path)
+            dirs = self.readdir(parent_path)
+            if file in dirs: # We know it can be found on S3
+                look_on_S3 = True
+        if look_on_S3:
+            logger.debug("get_key from S3 #1 '%s'" % (path))
+            key = self.s3_bucket.get_key(self.join_prefix(path))
+            if not key and path != '/':
+                full_path = path + '/'
+                logger.debug("get_key from S3 #2 '%s' '%s'" % (path, full_path))
+                key = self.s3_bucket.get_key(self.join_prefix(full_path))
+            if key:
+                logger.debug("get_key to cache '%s'" % (path))
+                self.cache.set(path, 'key', key)
+        else:
+            logger.debug("get_key not on S3 '%s'" % (path))
+        if not key:
             logger.debug("get_key no '%s'" % (path))
         return key

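The get_key hunk above adds a negative-lookup shortcut: before issuing any S3 request, the parent directory's readdir listing is consulted, and a name absent from that listing cannot exist in the bucket, so both get_key round-trips are skipped. Below is a minimal standalone sketch of that idea, not code from this repository; lookup, listing, and fetch are hypothetical stand-ins for self.get_key, self.readdir, and the s3_bucket.get_key calls.

import os

# Hypothetical stand-ins: listing(dir) plays the role of self.readdir(dir),
# fetch(path) the role of the s3_bucket.get_key() calls.
def lookup(path, listing, fetch):
    if path == '/':
        return fetch(path)  # the root is always looked up on S3
    parent_path, name = os.path.split(path)
    if name in listing(parent_path):
        return fetch(path)  # the parent listing says the name exists
    return None             # known miss: no S3 round-trip needed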
@@ -1114,7 +1126,7 @@ class YAS3FS(LoggingMixIn, Operations):
                                  if not (metadata_name == 'attr' and k == 'st_size')]) # For the size use the key.size
             key.metadata[metadata_name] = s
             if (not data) or (data and (not data.has('change'))):
-                logger.debug("writing metadata '%s' '%s'" % (path, key))
+                logger.debug("writing metadata '%s' '%s' S3" % (path, key))
                 md = key.metadata
                 md['Content-Type'] = key.content_type # Otherwise we loose the Content-Type with S3 Copy
                 key.copy(key.bucket.name, key.name, md, preserve_acl=False) # Do I need to preserve ACL?
@@ -1211,16 +1223,19 @@ class YAS3FS(LoggingMixIn, Operations):
             full_path = path + '/'
         else:
             full_path = path # To manage '/' with an empty s3_prefix
-        if path != '/' or self.write_metadata:
-            k.key = self.join_prefix(full_path)
-            logger.debug("mkdir '%s' '%s' '%s' S3" % (path, mode, k))
-            k.set_contents_from_string('', headers={'Content-Type': 'application/x-directory'})
         self.cache.set(path, 'key', k)
-        data.delete('change')
         if path != '/':
             self.cache.set(path, 'readdir', ['.', '..']) # the directory is empty
             self.add_to_parent_readdir(path)
+
+        if path != '/' or self.write_metadata:
+            k.key = self.join_prefix(full_path)
+            logger.debug("mkdir '%s' '%s' '%s' S3" % (path, mode, k))
+            k.set_contents_from_string('', headers={'Content-Type': 'application/x-directory'})
+            data.delete('change')
+        if path != '/': ### Do I need this???
             self.publish(['mkdir', path])
+
         return 0

     def symlink(self, path, link):
@@ -1240,7 +1255,7 @@ class YAS3FS(LoggingMixIn, Operations):
         attr['st_ctime'] = now
         attr['st_size'] = 0
         attr['st_mode'] = (stat.S_IFLNK | 0755)
-        self.cache.delete(path)
+        self.cache.delete(path)
         self.cache.add(path)
         if self.cache_on_disk > 0:
             data = FSData(self.cache, 'mem', path) # New files (almost) always cache in mem - is it ok ???
@@ -1255,12 +1270,14 @@ class YAS3FS(LoggingMixIn, Operations):
             self.write(path, link, 0)
             data.close()
         k.key = self.join_prefix(path)
-        logger.debug("symlink '%s' '%s' '%s' S3" % (path, link, k))
-        k.set_contents_from_string(link, headers={'Content-Type': 'application/x-symlink'})
         self.cache.set(path, 'key', k)
-        data.delete('change')
         self.add_to_parent_readdir(path)
-        self.publish(['symlink', path])
+
+        logger.debug("symlink '%s' '%s' '%s' S3" % (path, link, k))
+        k.set_contents_from_string(link, headers={'Content-Type': 'application/x-symlink'})
+        data.delete('change')
+        self.publish(['symlink', path])
+
         return 0

     def check_data(self, path):
@@ -1323,7 +1340,7 @@ class YAS3FS(LoggingMixIn, Operations):
             self.download_queue.put(option_list)

     def download(self, prefetch=False):
-        while self.download_running:
+        while self.running:
             try:
                 if prefetch:
                     (path, start, end) = self.prefetch_queue.get(True, 1) # 1 second time-out
@@ -1469,16 +1486,22 @@ class YAS3FS(LoggingMixIn, Operations):
         if not k:
             logger.debug("rmdir '%s' ENOENT" % (path))
             raise FuseOSError(errno.ENOENT)
-        full_path = self.join_prefix(path + '/')
-        key_list = self.s3_bucket.list(full_path) # Don't need to set a delimeter here
-        for l in key_list:
-            if l.name != full_path:
-                logger.debug("rmdir '%s' ENOTEMPTY" % (path))
-                raise FuseOSError(errno.ENOTEMPTY)
-        k.delete()
+        if len(self.readdir(path)) > 2:
+            logger.debug("rmdir '%s' ENOTEMPTY" % (path))
+            raise FuseOSError(errno.ENOTEMPTY)
+        #full_path = self.join_prefix(path + '/')
+        #key_list = self.s3_bucket.list(full_path) # Don't need to set a delimeter here
+        #for l in key_list:
+        #    if l.name != full_path:
+        #        logger.debug("rmdir '%s' ENOTEMPTY" % (path))
+        #        raise FuseOSError(errno.ENOTEMPTY)
+
+        logger.debug("rmdir '%s' '%s' S3" % (path, k))
+        k.delete()
+        self.publish(['rmdir', path])
+
         self.cache.reset(path) # Cache invaliation
         self.remove_from_parent_readdir(path)
-        self.publish(['rmdir', path])
         return 0

     def truncate(self, path, size):
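The rmdir hunk swaps a full s3_bucket.list() scan for a length check on the cached listing: every readdir result carries the pseudo-entries '.' and '..', so more than two entries means the directory has real children. A tiny sketch of that invariant, with is_empty as an assumed illustrative name:

# Assumed illustrative helper for the len(self.readdir(path)) > 2 test:
# a listing of exactly ['.', '..'] holds no real children.
def is_empty(entries):
    return len(entries) <= 2

assert is_empty(['.', '..'])
assert not is_empty(['.', '..', 'notes.txt'])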
@@ -1558,7 +1581,8 @@ class YAS3FS(LoggingMixIn, Operations):
             md['Content-Type'] = key.content_type # Otherwise we loose the Content-Type with S3 Copy
             key.copy(key.bucket.name, target, md, preserve_acl=False) # Do I need to preserve ACL?
             key.delete()
-            self.publish(['rename', source_path, target_path])
+            self.publish(['rename', source_path, target_path])
+
         self.remove_from_parent_readdir(path)
         self.add_to_parent_readdir(new_path)

@@ -1605,10 +1629,12 @@ class YAS3FS(LoggingMixIn, Operations):
             logger.debug("unlink '%s' ENOENT" % (path))
             raise FuseOSError(errno.ENOENT)
         if k:
+            logger.debug("unlink '%s' '%s' S3" % (path, k))
             k.delete()
+            self.publish(['unlink', path])
+
         self.cache.reset(path)
         self.remove_from_parent_readdir(path)
-        self.publish(['unlink', path])
         return 0

     def create(self, path, mode, fi=None):
@@ -1737,20 +1763,25 @@ class YAS3FS(LoggingMixIn, Operations):
             old_size = 0
         else:
             old_size = k.size
+
         written = False
         if self.multipart_num > 0:
             full_size = attr['st_size']
             if full_size > self.multipart_size:
-                k = self.multipart_upload(k.name, data, full_size,
-                                          headers={'Content-Type': type}, metadata=k.metadata)
-                k = self.get_key(path, cache=False)
+                logger.debug("flush '%s' '%s' '%s' '%s' S3" % (path, fh, k, type))
+                new_k = self.multipart_upload(k.name, data, full_size,
+                                              headers={'Content-Type': type}, metadata=k.metadata)
+                new_k = self.get_key(path, cache=False)
+                etag = new_k.etag[1:-1]
                 written = True
         if not written:
             logger.debug("flush '%s' '%s' '%s' '%s' S3" % (path, fh, k, type))
             k.set_contents_from_file(data.content, headers={'Content-Type': type})
-            data.update_etag(k.etag[1:-1])
+            etag = k.etag[1:-1]
+        data.update_etag(etag)
         data.delete('change')
-        self.publish(['flush', path, k.etag[1:-1]])
+        self.publish(['flush', path, etag])
+
         return 0

     def multipart_upload(self, key_path, data, full_size, headers, metadata):
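The flush hunk computes the ETag once in whichever branch ran and publishes that value, instead of re-reading k.etag after a multipart upload has replaced the key object. boto returns ETags wrapped in double quotes, which the [1:-1] slice strips; a one-line illustration with a made-up sample value:

# boto-style quoted ETag; the slice drops the surrounding quotes.
etag = '"9bb58f26192e4ba00f01e2e7b136bbd8"'  # hypothetical sample value
print(etag[1:-1])  # 9bb58f26192e4ba00f01e2e7b136bbd8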