Boost C++ Libraries Home Libraries People FAQ More

PrevUpHomeNext

What performance benefit does asynchronous file i/o bring me? A demonstration

So we can schedule file i/o operations asynchronously with AFIO: what's the benefit of doing that instead of creating separate threads of execution and executing the file i/o there instead?

As a quick summary as we're about to prove our case, there are three main benefits to using AFIO over doing it by hand:

  1. You don't have to worry about threading or race conditions or losing error state (as much). AFIO does all that for you.
  2. On platforms which provide native asynchronous file i/o support and/or native scatter/gather file i/o support, AFIO will use that instead of issuing multiple filing system operations using a thread pool to achieve concurrency. This can very significantly reduce the number of threads needed to keep your storage device fully occupied — remember that queue depth i.e. that metric you see all over storage device benchmarks is the number of operations in flight at once, which implies the number of threads needed. Most storage devices are IOPS limited due to SATA or SAS connection latencies without introducing queue depth — in particular, modern SSDs cannot achieve tens of thousands of IOPS range without substantial queue depths, which without using a native asynchronous file i/o API means lots of threads.
  3. It's very, very easy to have AFIO turn off file system caching, readahead or buffering on a case by case basis which makes writing scalable random synchronous file i/o applications much easier.

What these three items mean is that writing scalable high-performance filing system code is much easier. Let us take a real world comparative example, this being a simple STL iostreams, Boost Filesystem and OpenMP based find regular expression in files implementation:

int main(int argc, const char *argv[])
{
  using namespace std;
  namespace filesystem = boost::afio::filesystem;
#if BOOST_AFIO_USE_BOOST_FILESYSTEM
  using boost::filesystem::ifstream;
#endif  /*_ BOOST_AFIO_USE_BOOST_FILESYSTEM _*/
  typedef chrono::duration<double, ratio<1, 1>> secs_type;
  if(argc < 2)
  {
    cerr << "ERROR: Specify a regular expression to search all files in the current "
            "directory." << endl;
    return 1;
  }
  // Prime SpeedStep
  auto begin = chrono::high_resolution_clock::now();
  while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now() -
                                         begin).count() < 1)
    ;
  size_t bytesread = 0, filesread = 0, filesmatched = 0;
  try
  {
    begin = chrono::high_resolution_clock::now();

    // Generate a list of all files here and below the current working directory
    vector<filesystem::path> filepaths;
    for(auto it = filesystem::recursive_directory_iterator(".");
        it != filesystem::recursive_directory_iterator(); ++it)
    {
      if(it->status().type() !=
#ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS
         filesystem::file_type::regular_file)
#else  /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
         filesystem::file_type::regular)
#endif  /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
        continue;
      filepaths.push_back(it->path());
    }

    // Compile the regular expression, and have OpenMP parallelise the loop
    regex regexpr(argv[1]);
#pragma omp parallel for schedule(dynamic)
    for(int n = 0; n < (int) filepaths.size(); n++)
    {
      // Open the file
      ifstream s(filepaths[n], ifstream::binary);
      s.exceptions(fstream::failbit |
                   fstream::badbit);  // Turn on exception throwing
      // Get its length
      s.seekg(0, ios::end);
      size_t length = (size_t) s.tellg();
      s.seekg(0, ios::beg);
      // Allocate a sufficient buffer, avoiding the byte clearing vector would do
      unique_ptr<char[]> buffer(new char[length + 1]);
      // Read in the file, terminating with zero
      s.read(buffer.get(), length);
      buffer.get()[length] = 0;
      // Search the buffer for the regular expression
      if(regex_search(buffer.get(), regexpr))
      {
#pragma omp critical
        {
          cout << filepaths[n] << endl;
        }
        filesmatched++;
      }
      filesread++;
      bytesread += length;
    }
    auto end = chrono::high_resolution_clock::now();
    auto diff = chrono::duration_cast<secs_type>(end - begin);
    cout << "\n" << filesmatched << " files matched out of " << filesread
         << " files which was " << bytesread << " bytes." << endl;
    cout << "The search took " << diff.count() << " seconds which was "
         << filesread / diff.count() << " files per second or "
         << (bytesread / diff.count() / 1024 / 1024) << " Mb/sec." << endl;
  }
  catch(...)
  {
    cerr << boost::current_exception_diagnostic_information(true) << endl;
    return 1;
  }
  return 0;
}

