# par_io: Parallel I/O library

A simple library to read and write files in parallel, implementing the producer-consumer model. The number of producers, consumers, and tasks can be set arbitrarily, and no async runtime is required.

Synchronous calls to `pread` and `pwrite` inside reader and writer threads are used to transfer data.

No async runtime is used: the actual file I/O is synchronous, and task distribution and execution are controlled by the library through direct calls to Rust's `std::thread` and `std::sync::mpsc` APIs.
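
As an illustration of those primitives (a minimal sketch, not the library's actual internals; the file name and chunk size are placeholders), a reader thread can perform a positioned read with `FileExt::read_at`, the Rust equivalent of `pread` on Unix, and ship the filled buffer to a consumer over an `mpsc` channel:

```rust
// Minimal sketch of the underlying primitives (illustrative only):
// a reader thread fills a buffer via a positioned read and sends it
// to a consumer over an mpsc channel.
use std::fs::File;
use std::os::unix::fs::FileExt; // read_at == pread on Unix
use std::sync::mpsc;
use std::thread;

fn main() -> std::io::Result<()> {
    let file = File::open("data.bin")?; // placeholder file name
    let (tx, rx) = mpsc::channel::<(u64, Vec<u8>)>();

    // Reader thread: read one 4 KiB chunk at offset 0 and send it along.
    let reader = thread::spawn(move || -> std::io::Result<()> {
        let mut buf = vec![0_u8; 4096]; // placeholder chunk size
        let n = file.read_at(&mut buf, 0)?; // pread(fd, buf, 4096, 0)
        buf.truncate(n);
        tx.send((0, buf)).ok();
        Ok(())
    });

    // Consumer side: receive (offset, data) pairs until the channel closes.
    for (offset, data) in rx {
        println!("read {} bytes at offset {}", data.len(), offset);
    }
    reader.join().unwrap()?;
    Ok(())
}
```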

## Reading

(producer = reader)

1. the file is subdivided into chunks
2. the chunks are assigned to the producer threads, each of which reads its own chunks
3. the producer thread extracts a buffer from its queue and fills it with data from the file
4. the filled buffer is sent to a consumer thread (round-robin scheduling)
5. the consumer thread passes a reference to the buffer to a consumer callback received from client code
6. the return value of the callback is stored in an array
7. the buffer is moved back to the producer thread that sent it
8. the return values from all the consumer threads are merged into a single array and returned to client code

Memory consumption is equal to:

(buffer size) x (number of buffers per producer) x (number of producers)

and is independent of the number of chunks and the file size.

Having more than one buffer per producer queue allows reading and data consumption to overlap.
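
The sketch below illustrates this recycling scheme (the constants, the per-consumer channels, and the trivial consumers are assumptions for illustration, not the library's actual structure): buffers shuttle between one producer and the consumers over `mpsc` channels, so peak memory stays at `BUFFER_SIZE x BUFFERS_PER_PRODUCER` per producer regardless of `NUM_CHUNKS`:

```rust
// Illustrative buffer-recycling sketch (assumed, not the library's code):
// a producer owns BUFFERS_PER_PRODUCER buffers, dispatches filled buffers
// to consumers round-robin, and reuses each buffer once it comes back.
use std::sync::mpsc;
use std::thread;

const BUFFER_SIZE: usize = 1 << 20; // 1 MiB
const BUFFERS_PER_PRODUCER: usize = 2;
const NUM_CONSUMERS: usize = 3;
const NUM_CHUNKS: usize = 8;

fn main() {
    // Return channel through which consumers hand buffers back.
    let (ret_tx, ret_rx) = mpsc::channel::<Vec<u8>>();

    // One channel per consumer; each consumer just echoes the buffer back.
    let consumers: Vec<mpsc::Sender<Vec<u8>>> = (0..NUM_CONSUMERS)
        .map(|id| {
            let (tx, rx) = mpsc::channel::<Vec<u8>>();
            let ret_tx = ret_tx.clone();
            thread::spawn(move || {
                for buf in rx {
                    println!("consumer {} got {} bytes", id, buf.len());
                    ret_tx.send(buf).ok(); // recycle the buffer
                }
            });
            tx
        })
        .collect();
    drop(ret_tx);

    // Pre-allocated pool: memory use is bounded by
    // BUFFER_SIZE x BUFFERS_PER_PRODUCER, independent of NUM_CHUNKS.
    let mut pool: Vec<Vec<u8>> = (0..BUFFERS_PER_PRODUCER)
        .map(|_| vec![0_u8; BUFFER_SIZE])
        .collect();

    for chunk in 0..NUM_CHUNKS {
        // Take a pooled buffer, or block until a consumer returns one.
        let buf = pool.pop().unwrap_or_else(|| ret_rx.recv().unwrap());
        // ... fill `buf` with the chunk's data here (e.g. via read_at) ...
        consumers[chunk % NUM_CONSUMERS].send(buf).ok(); // round robin
    }
    drop(consumers); // close the channels so consumer threads exit
    for _ in ret_rx {} // drain remaining returned buffers
}
```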

## Writing

(consumer = writer)

1. each producer thread extracts a buffer from its queue
2. a mutable reference to the buffer is passed to a producer callback received from client code
3. the result of the callback invocation is checked:
   1. no error: the buffer is sent to a consumer thread (round-robin scheduling);
   2. error: the error is sent to the consumer threads, which then terminate immediately (see the sketch below)
4. the consumer thread receives the buffer and the file offset and writes the data to the file
5. the buffer is moved back to the producer thread that sent it
6. each consumer thread returns the number of bytes written to the file
7. the results from all consumer threads are merged into a single array returned to client code

Memory consumption is equal to:

(buffer size) x (number of buffers per producer) x (number of producers)
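
The error path of step 3 can be made concrete with a short sketch (again assumed for illustration, not taken from the library): data buffers and errors travel over the same channel, and a writer thread bails out as soon as an error arrives:

```rust
// Illustrative sketch of error forwarding (not the library's actual API):
// a producer-callback failure is sent down the same channel as the data,
// and the writer thread terminates immediately on receiving it.
use std::sync::mpsc;
use std::thread;

enum Msg {
    Data { offset: u64, buf: Vec<u8> },
    Fail(String),
}

fn main() {
    let (tx, rx) = mpsc::channel::<Msg>();

    // Writer thread: write until the channel closes or an error arrives.
    let writer = thread::spawn(move || -> Result<usize, String> {
        let mut written = 0;
        for msg in rx {
            match msg {
                Msg::Data { offset, buf } => {
                    // ... a positioned write (write_at/pwrite) would go here ...
                    println!("write {} bytes at offset {}", buf.len(), offset);
                    written += buf.len();
                }
                Msg::Fail(e) => return Err(e), // terminate immediately
            }
        }
        Ok(written)
    });

    // Producer side: a failing callback short-circuits the pipeline.
    let callback_result: Result<Vec<u8>, String> = Err("bad chunk".to_string());
    match callback_result {
        Ok(buf) => tx.send(Msg::Data { offset: 0, buf }).unwrap(),
        Err(e) => tx.send(Msg::Fail(e)).unwrap(),
    }
    drop(tx);

    println!("writer result: {:?}", writer.join().unwrap());
}
```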

## Usage

The `read_file` and `write_to_file` functions are used for read and write operations.

The `read_file` function returns a vector of size

(number of chunks per producer) x (number of producers)

with each element containing the return value of the callback invoked on the corresponding chunk of data.

The `write_to_file` function returns a `Result` containing the total number of bytes written, or a `WriteError` describing the failure. If the producer's callback fails, the error is forwarded to the consumers, which exit immediately, returning the received error.
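
For orientation, the rough shapes of the two entry points can be inferred from the examples below. These are reconstructions, not the crate's verbatim declarations; the generic bounds, error types, and the first element of the tuples returned by `read_file` are guesses:

```rust
// Reconstructed shapes only -- inferred from the usage examples below,
// not copied from the crate; bounds and error types are assumptions.
//
// fn read_file<D, R>(
//     filename: &str,
//     num_producers: u64,
//     num_consumers: u64,
//     chunks_per_producer: u64,
//     consumer: std::sync::Arc<Callback>, // Fn(&[u8], &D, u64, u64, u64) -> R
//     client_data: D,
//     num_buffers_per_producer: u64,
// ) -> Result<Vec<(u64, R)>, ReadError>;  // (chunk id?, callback result)
//
// fn write_to_file<D>(
//     filename: &str,
//     num_producers: u64,
//     num_consumers: u64,
//     chunks_per_producer: u64,
//     producer: std::sync::Arc<Callback>, // Fn(&mut Vec<u8>, &D, u64) -> Result<(), String>
//     client_data: D,
//     num_buffers_per_producer: u64,
//     buffer_size: usize,
// ) -> Result<usize, WriteError>;         // total bytes written
```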

### Parallel reading example

```rust
use par_io::read::read_file;
pub fn main() {
    let filename = std::env::args().nth(1).expect("Missing file name");
    let len = std::fs::metadata(&filename)
        .expect("Error reading file size")
        .len();
    let num_producers: u64 = std::env::args()
        .nth(2)
        .expect("Missing num producers")
        .parse()
        .unwrap();
    let num_consumers: u64 = std::env::args()
        .nth(3)
        .expect("Missing num consumers")
        .parse()
        .unwrap();
    let chunks_per_producer: u64 = std::env::args()
        .nth(4)
        .expect("Missing num chunks per producer")
        .parse()
        .unwrap();
    let num_buffers_per_producer: u64 = if let Some(p) = std::env::args().nth(5) {
        p.parse().expect("Wrong num tasks format")
    } else {
        2
    };
    // callback function
    let consume = |buffer: &[u8],   // buffer containing data from file
                   data: &String,   // custom data
                   chunk_id: u64,   // chunk id
                   num_chunks: u64, // number of chunks per producer
                   _offset: u64|    // file offset
     -> Result<usize, String> {
        std::thread::sleep(std::time::Duration::from_secs(1));
        println!(
            "Consumer: {}/{} {} {}",
            chunk_id,
            num_chunks,
            data,
            buffer.len()
        );
        Ok(buffer.len())
    };
    let tag = "TAG".to_string();
    match read_file(
        &filename,
        num_producers,
        num_consumers,
        chunks_per_producer,
        std::sync::Arc::new(consume), // <- callback
        tag,                          // <- client data passed to callback
        num_buffers_per_producer,
    ) {
        Ok(v) => {
            let bytes_consumed = v
                .iter()
                .fold(0, |acc, x| if let (_, Ok(b)) = x { acc + b } else { acc });
            assert_eq!(bytes_consumed, len as usize);
        }
        Err(err) => {
            eprintln!("{}", err);
        }
    }
}
```

### Parallel writing example

```rust
use par_io::write::write_to_file;
pub fn main() {
    let buffer_size: usize = std::env::args()
        .nth(1)
        .expect("Missing buffer size")
        .parse()
        .expect("Wrong buffer size format");
    let filename = std::env::args().nth(2).expect("Missing file name");
    let num_producers: u64 = std::env::args()
        .nth(3)
        .expect("Missing num producers")
        .parse()
        .unwrap();
    let num_consumers: u64 = std::env::args()
        .nth(4)
        .expect("Missing num consumers")
        .parse()
        .unwrap();
    let chunks_per_producer: u64 = std::env::args()
        .nth(5)
        .expect("Missing num chunks per producer")
        .parse()
        .unwrap();
    let num_buffers_per_producer: u64 = if let Some(p) = std::env::args().nth(6) {
        p.parse().expect("Wrong num tasks format")
    } else {
        2
    };
    let producer = |buffer: &mut Vec<u8>, _tag: &String, offset: u64| -> Result<(), String> {
        std::thread::sleep(std::time::Duration::from_secs(1));
        println!("{:?}> Writing to offset {}", buffer.as_ptr(), offset);
        // Warning: the data must be modified in place; otherwise `buffer` will
        // be re-allocated instead of reused
        buffer.fill(1_u8);
        Ok(())
    };
    let data = "TAG".to_string();
    match write_to_file(
        &filename,
        num_producers,
        num_consumers,
        chunks_per_producer,
        std::sync::Arc::new(producer),
        data,
        num_buffers_per_producer,
        buffer_size,
    ) {
        Ok(bytes_consumed) => {
            let len = std::fs::metadata(&filename)
                .expect("Cannot access file")
                .len();
            assert_eq!(bytes_consumed, len as usize);
            std::fs::remove_file(&filename).expect("Cannot delete file");
        },
        Err(err) => {
            use par_io::write::{WriteError, ProducerError, ConsumerError};
            match err {
                WriteError::Producer(ProducerError{msg, offset}) => {
                    eprintln!("Producer error: {} at {}", msg, offset);
                },
                WriteError::Consumer(ConsumerError{msg}) => {
                    eprintln!("Consumer error: {}", msg);
                },
                WriteError::Other(err) => {
                    eprintln!("Error: {}", err);
                },
            }
        }
    }
}
```
