Skip to content

Commit a7170e5

Browse files
authored
expose piece-writing function which expects caller to bin pack (#935)
* feat(pieces): add write_and_preprocess to API - comment the behavior of the add_piece function - add write_and_preprocess function, which will ultimately replace add_piece * fix(typo): fix typo in comment * feat(commp): generate comm_p while writing and preprocessing
1 parent 6629732 commit a7170e5

File tree

3 files changed

+144
-26
lines changed

3 files changed

+144
-26
lines changed

filecoin-proofs/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ tar = "0.4.26"
4141
rayon = "1.1.0"
4242
blake2s_simd = "0.5.8"
4343
hex = "0.4.0"
44+
tee = "0.1.0"
45+
os_pipe = "0.9.1"
4446

4547
[dependencies.reqwest]
4648
version = "0.9"

filecoin-proofs/src/api/mod.rs

Lines changed: 139 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::path::{Path, PathBuf};
44

55
use storage_proofs::drgraph::DefaultTreeHasher;
66
use storage_proofs::hasher::Hasher;
7-
use storage_proofs::pieces::generate_piece_commitment_bytes_from_source;
87
use storage_proofs::porep::PoRep;
98
use storage_proofs::sector::SectorId;
109
use storage_proofs::stacked::{generate_replica_id, StackedDrg};
@@ -30,6 +29,8 @@ pub(crate) mod util;
3029

3130
pub use self::post::*;
3231
pub use self::seal::*;
32+
use std::io;
33+
use storage_proofs::pieces::generate_piece_commitment_bytes_from_source;
3334

3435
/// Unseals the sector at `sealed_path` and returns the bytes for a piece
3536
/// whose first (unpadded) byte begins at `offset` and ends at `offset` plus
@@ -75,62 +76,176 @@ pub fn get_unsealed_range<T: Into<PathBuf> + AsRef<Path>>(
7576
Ok(UnpaddedBytesAmount(written as u64))
7677
}
7778

78-
// Takes a piece and the size of the piece and returns the comm_p.
79+
// Generates a piece commitment for the provided byte source. Returns an error
80+
// if the byte source produced more than `piece_size` bytes.
7981
pub fn generate_piece_commitment<T: std::io::Read>(
80-
unpadded_piece_file: T,
81-
unpadded_piece_size: UnpaddedBytesAmount,
82+
source: T,
83+
piece_size: UnpaddedBytesAmount,
8284
) -> error::Result<PieceInfo> {
83-
let mut padded_piece_file = tempfile()?;
84-
let unpadded_piece_size_with_alignment = add_piece(
85-
unpadded_piece_file,
86-
&mut padded_piece_file,
87-
unpadded_piece_size,
88-
&[],
89-
)?;
85+
ensure_piece_size(piece_size)?;
86+
87+
let mut temp_piece_file = tempfile()?;
88+
89+
// send the source through the preprocessor, writing output to temp file
90+
let n = write_padded(source, &temp_piece_file)
91+
.map_err(|err| format_err!("failed to write and preprocess bytes: {:?}", err))?;
92+
93+
let n = UnpaddedBytesAmount(n as u64);
94+
95+
if n != piece_size {
96+
return Err(format_err!(
97+
"wrote more bytes ({:?}) than expected ({:?}) when preprocessing",
98+
n,
99+
piece_size
100+
));
101+
}
90102

91-
let _ = padded_piece_file.seek(SeekFrom::Start(0))?;
103+
temp_piece_file.seek(SeekFrom::Start(0))?;
92104

93105
let commitment =
94-
generate_piece_commitment_bytes_from_source::<DefaultPieceHasher>(&mut padded_piece_file)?;
106+
generate_piece_commitment_bytes_from_source::<DefaultPieceHasher>(&mut temp_piece_file)?;
95107

96108
Ok(PieceInfo {
97109
commitment,
98-
size: unpadded_piece_size_with_alignment,
110+
size: piece_size,
99111
})
100112
}
101113

102-
/// Write a piece. Returns the unpadded, but aligned size of the piece.
114+
/// Computes a NUL-byte prefix and/or suffix for `source` using the provided
115+
/// `piece_lengths` and `piece_size` (such that the `source`, after
116+
/// preprocessing, will occupy a subtree of a merkle tree built using the bytes
117+
/// from `target`), runs the resultant byte stream through the preprocessor,
118+
/// and writes the result to `target`. Returns a tuple containing the number of
119+
/// bytes written to `target` (`source` plus alignment) and the commitment.
120+
///
121+
/// WARNING: Depending on the ordering and size of the pieces in
122+
/// `piece_lengths`, this function could write a prefix of NUL bytes which
123+
/// wastes ($SIZESECTORSIZE/2)-$MINIMUM_PIECE_SIZE space. This function will be
124+
/// deprecated in favor of `write_and_preprocess`, and miners will be prevented
125+
/// from sealing sectors containing more than $TOOMUCH alignment bytes.
103126
pub fn add_piece<R, W>(
104127
source: R,
105128
target: W,
106129
piece_size: UnpaddedBytesAmount,
107130
piece_lengths: &[UnpaddedBytesAmount],
108-
) -> error::Result<UnpaddedBytesAmount>
131+
) -> error::Result<(UnpaddedBytesAmount, Commitment)>
109132
where
110133
R: Read,
111134
W: Read + Write + Seek,
112135
{
136+
ensure_piece_size(piece_size)?;
137+
138+
let (aligned_source_size, alignment, aligned_source) =
139+
get_aligned_source(source, &piece_lengths, piece_size);
140+
141+
// allows us to tee the source byte stream
142+
let (mut pipe_r, pipe_w) =
143+
os_pipe::pipe().map_err(|err| format_err!("failed to create pipe: {:?}", err))?;
144+
145+
// all bytes read from the TeeReader are written to its writer, no bytes
146+
// will be read from the TeeReader before they are written to its writer
147+
let tee_r = tee::TeeReader::new(aligned_source, pipe_w);
148+
149+
// reads from tee_r block until the tee's source bytes can be written to its
150+
// writer, so to prevent write_padded from blocking indefinitely, we need
151+
// to spin up a separate thread (to read from the pipe which receives writes
152+
// from the TeeReader)
153+
let t_handle = std::thread::spawn(move || {
154+
// discard n left-alignment bytes
155+
let n = alignment.left_bytes.into();
156+
io::copy(&mut pipe_r.by_ref().take(n), &mut io::sink())
157+
.map_err(|err| format_err!("failed to skip alignment bytes: {:?}", err))?;
158+
159+
// generate commitment for piece bytes
160+
let result =
161+
generate_piece_commitment(&mut pipe_r.by_ref().take(piece_size.into()), piece_size);
162+
163+
// drain the remaining bytes (all alignment) from the reader
164+
std::io::copy(&mut pipe_r.by_ref(), &mut io::sink())
165+
.map_err(|err| format_err!("failed to drain reader: {:?}", err))
166+
.and_then(|_| result)
167+
});
168+
169+
// send the source through the preprocessor, writing output to target
170+
let write_rslt = write_padded(tee_r, target)
171+
.map_err(|err| format_err!("failed to write and preprocess bytes: {:?}", err));
172+
173+
// block until piece commitment-generating thread returns
174+
let join_rslt = t_handle
175+
.join()
176+
.map_err(|err| format_err!("join piece commitment-generating thread failed: {:?}", err));
177+
178+
match (write_rslt, join_rslt) {
179+
(Ok(n), Ok(Ok(r))) => {
180+
let n = UnpaddedBytesAmount(n as u64);
181+
182+
ensure!(
183+
aligned_source_size == n,
184+
"expected to write {:?} source bytes, but actually wrote {:?}",
185+
aligned_source_size,
186+
n
187+
);
188+
189+
Ok((n, r.commitment))
190+
}
191+
(Ok(n), Ok(Err(err))) => {
192+
let e = format_err!(
193+
"wrote {:?} to target but then failed to generate piece commitment: {:?}",
194+
n,
195+
err
196+
);
197+
Err(e)
198+
}
199+
(Ok(n), Err(err)) => {
200+
let e = format_err!(
201+
"wrote {:?} to target but then failed to generate piece commitment: {:?}",
202+
n,
203+
err
204+
);
205+
Err(e)
206+
}
207+
(Err(err), _) => {
208+
let e = format_err!("failed to write and preprocess: {:?}", err);
209+
Err(e)
210+
}
211+
}
212+
}
213+
214+
fn ensure_piece_size(piece_size: UnpaddedBytesAmount) -> error::Result<()> {
113215
ensure!(
114216
piece_size >= UnpaddedBytesAmount(MINIMUM_PIECE_SIZE),
115217
"Piece must be at least {} bytes",
116218
MINIMUM_PIECE_SIZE
117219
);
220+
118221
let padded_piece_size: PaddedBytesAmount = piece_size.into();
119222
ensure!(
120223
u64::from(padded_piece_size).is_power_of_two(),
121224
"Bit-padded piece size must be a power of 2 ({:?})",
122225
padded_piece_size,
123226
);
124227

125-
let (bytes_with_alignment, aligned_source) =
126-
get_aligned_source(source, &piece_lengths, piece_size);
127-
let written = write_padded(aligned_source, target)?;
128-
ensure!(
129-
u64::from(bytes_with_alignment) == written as u64,
130-
"Invalid write"
131-
);
228+
Ok(())
229+
}
132230

