The Hello World example didn't really demonstrate why using AFIO is any better than using STL iostreams. The final example in this quick start is a “real world” case, unfortunately somewhat overwhelming in its complexity, which shows how AFIO can run rings around STL iostreams (complete with benchmarks!). As an intermediate and hopefully less overwhelming step, here is a simple file copy utility which can also concatenate multiple source files into a destination file[9].
namespace
{
  using namespace boost::afio;
  using boost::afio::off_t;

  // Keep memory buffers around
  // A special allocator of highly efficient file i/o memory
  typedef std::vector<char, utils::page_allocator<char>> file_buffer_type;
  static std::vector<std::unique_ptr<file_buffer_type>> buffers;

  // Parallel copy files in sources into dest, concatenating
  stl_future<std::vector<handle_ptr>> async_concatenate_files(
      atomic<off_t> &written, off_t &totalbytes,
      dispatcher_ptr dispatcher,
      boost::afio::filesystem::path dest,
      std::vector<boost::afio::filesystem::path> sources,
      size_t chunk_size = 1024 * 1024 /* 1Mb */)
  {
    // Schedule the opening of the output file for writing
    auto oh = async_file(dest, file_flags::create | file_flags::write);
    // Schedule the opening of all the input files for reading as a batch
    std::vector<path_req> ihs_reqs;
    ihs_reqs.reserve(sources.size());
    for(auto &&source : sources)
      ihs_reqs.push_back(path_req(
          source, file_flags::read | file_flags::will_be_sequentially_accessed));
    auto ihs = dispatcher->file(ihs_reqs);
    // Retrieve any error from opening the output
    oh.get();
    // Wait for the input file handles to open so we can get their sizes
    // (plus any failures to open)
    when_all_p(ihs).get();

    // Need to figure out the sizes of the sources so we can resize output
    // correctly. We also need to allocate scratch buffers for each source.
    std::vector<std::tuple<off_t, off_t>> offsets;
    offsets.reserve(ihs.size());
    off_t offset = 0, max_individual = 0;
    for(auto &ih : ihs)
    {
      // Get the file's size in bytes
      off_t bytes = ih->direntry(metadata_flags::size).st_size();
      if(bytes > max_individual)
        max_individual = bytes;
      // std::cout << "File " << ih->path() << " size " << bytes << " to offset " <<
      // offset << std::endl;
      // Push the offset to write at, amount to write, and a scratch buffer
      offsets.push_back(std::make_tuple(offset, bytes));
      buffers.push_back(detail::make_unique<file_buffer_type>(chunk_size));
      offset += bytes;
    }
    // Schedule resizing output to correct size, retrieving errors
    totalbytes = offset;
    auto ohresize = async_truncate(oh, offset);
    when_all_p(ohresize).get();

    // Schedule the parallel processing of all input files, sequential per file,
    // but only after the output file has been resized
    std::vector<future<>> lasts;
    for(auto &i : ihs)
      lasts.push_back(dispatcher->depends(ohresize, i));
    for(off_t o = 0; o < max_individual; o += chunk_size)
    {
      for(size_t idx = 0; idx < ihs_reqs.size(); idx++)
      {
        auto offset = std::get<0>(offsets[idx]), bytes = std::get<1>(offsets[idx]);
        auto &buffer = buffers[idx];
        if(o < bytes)
        {
          off_t thischunk = bytes - o;
          if(thischunk > chunk_size)
            thischunk = chunk_size;
          // std::cout << "Writing " << thischunk << " from offset " << o << " in "
          // << lasts[idx]->path() << std::endl;
          // Schedule a filling of buffer from offset o after last has completed
          auto readchunk = async_read(lasts[idx], buffer->data(), (size_t) thischunk, o);
          // Schedule a writing of buffer to offset offset+o after readchunk is ready
          // Note the call to dispatcher->depends() to make sure the write only
          // occurs after the read completes
          auto writechunk = async_write(depends(readchunk, ohresize), buffer->data(),
                                        (size_t) thischunk, offset + o);
          // Schedule incrementing written after write has completed
          auto incwritten = writechunk.then([&written, thischunk](future<> f) {
            written += thischunk;
            return f;
          });
          // Don't do next read until written is incremented
          lasts[idx] = dispatcher->depends(incwritten, readchunk);
        }
      }
    }
    // Having scheduled all the reads and write, return a stl_future which returns
    // when they're done
    return when_all_p(lasts);
  }
}

int main(int argc, const char *argv[])
{
  using namespace boost::afio;
  using boost::afio::off_t;
  typedef chrono::duration<double, ratio<1, 1>> secs_type;
  if(argc < 3)
  {
    std::cerr << "ERROR: Need to specify destination path and source paths" << std::endl;
    return 1;
  }
  try
  {
    atomic<off_t> written(0);
    off_t totalbytes = 0;
    std::shared_ptr<boost::afio::dispatcher> dispatcher =
        boost::afio::make_dispatcher().get();
    // Set a dispatcher as current for this thread
    boost::afio::current_dispatcher_guard guard(dispatcher);

    boost::afio::filesystem::path dest = argv[1];
    std::vector<boost::afio::filesystem::path> sources;
    std::cout << "Concatenating into " << dest << " the files ";
    for(int n = 2; n < argc; ++n)
    {
      sources.push_back(argv[n]);
      std::cout << sources.back();
      if(n < argc - 1)
        std::cout << ", ";
    }
    std::cout << " ..." << std::endl;

    auto begin = chrono::steady_clock::now();
    auto h = async_concatenate_files(written, totalbytes, dispatcher, dest, sources);
    // Print progress once a second until it's done
    while(future_status::timeout == h.wait_for(boost::afio::chrono::seconds(1)))
    {
      std::cout << "\r" << (100 * written) / totalbytes << "% complete (" << written
                << " out of " << totalbytes << " @ "
                << (written / chrono::duration_cast<secs_type>(
                                  chrono::steady_clock::now() - begin).count() /
                    1024 / 1024)
                << "Mb/sec) ..." << std::flush;
    }
    // Retrieve any errors
    h.get();
    std::cout << std::endl;
  }
  catch(...)
  {
    std::cerr << "ERROR: " << boost::current_exception_diagnostic_information(true)
              << std::endl;
    return 1;
  }
  // Make sure output really is input concatenated
  std::cout << "CRC checking that the output exactly matches the inputs ..." << std::endl;
  uint32_t crc1 = 0, crc2 = 0;
  off_t offset = 0;
  std::ifstream o(argv[1], std::ifstream::in);
  for(int n = 2; n < argc; n++)
  {
    std::ifstream i(argv[n], std::ifstream::in);
    i.seekg(0, i.end);
    std::vector<char> buffer1((size_t) i.tellg()), buffer2((size_t) i.tellg());
    i.seekg(0, i.beg);
    o.read(buffer1.data(), buffer1.size());
    i.read(buffer2.data(), buffer2.size());
    bool quiet = false;
    for(size_t n = 0; n < buffer1.size(); n += 64)
    {
      crc1 = crc32(buffer1.data() + n, 64, crc1);
      crc2 = crc32(buffer2.data() + n, 64, crc2);
      if(crc1 != crc2 && !quiet)
      {
        std::cerr << "ERROR: Offset " << offset + n << " not copied correctly!" << std::endl;
        quiet = true;
      }
    }
    offset += buffer1.size();
  }
  return 0;
}
This example consists of a function called async_concatenate_files() which asynchronously concatenates the files at the paths in sources into the file at the path in dest, with the main() function simply parsing arguments, printing progress every second and finally CRC checking the output against the inputs. I won't explain the example in great detail as it's pretty straightforward: it reads all the source files in parallel in chunk_size chunks, writing each chunk to its appropriate offset in the output file. It is a very good example, incidentally, of why C++11 is like a whole new language over C++98, even for simple systems programming tasks like this one.
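The offset bookkeeping is the only mildly fiddly part, so here is a minimal sketch of it in plain standard C++ with no AFIO calls at all. The names chunk_op and plan_concatenation are hypothetical helpers invented for this illustration; they are not part of AFIO. Given the source sizes, the sketch computes each source's base offset in the output as a running total, then emits the chunk operations in the same order as the nested loops in the example above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical type for illustration only (not part of AFIO).
// One copy step: read `length` bytes at `source_offset` of source number
// `source`, then write them at `dest_offset` of the output file.
struct chunk_op
{
  std::size_t source;
  std::uint64_t source_offset;
  std::uint64_t dest_offset;
  std::uint64_t length;
};

// Hypothetical helper for illustration only (not part of AFIO).
// Plans the chunked concatenation of sources with the given sizes.
inline std::vector<chunk_op> plan_concatenation(const std::vector<std::uint64_t> &sizes,
                                                std::uint64_t chunk_size)
{
  assert(chunk_size > 0);
  // Each source starts where the previous one ended (a running total)
  std::vector<std::uint64_t> bases;
  bases.reserve(sizes.size());
  std::uint64_t total = 0, max_individual = 0;
  for(std::uint64_t bytes : sizes)
  {
    bases.push_back(total);
    total += bytes;
    max_individual = std::max(max_individual, bytes);
  }
  // Outer loop over chunk offsets, inner loop over sources, so that chunks
  // of different sources interleave while staying sequential per source
  std::vector<chunk_op> plan;
  for(std::uint64_t o = 0; o < max_individual; o += chunk_size)
  {
    for(std::size_t idx = 0; idx < sizes.size(); ++idx)
    {
      if(o < sizes[idx])
      {
        // The final chunk of a source may be shorter than chunk_size
        std::uint64_t thischunk = std::min(chunk_size, sizes[idx] - o);
        plan.push_back(chunk_op{idx, o, bases[idx] + o, thischunk});
      }
    }
  }
  return plan;
}
```

In the AFIO version each of these planned steps becomes a read scheduled after the previous step for the same source, followed by a write which depends on that read.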
You may note that the temporary buffers, one per source file, are allocated using a special page_allocator. If your operating system allows it, regions of memory returned by this allocator have the maximum possible scatter gather DMA efficiency. Even if your operating system doesn't allow it, it does no harm to use this allocator instead of the system allocator.
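To give a feel for what a page oriented allocator plugged into std::vector looks like, here is a rough sketch. It is emphatically not AFIO's page_allocator: page_aligned_allocator, aligned_buffer and the 4096 byte assumed_page_size are all assumptions made for this illustration, and the sketch only guarantees that each buffer starts on an assumed page boundary. Whatever else the real allocator asks of the operating system is not reproduced here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <new>
#include <vector>

// Assumption for this sketch: 4Kb pages. Real code would query the OS.
constexpr std::size_t assumed_page_size = 4096;

// Hypothetical stand-in for illustration; NOT AFIO's utils::page_allocator.
template <class T> struct page_aligned_allocator
{
  typedef T value_type;

  page_aligned_allocator() noexcept {}
  template <class U> page_aligned_allocator(const page_aligned_allocator<U> &) noexcept {}

  T *allocate(std::size_t n)
  {
    if(n > static_cast<std::size_t>(-1) / sizeof(T))
      throw std::bad_alloc();
    // Over-allocate: alignment slack plus room to stash the raw pointer
    void *raw = std::malloc(n * sizeof(T) + assumed_page_size + sizeof(void *));
    if(!raw)
      throw std::bad_alloc();
    std::uintptr_t start = reinterpret_cast<std::uintptr_t>(raw) + sizeof(void *);
    // Round up to the next page boundary
    std::uintptr_t aligned = (start + assumed_page_size - 1) &
                             ~static_cast<std::uintptr_t>(assumed_page_size - 1);
    assert(aligned % assumed_page_size == 0);
    // Remember the raw pointer just below the aligned block for deallocate()
    reinterpret_cast<void **>(aligned)[-1] = raw;
    return reinterpret_cast<T *>(aligned);
  }

  void deallocate(T *p, std::size_t) noexcept
  {
    std::free(reinterpret_cast<void **>(p)[-1]);
  }
};

// All instances are interchangeable as the allocator holds no state
template <class T, class U>
bool operator==(const page_aligned_allocator<T> &, const page_aligned_allocator<U> &) noexcept
{
  return true;
}
template <class T, class U>
bool operator!=(const page_aligned_allocator<T> &, const page_aligned_allocator<U> &) noexcept
{
  return false;
}

// Analogous in shape to file_buffer_type in the example above
typedef std::vector<char, page_aligned_allocator<char>> aligned_buffer;
```

Because the allocator is just a template parameter of std::vector, swapping it in costs nothing at the call sites, which is why the example can use file_buffer_type exactly like an ordinary vector of char.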
The chances are, however, that this file copying implementation would be little faster than a naive implementation (unless the source files are on different physical devices, in which case you'd see maximum performance). Some might also argue that firing up a producer-consumer thread per source file would be just as easy, although managing the output file position across threads might be slightly tricky.
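For comparison, a naive implementation might look something like the following sketch, which uses nothing but STL iostreams and copies the sources one after another through a single buffer. The function name naive_concatenate_files is hypothetical and chosen for this illustration; it makes no claim about how either version actually performs.

```cpp
#include <cassert>
#include <cstddef>
#include <fstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical baseline for illustration only (not part of AFIO).
// Sequentially concatenates sources into dest, returning the bytes written.
inline unsigned long long naive_concatenate_files(const std::string &dest,
                                                  const std::vector<std::string> &sources,
                                                  std::size_t chunk_size = 1024 * 1024)
{
  assert(chunk_size > 0);
  std::ofstream out(dest.c_str(), std::ios::binary | std::ios::trunc);
  if(!out)
    throw std::runtime_error("cannot open " + dest);
  std::vector<char> buffer(chunk_size);
  unsigned long long written = 0;
  for(const std::string &source : sources)
  {
    std::ifstream in(source.c_str(), std::ios::binary);
    if(!in)
      throw std::runtime_error("cannot open " + source);
    for(;;)
    {
      // read() may deliver a short final chunk; gcount() says how much arrived
      in.read(buffer.data(), static_cast<std::streamsize>(buffer.size()));
      std::streamsize got = in.gcount();
      if(got <= 0)
        break;
      out.write(buffer.data(), got);
      if(!out)
        throw std::runtime_error("write failed on " + dest);
      written += static_cast<unsigned long long>(got);
    }
  }
  out.flush();
  if(!out)
    throw std::runtime_error("flush failed on " + dest);
  return written;
}
```

Here the output position never needs managing because there is only ever one writer appending in order, which is exactly the property a thread per source design would give up.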
Let us now move on to where AFIO starts to deliver real added value: a multiprocess safe log file, and finding which files in a directory tree match a regular expression.
[9] My thanks to Artur Laksberg, Dev Lead of the Microsoft Visual C++ Parallel Libraries team, for suggesting this as a good demonstration of an asynchronous file i/o library. I had been until then quite stumped for an intermediate quick start example between the first and last examples.