This implementation is fairly straightforward: enumerate all files in all directories below the current directory into a vector, fire off N threads to open each file, read it entirely into memory, regex it for the pattern and if found print the file's path.

Let's now look at the AFIO implementation, and you should prepare yourself because it is far more mind bendy due to the many nestings of callbacks needed (it may remind you of WinRT or .NET code, everything is asynchronous callback):

[Warning] Warning

In the v1.4 engine we ought to gain C++ 1z stackful coroutine support which will let me rewrite the below to be vastly simpler and without the multitude of nested callback handlers.

using namespace boost::afio;

// Often it's easiest for a lot of nesting callbacks to carry state via a this
// pointer
class find_in_files
{
public:
  std::promise<int> finished;
  std::regex regexpr;  // The precompiled regular expression
  dispatcher_ptr dispatcher;
  recursive_mutex opslock;
  std::deque<future<>> ops;  // For exception gathering
  std::atomic<size_t> bytesread, filesread, filesmatched, scheduled, completed;
  std::vector<std::pair<boost::afio::filesystem::path, size_t>> filepaths;

  // Signals finish once all scheduled ops have completed
  void docompleted(size_t inc)
  {
    size_t c = (completed += inc);
    if(c == scheduled)
      finished.set_value(0);
  };
  // Adds ops to the list of scheduled
  void doscheduled(std::initializer_list<future<>> list)
  {
    scheduled += list.size();
    // boost::lock_guard<decltype(opslock)> lock(opslock);
    // ops.insert(ops.end(), list.begin(), list.end());
  }
  void doscheduled(std::vector<future<>> list)
  {
    scheduled += list.size();
    // boost::lock_guard<decltype(opslock)> lock(opslock);
    // ops.insert(ops.end(), list.begin(), list.end());
  }
  void dosearch(handle_ptr h, const char *buffer, size_t length)
  {
    // Search the buffer for the regular expression
    if(std::regex_search(buffer, regexpr))
    {
#pragma omp critical
      {
        std::cout << h->path() << std::endl;
      }
      ++filesmatched;
    }
    ++filesread;
    bytesread += length;
  }
  // A file searching completion, called when each file read completes
  std::pair<bool, handle_ptr> file_read(
  size_t id, future<> op,
  std::shared_ptr<std::vector<char, detail::aligned_allocator<char, 4096, false>>>
  _buffer,
  size_t length)
  {
    handle_ptr h(op.get_handle());
    // std::cout << "R " << h->path() << std::endl;
    char *buffer = _buffer->data();
    buffer[length] = 0;
    dosearch(h, buffer, length);
    docompleted(2);
    // Throw away the buffer now rather than later to keep memory consumption down
    _buffer->clear();
    return std::make_pair(true, h);
  }
  // A file reading completion, called when each file open completes
  std::pair<bool, handle_ptr> file_opened(size_t id, future<> op, size_t length)
  {
    handle_ptr h(op.get_handle());
    // std::cout << "F " << h->path() << std::endl;
    if(length)
    {
#ifdef USE_MMAPS
      auto map(h->map_file());
      if(map)
      {
        dosearch(h, (const char *) map->addr, length);
      }
      else
#endif  /*_ defined(USE_MMAPS) _*/
      {
        // Allocate a sufficient 4Kb aligned buffer
        size_t _length = (4095 + length) & ~4095;
        auto buffer = std::make_shared<
        std::vector<char, detail::aligned_allocator<char, 4096, false>>>(_length +
                                                                         1);
        // Schedule a read of the file
        auto read = dispatcher->read(make_io_req(
        dispatcher->op_from_scheduled_id(id), buffer->data(), _length, 0));
        auto read_done = dispatcher->completion(
        read, std::make_pair(async_op_flags::none /*regex search might be slow*/,
                             std::function<dispatcher::completion_t>(std::bind(
                             &find_in_files::file_read, this, std::placeholders::_1,
                             std::placeholders::_2, buffer, length))));
        doscheduled({read, read_done});
      }
    }
    docompleted(2);
    return std::make_pair(true, h);
  }
  // An enumeration parsing completion, called when each directory enumeration
  // completes
  std::pair<bool, handle_ptr> dir_enumerated(
  size_t id, future<> op,
  std::shared_ptr<future<std::pair<std::vector<directory_entry>, bool>>> listing)
  {
    handle_ptr h(op.get_handle());
    future<> lastdir, thisop(dispatcher->op_from_scheduled_id(id));
    // Get the entries from the ready stl_future
    std::vector<directory_entry> entries(std::move(listing->get().first));
// std::cout << "E " << h->path() << std::endl;
// For each of the directories schedule an open and enumeration
#if 0
        // Algorithm 1
        {
            std::vector<path_req> dir_reqs; dir_reqs.reserve(entries.size());
            for(auto &entry : entries)
            {
                if((entry.st_type()&S_IFDIR)==S_IFDIR)
                {
                    dir_reqs.push_back(path_req(thisop, h->path()/entry.name()));
                }
            }
            if(!dir_reqs.empty())
            {
                std::vector<std::pair<async_op_flags, std::function<dispatcher::completion_t>>> dir_openedfs(dir_reqs.size(), std::make_pair(async_op_flags::None, std::bind(&find_in_files::dir_opened, this, std::placeholders::_1, std::placeholders::_2)));
                auto dir_opens=dispatcher->dir(dir_reqs);
                doscheduled(dir_opens);
                auto dir_openeds=dispatcher->completion(dir_opens, dir_openedfs);
                doscheduled(dir_openeds);
                // Hold back some of the concurrency
                lastdir=dir_openeds.back();
            }
        }
#else  /*_ 0 _*/
    // Algorithm 2
    // The Windows NT kernel filing system driver gets upset with too much
    // concurrency
    // when used with OSDirect so throttle directory enumerations to enforce some
    // depth first traversal.
    {
      std::pair<async_op_flags, std::function<dispatcher::completion_t>>
      dir_openedf = std::make_pair(
      async_op_flags::none, std::bind(&find_in_files::dir_opened, this,
                                      std::placeholders::_1, std::placeholders::_2));
      for(auto &entry : entries)
      {
        if(entry.st_type() ==
# ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS
           boost::afio::filesystem::file_type::directory_file)
# else  /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
           boost::afio::filesystem::file_type::directory)
# endif  /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
        {
          auto dir_open =
          dispatcher->dir(path_req::absolute(lastdir, h->path() / entry.name()));
          auto dir_opened = dispatcher->completion(dir_open, dir_openedf);
          doscheduled({dir_open, dir_opened});
          lastdir = dir_opened;
        }
      }
    }
#endif  /*_ 0 _*/

// For each of the files schedule an open and search
#if 0
        // Algorithm 1
        {
            std::vector<path_req> file_reqs; file_reqs.reserve(entries.size());
            std::vector<std::pair<async_op_flags, std::function<dispatcher::completion_t>>> file_openedfs; file_openedfs.reserve(entries.size());
            for(auto &entry : entries)
            {
                if((entry.st_type()&S_IFREG)==S_IFREG)
                {
                    size_t length=(size_t)entry.st_size();
                    if(length)
                    {
                        file_flags flags=file_flags::read;
# ifdef USE_MMAPS
                        if(length>16384) flags=flags|file_flags::os_mmap;
# endif  /*_ defined(USE_MMAPS) _*/
                        file_reqs.push_back(path_req(lastdir, h->path()/entry.name(), flags));
                        file_openedfs.push_back(std::make_pair(async_op_flags::None, std::bind(&find_in_files::file_opened, this, std::placeholders::_1, std::placeholders::_2, length)));
                    }
                }
            }
            auto file_opens=dispatcher->file(file_reqs);
            doscheduled(file_opens);
            auto file_openeds=dispatcher->completion(file_opens, file_openedfs);
            doscheduled(file_openeds);
        }
#else  /*_ 0 _*/
    // Algorithm 2
    {
      for(auto &entry : entries)
      {
        if(entry.st_type() ==
# ifdef BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS
           boost::afio::filesystem::file_type::regular_file)
# else  /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
           boost::afio::filesystem::file_type::regular)
# endif  /*_ defined(BOOST_AFIO_USE_LEGACY_FILESYSTEM_SEMANTICS) _*/
        {
          size_t length = (size_t) entry.st_size();
          if(length)
          {
            file_flags flags = file_flags::read;
            auto file_open = dispatcher->file(
            path_req::absolute(lastdir, h->path() / entry.name(), flags));
            auto file_opened = dispatcher->completion(
            file_open,
            std::make_pair(async_op_flags::none,
                           std::function<dispatcher::completion_t>(std::bind(
                           &find_in_files::file_opened, this, std::placeholders::_1,
                           std::placeholders::_2, length))));
            doscheduled({file_open, file_opened});
            lastdir = file_opened;
          }
        }
      }
    }
