So we can schedule file i/o operations asynchronously with AFIO: what's the benefit of doing that instead of creating separate threads of execution and executing the file i/o there instead?
As a quick summary as we're about to prove our case, there are three main benefits to using AFIO over doing it by hand:
What these three items mean is that writing scalable, high-performance filing system code is much easier. Let us take a real-world comparative example: a simple find-regular-expression-in-files implementation based on STL iostreams, Boost.Filesystem and OpenMP:
    int main(int argc, const char *argv[])
    {
      using namespace std;
      namespace filesystem = boost::afio::filesystem;
    #if BOOST_AFIO_USE_BOOST_FILESYSTEM
      using boost::filesystem::ifstream;
    #endif /*_ BOOST_AFIO_USE_BOOST_FILESYSTEM _*/
      typedef chrono::duration<double, ratio<1, 1>> secs_type;
      if(argc < 2)
      {
        cerr << "ERROR: Specify a regular expression to search all files in the current "
                "directory."
             << endl;
        return 1;
      }
      // Prime SpeedStep
      auto begin = chrono::high_resolution_clock::now();
      while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now() - begin).count() < 1)
        ;
      size_t bytesread = 0, filesread = 0, filesmatched = 0;
      try
      {
        begin = chrono::high_resolution_clock::now();

        // Generate a list of all files here and below the current working directory
        vector<filesystem::path> filepaths;
        for(auto it = filesystem::recursive_directory_iterator(".");
            it != filesystem::recursive_directory_iterator(); ++it)
        {
          if(it->status().type() !=
    #ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS
             filesystem::file_type::regular_file)
    #else /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
             filesystem::file_type::regular)
    #endif /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
            continue;
          filepaths.push_back(it->path());
        }

        // Compile the regular expression, and have OpenMP parallelise the loop
        regex regexpr(argv[1]);
    #pragma omp parallel for schedule(dynamic)
        for(int n = 0; n < (int) filepaths.size(); n++)
        {
          // Open the file
          ifstream s(filepaths[n], ifstream::binary);
          s.exceptions(fstream::failbit | fstream::badbit);  // Turn on exception throwing
          // Get its length
          s.seekg(0, ios::end);
          size_t length = (size_t) s.tellg();
          s.seekg(0, ios::beg);
          // Allocate a sufficient buffer, avoiding the byte clearing vector would do
          unique_ptr<char[]> buffer(new char[length + 1]);
          // Read in the file, terminating with zero
          s.read(buffer.get(), length);
          buffer.get()[length] = 0;
          // Search the buffer for the regular expression
          if(regex_search(buffer.get(), regexpr))
          {
    #pragma omp critical
            {
              cout << filepaths[n] << endl;
            }
            filesmatched++;
          }
          filesread++;
          bytesread += length;
        }
        auto end = chrono::high_resolution_clock::now();
        auto diff = chrono::duration_cast<secs_type>(end - begin);
        cout << "\n" << filesmatched << " files matched out of " << filesread
             << " files which was " << bytesread << " bytes." << endl;
        cout << "The search took " << diff.count() << " seconds which was "
             << filesread / diff.count() << " files per second or "
             << (bytesread / diff.count() / 1024 / 1024) << " Mb/sec." << endl;
      }
      catch(...)
      {
        cerr << boost::current_exception_diagnostic_information(true) << endl;
        return 1;
      }
      return 0;
    }
