Boost C++ Libraries Home Libraries People FAQ More

PrevUpHomeNext

2. World's simplest named blob store in AFIO (synchronous)

Let's see the same thing as in the last section, but written using AFIO. First, the interface is identical to before, just with different private member variables:

namespace afio = BOOST_AFIO_V2_NAMESPACE;
namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem;
using BOOST_OUTCOME_V1_NAMESPACE::outcome;

class data_store
{
  afio::dispatcher_ptr _dispatcher;
  afio::handle_ptr _store;
public:
  // Type used for read streams
  using istream = std::shared_ptr<std::istream>;
  // Type used for write streams
  using ostream = std::shared_ptr<std::ostream>;
  // Type used for lookup
  using lookup_result_type = outcome<istream>;
  // Type used for write
  using write_result_type = outcome<ostream>;

  // Disposition flags
  static constexpr size_t writeable = (1<<0);

  // Open a data store at path
  data_store(size_t flags = 0, afio::path path = "store");

  // Look up item named name for reading, returning a std::istream for the item if it exists
  outcome<istream> lookup(std::string name) noexcept;
  // Look up item named name for writing, returning an ostream for that item
  outcome<ostream> write(std::string name) noexcept;
};

We now have a dispatcher_ptr and a handle_ptr to the store directory which is created/opened during construction of data_store. Instead of constructing to a filesystem path, we now construct to an AFIO path. Note that AFIO paths are always either absolute or relative to some open file handle, and therefore in this situation at the point of construction the current working directory will be fetched and an absolute path constructed. This differs from filesystem which passes through relative paths and lets the OS resolve from the current working directory — AFIO does it this way as the current working directory setting at some later point of asynchronous execution is inherently unpredictable. As the data_store interface is identical, so is the use case from the previous page. The implementation is rather different however:

namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
using BOOST_OUTCOME_V1_NAMESPACE::empty;
using BOOST_AFIO_V2_NAMESPACE::error_code;
using BOOST_AFIO_V2_NAMESPACE::generic_category;

static bool is_valid_name(const std::string &name) noexcept
{
  static const std::string banned("<>:\"/\\|?*\0", 10);
  if(std::string::npos!=name.find_first_of(banned))
    return false;
  // No leading period
  return name[0]!='.';
}

data_store::data_store(size_t flags, afio::path path)
{
  // Make a dispatcher for the local filesystem URI, masking out write flags on all operations if not writeable
  _dispatcher=afio::make_dispatcher("file:///", afio::file_flags::none, !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get();
  // Set the dispatcher for this thread, and open a handle to the store directory
  afio::current_dispatcher_guard h(_dispatcher);
  _store=afio::dir(std::move(path), afio::file_flags::create);  // throws if there was an error
}

outcome<data_store::istream> data_store::lookup(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    error_code ec;
    // Open the file using the handle to the store directory as the base.
    // The store directory can be freely renamed by any third party process
    // and everything here will work perfectly.
    afio::handle_ptr h(afio::file(ec, _store, name, afio::file_flags::read));
    if(ec)
    {
      // If the file was not found, return empty else the error
      if(ec==error_code(ENOENT, generic_category()))
        return empty;
      return ec;
    }
    // Create an istream which directly uses the mapped file.
    data_store::istream ret(std::make_shared<idirectstream>(ec, std::move(h)));
    if(ec)
      return ec;
    return ret;
  }
  catch(...)
  {
    return std::current_exception();
  }
}

outcome<data_store::ostream> data_store::write(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    error_code ec;
    // Open the file using the handle to the store directory as the base.
    // The store directory can be freely renamed by any third party process
    // and everything here will work perfectly. You could enable direct
    // buffer writing - this sends 4Kb pages directly to the physical hardware
    // bypassing the kernel file page cache, however this is not optimal if reads of
    // the value are likely to occur soon.
    afio::handle_ptr h(afio::file(ec, _store, name, afio::file_flags::create | afio::file_flags::write
      /*| afio::file_flags::os_direct*/));
    if(ec)
      return ec;
    // Create an ostream which directly uses the mapped file.
    return outcome<data_store::ostream>(std::make_shared<odirectstream>(std::move(h)));
  }
  catch (...)
  {
    return std::current_exception();
  }
}

This is a good bit longer and looks more complex, however there is already a big if not obvious gain: third party processes can happily rename and move around the store directory once it has been opened and this implementation will work perfectly. This is because we open a handle to the store directory in the data_store constructor, and thereafter use that open handle as a base location for all leaf path operations. Except on OS X which currently doesn't support the POSIX race free filesystem extensions, that makes all data store operations invariant to store path mutation on all supported platforms.

Another big gain is we are now using memory mapped files for the lookup which avoids any memory copying, and there is a hint we are also avoiding memory copies in the write too.

You will also note that in the constructor, we specify a URI to make_dispatcher(). This lets you open different kinds of filesystem e.g. ZIP archives, HTTP sites etc. AFIO currently doesn't provide backends except for file:/// i.e. the local filesystem, however future versions will. Note that a dispatcher can force on or off file_flags for all operations scheduled against that dispatcher — here we force mask out any attempt to open anything for writing if we are opening the store read-only.

Finally, you may wonder why we only use current_dispatcher_guard once in the constructor. This is because if AFIO can deduce the dispatcher to use from any precondition you supply, you need not set the thread local dispatcher. As the opening of the store directory is done as an absolute path lookup and therefore takes no inputs specifying a dispatcher, you need to set the current dispatcher in that one situation alone.