#endif  /*_ 0 _*/
    docompleted(2);
    return std::make_pair(true, h);
  }
  // A directory enumerating completion, called once per directory open in the tree
  std::pair<bool, handle_ptr> dir_opened(size_t id, future<> op)
  {
    handle_ptr h(op.get_handle());
    // std::cout << "D " << h->path() << std::endl;
    // Now we have an open directory handle, schedule an enumeration
    auto enumeration = dispatcher->enumerate(
    enumerate_req(dispatcher->op_from_scheduled_id(id), metadata_flags::size, 1000));
    future<> enumeration_op(enumeration);
    auto listing =
    std::make_shared<future<std::pair<std::vector<directory_entry>, bool>>>(
    std::move(enumeration));
    auto enumeration_done = dispatcher->completion(
    enumeration_op,
    make_pair(async_op_flags::none,
              std::function<dispatcher::completion_t>(
              std::bind(&find_in_files::dir_enumerated, this, std::placeholders::_1,
                        std::placeholders::_2, listing))));
    doscheduled({enumeration, enumeration_done});
    docompleted(2);
    // Complete only if not the cur dir opened
    return std::make_pair(true, h);
  };
  void dowait()
  {
    // Prepare finished
    auto finished_waiter = finished.get_future();
#if 0  // Disabled to maximise performance
        // Wait till done, retrieving any exceptions as they come in to keep memory consumption down
        std::future_status status;
        do
        {
            status=finished_waiter.wait_for(std::chrono::milliseconds(1000));
            std::cout << "\nDispatcher has " << dispatcher->count() << " fds open and " << dispatcher->wait_queue_depth() << " ops outstanding." << std::endl;
            std::vector<future<>> batch; batch.reserve(10000);
            {
                boost::lock_guard<decltype(opslock)> lock(opslock);
                while(status==std::future_status::timeout ? (ops.size()>5000) : !ops.empty())
                {
                    batch.push_back(std::move(ops.front()));
                    ops.pop_front();
                }
            }
            // Retrieve any exceptions
            std::cout << "Processed " << batch.size() << " ops for exception state." << std::endl;
            if(!batch.empty())
                when_all_p(batch.begin(), batch.end()).wait();
        } while(status==std::future_status::timeout);
#else  /*_ 0 // Disabled to maximise performance _*/
    finished_waiter.wait();
#endif  /*_ 0 // Disabled to maximise performance _*/
  }
  // Constructor, which starts the ball rolling
  find_in_files(const char *_regexpr)
      : regexpr(_regexpr)
      ,
      // Create an AFIO dispatcher that bypasses any filing system buffers
      dispatcher(
      make_dispatcher(
      "file:///",
      file_flags::will_be_sequentially_accessed /*|file_flags::os_direct*/).get())
      , bytesread(0)
      , filesread(0)
      , filesmatched(0)
      , scheduled(0)
      , completed(0)
  {
    filepaths.reserve(50000);

    // Schedule the recursive enumeration of the current directory
    std::cout << "\n\nStarting directory enumerations ..." << std::endl;
    auto cur_dir = dispatcher->dir(path_req(""));
    auto cur_dir_opened = dispatcher->completion(
    cur_dir, std::make_pair(async_op_flags::none,
                            std::function<dispatcher::completion_t>(std::bind(
                            &find_in_files::dir_opened, this, std::placeholders::_1,
                            std::placeholders::_2))));
    doscheduled({cur_dir, cur_dir_opened});
    dowait();
  }
};