133-
Ok(bytes_with_alignment)
231+
/// Writes bytes from `source` to `target`, adding bit-padding ("preprocessing")
232+
/// as needed. Returns a tuple containing the number of bytes written to
233+
/// `target` and the commitment.
234+
///
235+
/// WARNING: This function neither prepends nor appends alignment bytes to the
236+
/// `target`; it is the caller's responsibility to ensure properly sized
237+
/// and ordered writes to `target` such that `source`-bytes occupy whole
238+
/// subtrees of the final merkle tree built over `target`.
239+
pub fn write_and_preprocess<R, W>(
240+
source: R,
241+
target: W,
242+
piece_size: UnpaddedBytesAmount,
243+
) -> error::Result<(UnpaddedBytesAmount, Commitment)>
244+
where
245+
R: Read,
246+
W: Read + Write + Seek,
247+
{
248+
add_piece(source, target, piece_size, Default::default())
134249
}
135250

136251
#[cfg(test)]

filecoin-proofs/src/pieces.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub fn compute_comm_d(sector_size: SectorSize, piece_infos: &[PieceInfo]) -> Res
3636
"Too many pieces"
3737
);
3838

39-
// make sure the pice sizes are at most a sector size large
39+
// make sure the piece sizes are at most a sector size large
4040
let piece_size: u64 = piece_infos
4141
.iter()
4242
.map(|info| u64::from(PaddedBytesAmount::from(info.size)))
@@ -267,14 +267,15 @@ pub fn get_aligned_source<T: Read>(
267267
source: T,
268268
pieces: &[UnpaddedBytesAmount],
269269
piece_bytes: UnpaddedBytesAmount,
270-
) -> (UnpaddedBytesAmount, impl Read) {
270+
) -> (UnpaddedBytesAmount, PieceAlignment, impl Read) {
271271
let written_bytes = sum_piece_bytes_with_alignment(pieces);
272272
let piece_alignment = get_piece_alignment(written_bytes, piece_bytes);
273273
let expected_num_bytes_written =
274274
piece_alignment.left_bytes + piece_bytes + piece_alignment.right_bytes;
275275

276276
(
277277
expected_num_bytes_written,
278+
piece_alignment.clone(),
278279
with_alignment(source, piece_alignment),
279280
)
280281
}

0 commit comments

Comments
 (0)