This implementation is fairly straightforward: enumerate all files in all directories below the current directory into a vector, fire off N threads to open each file, read it entirely into memory, regex it for the pattern and if found print the file's path.
Let's now look at the AFIO implementation. Prepare yourself, because it is far more mind-bending due to the many nested callbacks needed (it may remind you of WinRT or .NET code, where everything is an asynchronous callback):
Warning: In the v1.4 engine we ought to gain C++ 1z stackful coroutine support, which will let me rewrite the below to be vastly simpler and without the multitude of nested callback handlers.
    using namespace boost::afio;

    // Often it's easiest for a lot of nesting callbacks to carry state via a this
    // pointer
    class find_in_files
    {
    public:
      std::promise<int> finished;
      std::regex regexpr;  // The precompiled regular expression
      dispatcher_ptr dispatcher;
      recursive_mutex opslock;
      std::deque<future<>> ops;  // For exception gathering
      std::atomic<size_t> bytesread, filesread, filesmatched, scheduled, completed;
      std::vector<std::pair<boost::afio::filesystem::path, size_t>> filepaths;

      // Signals finish once all scheduled ops have completed
      void docompleted(size_t inc)
      {
        size_t c = (completed += inc);
        if(c == scheduled)
          finished.set_value(0);
      };
      // Adds ops to the list of scheduled
      void doscheduled(std::initializer_list<future<>> list)
      {
        scheduled += list.size();
        // boost::lock_guard<decltype(opslock)> lock(opslock);
        // ops.insert(ops.end(), list.begin(), list.end());
      }
      void doscheduled(std::vector<future<>> list)
      {
        scheduled += list.size();
        // boost::lock_guard<decltype(opslock)> lock(opslock);
        // ops.insert(ops.end(), list.begin(), list.end());
      }
      void dosearch(handle_ptr h, const char *buffer, size_t length)
      {
        // Search the buffer for the regular expression
        if(std::regex_search(buffer, regexpr))
        {
    #pragma omp critical
          {
            std::cout << h->path() << std::endl;
          }
          ++filesmatched;
        }
        ++filesread;
        bytesread += length;
      }
      // A file searching completion, called when each file read completes
      std::pair<bool, handle_ptr> file_read(
      size_t id, future<> op,
      std::shared_ptr<std::vector<char, detail::aligned_allocator<char, 4096, false>>> _buffer,
      size_t length)
      {
        handle_ptr h(op.get_handle());
        // std::cout << "R " << h->path() << std::endl;
        char *buffer = _buffer->data();
        buffer[length] = 0;
        dosearch(h, buffer, length);
        docompleted(2);
        // Throw away the buffer now rather than later to keep memory consumption down
        _buffer->clear();
        return std::make_pair(true, h);
      }
      // A file reading completion, called when each file open completes
      std::pair<bool, handle_ptr> file_opened(size_t id, future<> op, size_t length)
      {
        handle_ptr h(op.get_handle());
        // std::cout << "F " << h->path() << std::endl;
        if(length)
        {
    #ifdef USE_MMAPS
          auto map(h->map_file());
          if(map)
          {
            dosearch(h, (const char *) map->addr, length);
          }
          else
    #endif /*_ defined(USE_MMAPS) _*/
          {
            // Allocate a sufficient 4Kb aligned buffer
            size_t _length = (4095 + length) & ~4095;
            auto buffer = std::make_shared<
            std::vector<char, detail::aligned_allocator<char, 4096, false>>>(_length + 1);
            // Schedule a read of the file
            auto read = dispatcher->read(make_io_req(
            dispatcher->op_from_scheduled_id(id), buffer->data(), _length, 0));
            auto read_done = dispatcher->completion(
            read, std::make_pair(async_op_flags::none /*regex search might be slow*/,
                                 std::function<dispatcher::completion_t>(std::bind(
                                 &find_in_files::file_read, this, std::placeholders::_1,
                                 std::placeholders::_2, buffer, length))));
            doscheduled({read, read_done});
          }
        }
        docompleted(2);
        return std::make_pair(true, h);
      }
      // An enumeration parsing completion, called when each directory enumeration
      // completes
      std::pair<bool, handle_ptr> dir_enumerated(
      size_t id, future<> op,
      std::shared_ptr<future<std::pair<std::vector<directory_entry>, bool>>> listing)
      {
        handle_ptr h(op.get_handle());
        future<> lastdir, thisop(dispatcher->op_from_scheduled_id(id));
        // Get the entries from the ready stl_future
        std::vector<directory_entry> entries(std::move(listing->get().first));
        // std::cout << "E " << h->path() << std::endl;
        // For each of the directories schedule an open and enumeration
    #if 0
        // Algorithm 1
        {
          std::vector<path_req> dir_reqs; dir_reqs.reserve(entries.size());
          for(auto &entry : entries)
          {
            if((entry.st_type()&S_IFDIR)==S_IFDIR)
            {
              dir_reqs.push_back(path_req(thisop, h->path()/entry.name()));
            }
          }
          if(!dir_reqs.empty())
          {
            std::vector<std::pair<async_op_flags, std::function<dispatcher::completion_t>>> dir_openedfs(dir_reqs.size(), std::make_pair(async_op_flags::None, std::bind(&find_in_files::dir_opened, this, std::placeholders::_1, std::placeholders::_2)));
            auto dir_opens=dispatcher->dir(dir_reqs);
            doscheduled(dir_opens);
            auto dir_openeds=dispatcher->completion(dir_opens, dir_openedfs);
            doscheduled(dir_openeds);
            // Hold back some of the concurrency
            lastdir=dir_openeds.back();
          }
        }
    #else /*_ 0 _*/
        // Algorithm 2
        // The Windows NT kernel filing system driver gets upset with too much
        // concurrency when used with OSDirect so throttle directory enumerations
        // to enforce some depth first traversal.
        {
          std::pair<async_op_flags, std::function<dispatcher::completion_t>> dir_openedf =
          std::make_pair(async_op_flags::none,
                         std::bind(&find_in_files::dir_opened, this, std::placeholders::_1,
                                   std::placeholders::_2));
          for(auto &entry : entries)
          {
            if(entry.st_type() ==
    # ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS
               boost::afio::filesystem::file_type::directory_file)
    # else /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
               boost::afio::filesystem::file_type::directory)
    # endif /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
            {
              auto dir_open =
              dispatcher->dir(path_req::absolute(lastdir, h->path() / entry.name()));
              auto dir_opened = dispatcher->completion(dir_open, dir_openedf);
              doscheduled({dir_open, dir_opened});
              lastdir = dir_opened;
            }
          }
        }
    #endif /*_ 0 _*/

        // For each of the files schedule an open and search
    #if 0
        // Algorithm 1
        {
          std::vector<path_req> file_reqs; file_reqs.reserve(entries.size());
          std::vector<std::pair<async_op_flags, std::function<dispatcher::completion_t>>> file_openedfs; file_openedfs.reserve(entries.size());
          for(auto &entry : entries)
          {
            if((entry.st_type()&S_IFREG)==S_IFREG)
            {
              size_t length=(size_t)entry.st_size();
              if(length)
              {
                file_flags flags=file_flags::read;
    # ifdef USE_MMAPS
                if(length>16384) flags=flags|file_flags::os_mmap;
    # endif /*_ defined(USE_MMAPS) _*/
                file_reqs.push_back(path_req(lastdir, h->path()/entry.name(), flags));
                file_openedfs.push_back(std::make_pair(async_op_flags::None, std::bind(&find_in_files::file_opened, this, std::placeholders::_1, std::placeholders::_2, length)));
              }
            }
          }
          auto file_opens=dispatcher->file(file_reqs);
          doscheduled(file_opens);
          auto file_openeds=dispatcher->completion(file_opens, file_openedfs);
          doscheduled(file_openeds);
        }
    #else /*_ 0 _*/
        // Algorithm 2
        {
          for(auto &entry : entries)
          {
            if(entry.st_type() ==
    # ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS
               boost::afio::filesystem::file_type::regular_file)
    # else /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
               boost::afio::filesystem::file_type::regular)
    # endif /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
            {
              size_t length = (size_t) entry.st_size();
              if(length)
              {
                file_flags flags = file_flags::read;
                auto file_open = dispatcher->file(
                path_req::absolute(lastdir, h->path() / entry.name(), flags));
                auto file_opened = dispatcher->completion(
                file_open,
                std::make_pair(async_op_flags::none,
                               std::function<dispatcher::completion_t>(std::bind(
                               &find_in_files::file_opened, this, std::placeholders::_1,
                               std::placeholders::_2, length))));
                doscheduled({file_open, file_opened});
                lastdir = file_opened;
              }
            }
          }
        }
    #endif /*_ 0 _*/
        docompleted(2);
        return std::make_pair(true, h);
      }
      // A directory enumerating completion, called once per directory open in the tree
      std::pair<bool, handle_ptr> dir_opened(size_t id, future<> op)
      {
        handle_ptr h(op.get_handle());
        // std::cout << "D " << h->path() << std::endl;
        // Now we have an open directory handle, schedule an enumeration
        auto enumeration = dispatcher->enumerate(
        enumerate_req(dispatcher->op_from_scheduled_id(id), metadata_flags::size, 1000));
        future<> enumeration_op(enumeration);
        auto listing =
        std::make_shared<future<std::pair<std::vector<directory_entry>, bool>>>(
        std::move(enumeration));
        auto enumeration_done = dispatcher->completion(
        enumeration_op,
        make_pair(async_op_flags::none,
                  std::function<dispatcher::completion_t>(
                  std::bind(&find_in_files::dir_enumerated, this, std::placeholders::_1,
                            std::placeholders::_2, listing))));
        doscheduled({enumeration, enumeration_done});
        docompleted(2);
        // Complete only if not the cur dir opened
        return std::make_pair(true, h);
      };
      void dowait()
      {
        // Prepare finished
        auto finished_waiter = finished.get_future();
    #if 0  // Disabled to maximise performance
        // Wait till done, retrieving any exceptions as they come in to keep memory consumption down
        std::future_status status;
        do
        {
          status=finished_waiter.wait_for(std::chrono::milliseconds(1000));
          std::cout << "\nDispatcher has " << dispatcher->count() << " fds open and " << dispatcher->wait_queue_depth() << " ops outstanding." << std::endl;
          std::vector<future<>> batch; batch.reserve(10000);
          {
            boost::lock_guard<decltype(opslock)> lock(opslock);
            while(status==std::future_status::timeout ? (ops.size()>5000) : !ops.empty())
            {
              batch.push_back(std::move(ops.front()));
              ops.pop_front();
            }
          }
          // Retrieve any exceptions
          std::cout << "Processed " << batch.size() << " ops for exception state." << std::endl;
          if(!batch.empty())
            when_all_p(batch.begin(), batch.end()).wait();
        } while(status==std::future_status::timeout);
    #else /*_ 0 // Disabled to maximise performance _*/
        finished_waiter.wait();
    #endif /*_ 0 // Disabled to maximise performance _*/
      }
      // Constructor, which starts the ball rolling
      find_in_files(const char *_regexpr)
          : regexpr(_regexpr)
          ,
          // Create an AFIO dispatcher that bypasses any filing system buffers
          dispatcher(
          make_dispatcher(
          "file:///", file_flags::will_be_sequentially_accessed /*|file_flags::os_direct*/).get())
          , bytesread(0)
          , filesread(0)
          , filesmatched(0)
          , scheduled(0)
          , completed(0)
      {
        filepaths.reserve(50000);

        // Schedule the recursive enumeration of the current directory
        std::cout << "\n\nStarting directory enumerations ..." << std::endl;
        auto cur_dir = dispatcher->dir(path_req(""));
        auto cur_dir_opened = dispatcher->completion(
        cur_dir, std::make_pair(async_op_flags::none,
                                std::function<dispatcher::completion_t>(std::bind(
                                &find_in_files::dir_opened, this, std::placeholders::_1,
                                std::placeholders::_2))));
        doscheduled({cur_dir, cur_dir_opened});
        dowait();
      }
    };

    int main(int argc, const char *argv[])
    {
      using std::placeholders::_1;
      using std::placeholders::_2;
      using namespace boost::afio;
      typedef chrono::duration<double, ratio<1, 1>> secs_type;
      if(argc < 2)
      {
        std::cerr << "ERROR: Specify a regular expression to search all files in the "
                     "current directory."
                  << std::endl;
        return 1;
      }
      // Prime SpeedStep
      auto begin = chrono::high_resolution_clock::now();
      while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now() - begin).count() < 1)
        ;
      try
      {
        begin = chrono::high_resolution_clock::now();
        find_in_files finder(argv[1]);
        auto end = chrono::high_resolution_clock::now();
        auto diff = chrono::duration_cast<secs_type>(end - begin);
        std::cout << "\n" << finder.filesmatched << " files matched out of "
                  << finder.filesread << " files which was " << finder.bytesread
                  << " bytes." << std::endl;
        std::cout << "The search took " << diff.count() << " seconds which was "
                  << finder.filesread / diff.count() << " files per second or "
                  << (finder.bytesread / diff.count() / 1024 / 1024) << " Mb/sec."
                  << std::endl;
      }
      catch(...)
      {
        std::cerr << boost::current_exception_diagnostic_information(true) << std::endl;
        return 1;
      }
      return 0;
    }