int main(int argc, const char *argv[])
{
  using std::placeholders::_1;
  using std::placeholders::_2;
  using namespace boost::afio;
  typedef chrono::duration<double, ratio<1, 1>> secs_type;
  if(argc < 2)
  {
    std::cerr << "ERROR: Specify a regular expression to search all files in the "
                 "current directory." << std::endl;
    return 1;
  }
  // Prime SpeedStep
  auto begin = chrono::high_resolution_clock::now();
  while(chrono::duration_cast<secs_type>(chrono::high_resolution_clock::now() -
                                         begin).count() < 1)
    ;
  try
  {
    begin = chrono::high_resolution_clock::now();
    find_in_files finder(argv[1]);
    auto end = chrono::high_resolution_clock::now();
    auto diff = chrono::duration_cast<secs_type>(end - begin);
    std::cout << "\n" << finder.filesmatched << " files matched out of "
              << finder.filesread << " files which was " << finder.bytesread
              << " bytes." << std::endl;
    std::cout << "The search took " << diff.count() << " seconds which was "
              << finder.filesread / diff.count() << " files per second or "
              << (finder.bytesread / diff.count() / 1024 / 1024) << " Mb/sec."
              << std::endl;
  }
  catch(...)
  {
    std::cerr << boost::current_exception_diagnostic_information(true) << std::endl;
    return 1;
  }
  return 0;
}