The remaining magic is in the custom iostreams implementations idirectstream and odirectstream:

namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
using BOOST_OUTCOME_V1_NAMESPACE::empty;
using BOOST_AFIO_V2_NAMESPACE::error_code;
using BOOST_AFIO_V2_NAMESPACE::generic_category;

// A special allocator of highly efficient file i/o memory
using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>;

// An iostream which reads directly from a memory mapped AFIO file
struct idirectstream : public std::istream
{
  struct directstreambuf : public std::streambuf
  {
    afio::handle_ptr h;  // Holds the file open and therefore mapped
    file_buffer_type buffer;
    afio::handle::mapped_file_ptr mfp;
    directstreambuf(error_code &ec, afio::handle_ptr _h) : h(std::move(_h))
    {
      // Get the size of the file. If greater than 128Kb mmap it
      size_t length=(size_t) h->lstat(afio::metadata_flags::size).st_size;
      char *p=nullptr;
      if(length>=128*1024)
      {
        if((mfp=h->map_file()))
          p = (char *) mfp->addr;
      }
      if(!p)
      {
        buffer.resize(length);
        afio::read(ec, h, buffer.data(), length, 0);
        p=buffer.data();
      }
      // Set the get buffer this streambuf is to use
      setg(p, p, p + length);
    }
  };
  std::unique_ptr<directstreambuf> buf;
  idirectstream(error_code &ec, afio::handle_ptr h) : std::istream(new directstreambuf(ec, std::move(h))), buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~idirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};

// An iostream which writes to an AFIO file in 4Kb pages
struct odirectstream : public std::ostream
{
  struct directstreambuf : public std::streambuf
  {
    using int_type = std::streambuf::int_type;
    using traits_type = std::streambuf::traits_type;
    afio::future<> lastwrite; // the last async write performed
    afio::off_t offset;       // offset of next write
    file_buffer_type buffer;  // a page size on this machine
    file_buffer_type lastbuffer;
    directstreambuf(afio::handle_ptr _h) : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front())
    {
      // Set the put buffer this streambuf is to use
      setp(buffer.data(), buffer.data() + buffer.size());
    }
    virtual ~directstreambuf() override
    {
      try
      {
        // Flush buffers and wait until last write completes
        // Schedule an asynchronous write of the buffer to storage
        size_t thisbuffer = pptr() - pbase();
        if(thisbuffer)
          lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset+thisbuffer), buffer.data(), thisbuffer, offset);
        lastwrite.get();
      }
      catch(...)
      {
      }
    }
    virtual int_type overflow(int_type c) override
    {
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer>=buffer.size())
        sync();
      if(c!=traits_type::eof())
      {
        *pptr()=(char)c;
        pbump(1);
        return traits_type::to_int_type(c);
      }
      return traits_type::eof();
    }
    virtual int sync() override
    {
      // Wait for the last write to complete, propagating any exceptions
      lastwrite.get();
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer > 0)
      {
        // Detach the current buffer and replace with a fresh one to allow the kernel to steal the page
        lastbuffer=std::move(buffer);
        buffer.resize(lastbuffer.size());
        setp(buffer.data(), buffer.data() + buffer.size());
        // Schedule an extension of physical storage by an extra page
        lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer);
        // Schedule an asynchronous write of the buffer to storage
        lastwrite=afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset);
        offset+=thisbuffer;
      }
      return 0;
    }
  };
  std::unique_ptr<directstreambuf> buf;
  odirectstream(afio::handle_ptr h) : std::ostream(new directstreambuf(std::move(h))), buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~odirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};

These are not hugely conforming iostreams implementations in order to keep them brief for the purposes of this tutorial. The read stream is very straightforward, we simply have streambuf directly use the memory map of the file.

The write stream is a bit more complicated: we fill a 4Kb page and asynchronously write it out using a page stealing friendly algorithm which in the usual case means the kernel simply takes possession of the 4Kb page as-is with no memory copying at all. At some future point it will be flushed onto storage. The reason this works is thanks to the special STL allocator utils::page_allocator<> which returns whole kernel page cache pages. Most kernels will mark whole pages scheduled for write with copy-on-write behaviour such that they can be safely DMAed by the kernel DMA engine as any subsequent write will cause a copy, so because we never write to the page between the write request and freeing the page, most kernels simply transfer ownership of the page from the user space process to the file page cache with no further processing. Hence the asynchronous write of the page tends to complete very quickly — indeed far faster than copying 4Kb of memory and often quicker than the time to fill another page, and hence we wait for any previous write to complete before scheduling the next write in order to report any errors which occurred during the write.

This is the first use of asynchronous i/o in this tutorial. AFIO provides a custom future<T> type extending the lightweight monadic futures framework in Boost.Outcome, so you get all the C++ 1z Concurrency TS extensions, C++ 1z coroutines support and Boost.Thread future extensions in the AFIO custom future<>. There are also many additional extensions beyond what Boost.Thread or the Concurrency TS provides, including foreign future composable waits.

Table 1.4. This solution will perform reasonably well under these conditions:

Condition

On Microsoft Windows you can place the store deep in a directory hierarchy and use long key names.

Third party threads and processes can rename the location of the store during use.

The size of all the values being read at any given time fits into your virtual address space (which is at least 2Gb on 32 bit, 8Tb on 64 bit).

Only one thread or process will ever interact with the key-value store at a time.

You don't care what happens if your process unexpectedly exits during a modify.

Maximum performance isn't important to you.

You don't care what happens under power loss.

You don't need to update more than one key-value at once.



PrevUpHomeNext