Here the `find_in_files` class is used to carry state across the callbacks. One could equally bind the state into each callback's parameters instead, via some sort of pointer to a struct. But the difference in complexity is noticeable, and you're probably now asking: why choose this hideous complexity over the much clearer OpenMP and iostreams implementation[10]?
Well, that's simple: do you want maximum file i/o performance? Here is a search for “Niall” in a Boost working directory which contained about 7 GB of data across ~35k files[11]:
Table 1.15. Find in files performance for traditional vs AFIO implementations

| | iostreams, single threaded | iostreams, OpenMP | Boost.AFIO | Boost.AFIO `file_flags::os_direct` | Boost.AFIO `file_flags::os_mmap`[a] |
|---|---|---|---|---|---|
| Warm cache | 812 MB/sec | 1810 MB/sec | 2663 MB/sec | N/A | 6512 MB/sec |
| | +0% | +123% | +228% | | +702% |
| Cold cache[b] | 16 MB/sec | 8 MB/sec | 15 MB/sec | 13.14 MB/sec | 24 MB/sec |
| | +0% | -50% | -6% | -18% | +50% |

[a] The superiority of memory maps on Windows is because all buffered file i/o is done via memory copying to/from memory maps on Windows anyway, so eliminating that memory copy is huge.

[b] File cache reset using http://technet.microsoft.com/en-us/sysinternals/ff700229.aspx
Note how AFIO outperforms the OpenMP iostreams implementation by about 50% for a warm cache, with only a 6% penalty for a cold cache over a single threaded implementation. Note the 50% penalty the OpenMP iostreams implementation suffers for a cold cache: a naive multithreaded implementation causes a lot of disc seeks. If you map a file into memory using `file_flags::os_mmap`, AFIO will `memcpy()` from that map instead of reading, or of course you can use the map directly using the pointer returned by `try_mapfile()`.
The eagle-eyed amongst you will have noticed that the AFIO implementation looks hand tuned, with a special depth first algorithm balancing concurrency with seek locality. That's because I invested two days into achieving maximum possible performance as a demonstration of AFIO's power (and to find and fix some bottlenecks). Some might argue that this is therefore not a fair comparison to the OpenMP iostreams implementation.
There are two parts to answering this argument. The first is that yes, the OpenMP iostreams search algorithm is fairly stupid and simply tries to read as many files as there are CPUs, and those files could be stored anywhere on the spinning disc. Because AFIO can issue far more concurrent file open and read requests than OpenMP, it gives a lot more scope to the filing system to optimise hard drive seeks and satisfy as many requests as is feasible. This is sufficiently so that with a cold cache, AFIO is only a little slower than a single threaded iostreams implementation, where the filing system can spot the access pattern and prefetch quite effectively. A completely valid solution to the OpenMP performance deficit would be to increase thread count dramatically.
The second part of answering that argument is this: AFIO's very flexible op chaining structure lets you very easily twiddle with the execution dependency graph to achieve maximum possible performance. You do this by balancing concurrency (too much or too little is slower, what you need is just the right balance) against disc seeks (enumerations are best not done in parallel, as that defeats any prefetching algorithm). The naive OpenMP implementation is much harder to play around with in this way. Don't get me wrong: if you have plenty of time on your hands, you can implement a hand written and tuned find in files implementation that is faster than AFIO's implementation, but it will have taken you a lot longer, and the code will be neither as portable nor as maintainable.
[10] Indeed many ask the same when programming WinRT apps: Microsoft's insistence on never allowing any call to block can make simple designs explode with the complexity of nested callbacks.
[11] Test machine was a 3.5 GHz Intel i7-3770K running Microsoft Windows 8 x64 with a Seagate ST3320620AS 7200rpm hard drive. Note that AFIO has a native WinIOCP backend which leverages host asynchronous file i/o support.