Here the find_in_files class is used to carry state across the callbacks — one could just as equally bind the state into each callback's parameters instead via some sort of pointer to a struct. But the difference in complexity is noticeable — you're probably now asking, why choose this hideous complexity over the much clearer OpenMP and iostreams implementation[10]?

Well, that's simple: do you want maximum file i/o performance? Here is a search for Niall in a Boost working directory which contained about 7Gb of data across ~35k files[11]:

Table 1.15. Find in files performance for traditional vs AFIO implementations

iostreams, single threaded

iostreams, OpenMP

Boost.AFIO

Boost.AFIO file_flags::os_direct

Boost.AFIO file_flags::os_mmap[a]

Warm cache

812 Mb/sec

1810 Mb/sec

2663 Mb/sec

N/A

6512 Mb/sec

+0%

+123%

+228%

+702%

Cold cache[b]

16 Mb/sec

8 Mb/sec

15 Mb/sec

13.14 Mb/sec

24 Mb/sec

+0%

-50%

-6%

-18%

+50%

[a] The superiority of memory maps on Windows is because all buffered file i/o is done via memory copying to/from memory maps on Windows anyway, so eliminating that memory copy is huge.


Note how AFIO outperforms the OpenMP iostreams implementation by about 50% for a warm cache, with only a 6% penalty for a cold cache over a single threaded implementation. Note the 50% penalty the OpenMP iostreams implementation suffers for a cold cache — a naive multithreaded implementation causes a lot of disc seeks. If you map a file into memory using file_flags::os_mmap, AFIO will memcpy() from that map instead of reading — or of course you can use the map directly using the pointer returned by try_mapfile().

The eagle eyed amongst you will have noticed that the AFIO implementation looks hand tuned with a special depth first algorithm balancing concurrency with seek locality — that's because I invested two days into achieving maximum possible performance as a demonstration of AFIO's power (and to find and fix some bottlenecks). Some might argue that this is therefore not a fair comparison to the OpenMP iostreams implementation.

There are two parts to answering this argument. The first is that yes, the OpenMP iostreams search algorithm is fairly stupid and simply tries to read as many files as there are CPUs, and those files could be stored anywhere on the spinning disc. Because AFIO can issue far more concurrent file open and read requests than OpenMP, it gives a lot more scope to the filing system to optimise hard drive seeks and satisfy as many requests as is feasible — sufficiently so that with a cold cache, AFIO is a little slower than a single threaded iostreams implementation where the filing system can spot the access pattern and prefetch quite effectively. A completely valid solution to the OpenMP performance deficit would be to increase thread count dramatically.

The second part of answering that argument is this: AFIO's very flexible op chaining structure lets you very easily twiddle with the execution dependency graph to achieve maximum possible performance by balancing concurrency (too much or too little is slower, what you need is just the right balance) and disc seeks (enumerations are best not done in parallel, it defeats any prefetching algorithm), unlike the naive OpenMP implementation which is much harder to play around with. Don't get me wrong: if you have plenty of time on your hands, you can implement a hand written and tuned find in files implementation that is faster than AFIO's implementation, but it will have taken you a lot longer, and the code will neither be as portable nor as maintainable.



[10] Indeed many ask the same when programming WinRT apps — Microsoft's insistance on never allowing any call to block can make simple designs explode with complexities of nested callbacks.

[11] Test machine was a 3.5Ghz Intel i7-3770K Microsoft Windows 8 x64 machine with Seagate ST3320620AS 7200rpm hard drive. Note that AFIO has a native WinIOCP backend which leverages host asynchronous file i/o support.


PrevUpHomeNext