@@ -22,7 +22,8 @@ use serde::{Deserialize, Serialize};
22
22
use std:: collections:: hash_map:: Entry ;
23
23
use std:: collections:: HashMap ;
24
24
use std:: collections:: { BTreeSet , HashSet } ;
25
- use std:: fs:: File ;
25
+ use std:: convert:: TryInto ;
26
+ use std:: fs:: { File , OpenOptions } ;
26
27
use std:: io:: Write ;
27
28
use std:: ops:: Bound :: Included ;
28
29
use std:: path:: { Path , PathBuf } ;
@@ -73,6 +74,11 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
73
74
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
74
75
static TIMEOUT : Duration = Duration :: from_secs ( 60 ) ;
75
76
77
+ // Taken from PG_CONTROL_MAX_SAFE_SIZE
78
+ const METADATA_MAX_SAFE_SIZE : usize = 512 ;
79
+ const METADATA_CHECKSUM_SIZE : usize = std:: mem:: size_of :: < u32 > ( ) ;
80
+ const METADATA_MAX_DATA_SIZE : usize = METADATA_MAX_SAFE_SIZE - METADATA_CHECKSUM_SIZE ;
81
+
76
82
// Metrics collected on operations on the storage repository.
77
83
lazy_static ! {
78
84
static ref STORAGE_TIME : HistogramVec = register_histogram_vec!(
@@ -135,7 +141,7 @@ impl Repository for LayeredRepository {
135
141
ancestor_timeline : None ,
136
142
ancestor_lsn : Lsn ( 0 ) ,
137
143
} ;
138
- Self :: save_metadata ( self . conf , timelineid, self . tenantid , & metadata) ?;
144
+ Self :: save_metadata ( self . conf , timelineid, self . tenantid , & metadata, true ) ?;
139
145
140
146
let timeline = LayeredTimeline :: new (
141
147
self . conf ,
@@ -180,7 +186,7 @@ impl Repository for LayeredRepository {
180
186
ancestor_lsn : start_lsn,
181
187
} ;
182
188
crashsafe_dir:: create_dir_all ( self . conf . timeline_path ( & dst, & self . tenantid ) ) ?;
183
- Self :: save_metadata ( self . conf , dst, self . tenantid , & metadata) ?;
189
+ Self :: save_metadata ( self . conf , dst, self . tenantid , & metadata, true ) ?;
184
190
185
191
info ! ( "branched timeline {} from {} at {}" , dst, src, start_lsn) ;
186
192
@@ -353,13 +359,36 @@ impl LayeredRepository {
353
359
timelineid : ZTimelineId ,
354
360
tenantid : ZTenantId ,
355
361
data : & TimelineMetadata ,
362
+ first_save : bool ,
356
363
) -> Result < PathBuf > {
357
- let path = conf. timeline_path ( & timelineid, & tenantid) . join ( "metadata" ) ;
358
- let mut file = File :: create ( & path) ?;
364
+ let timeline_path = conf. timeline_path ( & timelineid, & tenantid) ;
365
+ let path = timeline_path. join ( "metadata" ) ;
366
+ // use OpenOptions to ensure file presence is consistent with first_save
367
+ let mut file = OpenOptions :: new ( )
368
+ . write ( true )
369
+ . create_new ( first_save)
370
+ . open ( & path) ?;
359
371
360
372
info ! ( "saving metadata {}" , path. display( ) ) ;
361
373
362
- file. write_all ( & TimelineMetadata :: ser ( data) ?) ?;
374
+ let mut metadata_bytes = TimelineMetadata :: ser ( data) ?;
375
+
376
+ assert ! ( metadata_bytes. len( ) <= METADATA_MAX_DATA_SIZE ) ;
377
+ metadata_bytes. resize ( METADATA_MAX_SAFE_SIZE , 0u8 ) ;
378
+
379
+ let checksum = crc32c:: crc32c ( & metadata_bytes[ ..METADATA_MAX_DATA_SIZE ] ) ;
380
+ metadata_bytes[ METADATA_MAX_DATA_SIZE ..] . copy_from_slice ( & u32:: to_le_bytes ( checksum) ) ;
381
+
382
+ if file. write ( & metadata_bytes) ? != metadata_bytes. len ( ) {
383
+ bail ! ( "Could not write all the metadata bytes in a single call" ) ;
384
+ }
385
+ file. sync_all ( ) ?;
386
+
387
+ // fsync the parent directory to ensure the directory entry is durable
388
+ if first_save {
389
+ let timeline_dir = File :: open ( & timeline_path) ?;
390
+ timeline_dir. sync_all ( ) ?;
391
+ }
363
392
364
393
Ok ( path)
365
394
}
@@ -370,9 +399,18 @@ impl LayeredRepository {
370
399
tenantid : ZTenantId ,
371
400
) -> Result < TimelineMetadata > {
372
401
let path = conf. timeline_path ( & timelineid, & tenantid) . join ( "metadata" ) ;
373
- let data = std:: fs:: read ( & path) ?;
402
+ let metadata_bytes = std:: fs:: read ( & path) ?;
403
+ ensure ! ( metadata_bytes. len( ) == METADATA_MAX_SAFE_SIZE ) ;
404
+
405
+ let data = & metadata_bytes[ ..METADATA_MAX_DATA_SIZE ] ;
406
+ let calculated_checksum = crc32c:: crc32c ( & data) ;
374
407
375
- let data = TimelineMetadata :: des ( & data) ?;
408
+ let checksum_bytes: & [ u8 ; METADATA_CHECKSUM_SIZE ] =
409
+ metadata_bytes[ METADATA_MAX_DATA_SIZE ..] . try_into ( ) ?;
410
+ let expected_checksum = u32:: from_le_bytes ( * checksum_bytes) ;
411
+ ensure ! ( calculated_checksum == expected_checksum) ;
412
+
413
+ let data = TimelineMetadata :: des_prefix ( & data) ?;
376
414
assert ! ( data. disk_consistent_lsn. is_aligned( ) ) ;
377
415
378
416
Ok ( data)
@@ -1450,8 +1488,13 @@ impl LayeredTimeline {
1450
1488
ancestor_timeline : ancestor_timelineid,
1451
1489
ancestor_lsn : self . ancestor_lsn ,
1452
1490
} ;
1453
- let metadata_path =
1454
- LayeredRepository :: save_metadata ( self . conf , self . timelineid , self . tenantid , & metadata) ?;
1491
+ let metadata_path = LayeredRepository :: save_metadata (
1492
+ self . conf ,
1493
+ self . timelineid ,
1494
+ self . tenantid ,
1495
+ & metadata,
1496
+ false ,
1497
+ ) ?;
1455
1498
if let Some ( relish_uploader) = & self . relish_uploader {
1456
1499
relish_uploader. schedule_upload ( self . timelineid , metadata_path) ;
1457
1500
}
0 commit comments