

3. World's simplest named blob store in AFIO (asynchronous)


Let's build exactly the same thing as in the last section, but this time with a fully asynchronous public interface. Instead of returning outcome<>, lookup() and write() now return shared_future<>:

namespace afio = BOOST_AFIO_V2_NAMESPACE;
namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem;
using BOOST_OUTCOME_V1_NAMESPACE::lightweight_futures::shared_future;

class data_store
{
  afio::dispatcher_ptr _dispatcher;
  afio::handle_ptr _store;
public:
  // Type used for read streams
  using istream = std::shared_ptr<std::istream>;
  // Type used for write streams
  using ostream = std::shared_ptr<std::ostream>;
  // Type used for lookup
  using lookup_result_type = shared_future<istream>;
  // Type used for write
  using write_result_type = shared_future<ostream>;

  // Disposition flags
  static constexpr size_t writeable = (1<<0);

  // Open a data store at path
  data_store(size_t flags = 0, afio::path path = "store");

  // Look up item named name for reading, returning an istream for the item
  shared_future<istream> lookup(std::string name) noexcept;
  // Look up item named name for writing, returning an ostream for that item
  shared_future<ostream> write(std::string name) noexcept;
};

As an extension to the Concurrency TS futures, Boost.Monad futures are also monads. They therefore convert implicitly from their allowed monadic input types (a value, an error_code or an exception_ptr), so you do not need to write make_ready_future(T) as you would with the Concurrency TS.
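To make the implicit conversion concrete, here is a minimal sketch built only on the standard library. The ready_future type and parse_digit() are hypothetical names invented for this illustration; they are not part of Boost.Monad or AFIO, and unlike the real lightweight futures this stand-in transports an error_code by wrapping it in a std::system_error.

```cpp
#include <exception>
#include <future>
#include <system_error>
#include <utility>

// Hypothetical stand-in for a "monadic" future: a std::shared_future<T> which
// converts implicitly from a value, an error_code or an exception_ptr.
template <class T> class ready_future
{
  std::shared_future<T> _f;

  static std::shared_future<T> from_exception(std::exception_ptr e)
  {
    std::promise<T> p;
    p.set_exception(std::move(e));
    return p.get_future().share();
  }

public:
  // Implicit: a plain value becomes an already-ready future
  ready_future(T v)
  {
    std::promise<T> p;
    p.set_value(std::move(v));
    _f = p.get_future().share();
  }
  // Implicit: an error_code is stored as a std::system_error (assumption of this sketch)
  ready_future(std::error_code ec) : _f(from_exception(std::make_exception_ptr(std::system_error(ec)))) {}
  // Implicit: an exception_ptr is stored as-is
  ready_future(std::exception_ptr e) : _f(from_exception(std::move(e))) {}

  // Returns the value, or rethrows whatever error state was stored
  T get() const { return _f.get(); }
};

// Each return statement relies on one of the implicit conversions above,
// so no make_ready_future() call is ever written out.
ready_future<int> parse_digit(char c) noexcept
{
  if(c < '0' || c > '9')
    return std::make_error_code(std::errc::invalid_argument);
  try
  {
    return c - '0';
  }
  catch(...)
  {
    return std::current_exception();
  }
}
```

The shape of parse_digit() deliberately mirrors the early error_code return and the catch(...) block used by write() and lookup() in this section.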

The benchmarking harness code is identical for all three of these implementations. This works because outcome<> is sufficiently API-compatible with future<> that the same code can drive each of them.
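The harness itself is not reproduced in this section, so the following is only a sketch of why one piece of code can drive both a synchronous and an asynchronous store. All names here (sync_result, sync_store, async_store, read_all) are hypothetical; the only property relied upon is that both result types expose get().

```cpp
#include <future>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <utility>

using istream_ptr = std::shared_ptr<std::istream>;

// Hypothetical synchronous result type: all it offers is get()
struct sync_result
{
  istream_ptr value;
  istream_ptr get() const { return value; }
};

// Hypothetical store with a synchronous lookup()
struct sync_store
{
  std::map<std::string, std::string> items;
  sync_result lookup(const std::string &name) const
  {
    return sync_result{std::make_shared<std::istringstream>(items.at(name))};
  }
};

// Hypothetical store whose lookup() returns a future instead
struct async_store
{
  std::map<std::string, std::string> items;
  std::shared_future<istream_ptr> lookup(const std::string &name) const
  {
    istream_ptr stream = std::make_shared<std::istringstream>(items.at(name));
    std::promise<istream_ptr> p;
    p.set_value(std::move(stream));
    return p.get_future().share();
  }
};

// One harness function: it only needs lookup(name).get() to compile
template <class Store> std::string read_all(const Store &store, const std::string &name)
{
  istream_ptr s = store.lookup(name).get();
  std::ostringstream out;
  out << s->rdbuf();
  return out.str();
}
```

Calling read_all() on either store compiles from the same template, which is the property the harness depends on.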

write() is now implemented by scheduling the file to be opened for write access and attaching a continuation. The continuation propagates any error; if there was none, it returns the shared pointer to the output stream via the shared future.

shared_future<data_store::ostream> data_store::write(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    // Schedule the opening of the file for writing
    afio::future<> h(afio::async_file(_store, name, afio::file_flags::create | afio::file_flags::write));
    // When it completes, call this continuation
    return h.then([](const afio::future<> &h) -> shared_future<data_store::ostream> {
      // If file didn't open, return the error or exception immediately
      BOOST_OUTCOME_PROPAGATE(h);
      // Create an ostream which directly uses the file.
      data_store::ostream ret(std::make_shared<odirectstream>(h.get_handle()));
      return std::move(ret);
    });
  }
  catch (...)
  {
    // Boost.Monad futures are also monads, so this implies a make_ready_future()
    return std::current_exception();
  }
}
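The same open, propagate, wrap shape can be tried out without AFIO. The sketch below assumes a hypothetical free function then() layered on std::shared_future, plus a fake open_file() which yields an integer "handle". Note one deliberate difference from AFIO: the deferred launch policy means the continuation runs lazily when the returned future is first waited on, not when the open completes.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical helper standing in for future::then(): produce a future for fn(f).
// Deferred launch: fn runs lazily, on the first get()/wait() of the returned future.
template <class T, class F>
auto then(std::shared_future<T> f, F fn) -> std::shared_future<decltype(fn(f))>
{
  return std::async(std::launch::deferred, [f, fn]() mutable { return fn(f); }).share();
}

// Hypothetical "open": fails for an empty name, else yields a fake handle number
std::shared_future<int> open_file(const std::string &name)
{
  std::promise<int> p;
  if(name.empty())
    p.set_exception(std::make_exception_ptr(std::runtime_error("invalid name")));
  else
    p.set_value(42);
  return p.get_future().share();
}

// Mirrors write(): schedule the open, then attach a continuation
std::shared_future<std::string> open_for_write(const std::string &name)
{
  return then(open_file(name), [](const std::shared_future<int> &h) {
    // get() rethrows a stored exception, so a failed open propagates
    // into the future returned by then()
    int handle = h.get();
    return "stream on handle " + std::to_string(handle);
  });
}
```

Here h.get() plays the role that BOOST_OUTCOME_PROPAGATE(h) plays in write(): if the open failed, the continuation exits early and the caller sees the original error.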

lookup() takes the chaining of continuations one step further. It schedules the file to be opened for reading and attaches a continuation which first propagates any error. If the file is at least 128Kb, the continuation then memory maps the file and returns the input stream. If the file is smaller, or the map fails, it schedules a read of the entire file into a buffer and attaches a second continuation which returns the input stream.

The below pattern is 100% pure soon-to-be-standardised Concurrency TS future continuations which can be easily coroutinised automatically on a C++ 1z Coroutines supporting compiler (simply insert await before any AFIO async_XXX function). Apart from the fact that Boost.Outcome lightweight futures also provide an error_code transport, there is nothing below which wouldn't work with an upcoming standard C++ library (unless the TS changes substantially, which is very unlikely at this late stage).

[Note] Note

This peer review edition of AFIO v1.40 provides a shim future type standing in for an eventual lightweight future implementation. C++ 1z coroutine support is not implemented for the shim future type.

shared_future<data_store::istream> data_store::lookup(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    // Schedule the opening of the file for reading
    afio::future<> h(afio::async_file(_store, name, afio::file_flags::read));
    // When it completes, call this continuation
    return h.then([](afio::future<> &_h) -> shared_future<data_store::istream> {
      // If file didn't open, return the error or exception immediately
      BOOST_OUTCOME_PROPAGATE(_h);
      size_t length=(size_t) _h->lstat(afio::metadata_flags::size).st_size;
      // Is a memory map more appropriate?
      if(length>=128*1024)
      {
        afio::handle::mapped_file_ptr mfp;
        if((mfp=_h->map_file()))
        {
          data_store::istream ret(std::make_shared<idirectstream>(_h.get_handle(), std::move(mfp), length));
          return ret;
        }
      }
      // Schedule the reading of the file into a buffer
      auto buffer=std::make_shared<file_buffer_type>(length);
      afio::future<> h(afio::async_read(_h, buffer->data(), length, 0));
      // When the read completes call this continuation
      return h.then([buffer, length](const afio::future<> &h) -> shared_future<data_store::istream> {
        // If read failed, return the error or exception immediately
        BOOST_OUTCOME_PROPAGATE(h);
        data_store::istream ret(std::make_shared<idirectstream>(h.get_handle(), buffer, length));
        return ret;
      });
    });
  }
  catch(...)
  {
    // Boost.Monad futures are also monads, so this implies a make_ready_future()
    return std::current_exception();
  }
}
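The two-stage structure of lookup() can be sketched with standard library parts only. The helpers then(), unwrap() and ready() are hypothetical stand-ins written for this illustration, the file is reduced to just its length, and deferred launch means each stage runs lazily inside get(). The point of interest is that the outer continuation itself returns a future, which must be flattened before the caller sees it.

```cpp
#include <cstddef>
#include <future>
#include <string>
#include <utility>

// Hypothetical stand-in for future::then(); runs fn lazily on get()
template <class T, class F>
auto then(std::shared_future<T> f, F fn) -> std::shared_future<decltype(fn(f))>
{
  return std::async(std::launch::deferred, [f, fn]() mutable { return fn(f); }).share();
}
// Hypothetical flattening of a future-of-a-future into a plain future
template <class T> std::shared_future<T> unwrap(std::shared_future<std::shared_future<T>> f)
{
  return std::async(std::launch::deferred, [f] { return f.get().get(); }).share();
}
// Hypothetical equivalent of make_ready_future()
template <class T> std::shared_future<T> ready(T v)
{
  std::promise<T> p;
  p.set_value(std::move(v));
  return p.get_future().share();
}

constexpr std::size_t map_threshold = 128 * 1024;  // same cut-over as lookup()

// Stage 1 learns the length; stage 2 exists only on the small-file path
std::shared_future<std::string> open_for_read(std::size_t file_length)
{
  return unwrap(then(ready(file_length), [](const std::shared_future<std::size_t> &len) -> std::shared_future<std::string> {
    if(len.get() >= map_threshold)
      return ready(std::string("mapped"));  // large file: finished after one stage
    // Small file: pretend to schedule a read, then attach a second continuation
    std::shared_future<std::size_t> bytes_read = ready(len.get());
    return then(bytes_read, [](const std::shared_future<std::size_t> &n) {
      return "buffered " + std::to_string(n.get()) + " bytes";
    });
  }));
}
```

In this sketch the explicit unwrap() call does by hand what the return type of the outer lambda in lookup() implies: the caller receives a single shared_future regardless of which branch was taken.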

The input and output stream implementations are pretty similar to before, but here they are for reference:

namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
using BOOST_AFIO_V2_NAMESPACE::error_code;
using BOOST_AFIO_V2_NAMESPACE::generic_category;

// A buffer type using AFIO's page allocator, which supplies memory suited to efficient file i/o
using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>;

// An istream which reads directly from a memory mapped or fully buffered AFIO file
struct idirectstream : public std::istream
{
  struct directstreambuf : public std::streambuf
  {
    afio::handle_ptr h;  // Holds the file open
    std::shared_ptr<file_buffer_type> buffer;
    afio::handle::mapped_file_ptr mfp;
    // From a mmap
    directstreambuf(afio::handle_ptr _h, afio::handle::mapped_file_ptr _mfp, size_t length) : h(std::move(_h)), mfp(std::move(_mfp))
    {
      // Set the get buffer this streambuf is to use
      setg((char *) mfp->addr, (char *) mfp->addr, (char *) mfp->addr + length);
    }
    // From a page allocated buffer
    directstreambuf(afio::handle_ptr _h, std::shared_ptr<file_buffer_type> _buffer, size_t length) : h(std::move(_h)), buffer(std::move(_buffer))
    {
      // Set the get buffer this streambuf is to use
      setg(buffer->data(), buffer->data(), buffer->data() + length);
    }
  };
  std::unique_ptr<directstreambuf> buf;
  template<class U> idirectstream(afio::handle_ptr h, U &&buffer, size_t length) : std::istream(new directstreambuf(std::move(h), std::forward<U>(buffer), length)), buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~idirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};
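The core technique in idirectstream is to point a std::streambuf's get area at memory which already holds the data, so reads never copy through an intermediate buffer. Below is a minimal standalone sketch of that technique over a std::vector<char>. The names membuf, memstream and first_word are hypothetical, and unlike idirectstream the streambuf here is a direct member instead of being heap allocated.

```cpp
#include <istream>
#include <memory>
#include <streambuf>
#include <string>
#include <utility>
#include <vector>

// Hypothetical miniature of idirectstream's streambuf
struct membuf : std::streambuf
{
  std::shared_ptr<std::vector<char>> storage;  // keeps the bytes alive
  explicit membuf(std::shared_ptr<std::vector<char>> s) : storage(std::move(s))
  {
    // Set the get area to span the existing bytes: begin, current, end
    char *b = storage->data();
    setg(b, b, b + storage->size());
  }
};

// Hypothetical istream which owns its membuf
struct memstream : std::istream
{
  membuf buf;
  explicit memstream(std::shared_ptr<std::vector<char>> s) : std::istream(nullptr), buf(std::move(s))
  {
    // Attach the buffer once it is constructed; rdbuf() also clears the error state
    rdbuf(&buf);
  }
};

// Usage: formatted extraction reads straight out of the vector
std::string first_word(const std::string &text)
{
  auto storage = std::make_shared<std::vector<char>>(text.begin(), text.end());
  memstream in(std::move(storage));
  std::string word;
  in >> word;
  return word;
}
```

Because the default underflow() reports end of file once the get area is exhausted, no virtual functions need overriding for a read-only, fixed-length source like this.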

// An ostream which writes to an AFIO file in 4Kb pages
struct odirectstream : public std::ostream
{
  struct directstreambuf : public std::streambuf
  {
    using int_type = std::streambuf::int_type;
    using traits_type = std::streambuf::traits_type;
    afio::future<> lastwrite; // the last async write performed
    afio::off_t offset;       // offset of next write
    file_buffer_type buffer;  // one page sized buffer, currently being filled
    file_buffer_type lastbuffer;  // the previous page, kept alive while its write completes
    directstreambuf(afio::handle_ptr _h) : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front())
    {
      // Set the put buffer this streambuf is to use
      setp(buffer.data(), buffer.data() + buffer.size());
    }
    virtual ~directstreambuf() override
    {
      try
      {
        // Flush buffers and wait until last write completes
        // Schedule an asynchronous write of the buffer to storage
        size_t thisbuffer = pptr() - pbase();
        if(thisbuffer)
          lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset+thisbuffer), buffer.data(), thisbuffer, offset);
        lastwrite.get();
      }
      catch(...)
      {
      }
    }
    virtual int_type overflow(int_type c) override
    {
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer>=buffer.size())
        sync();
      if(!traits_type::eq_int_type(c, traits_type::eof()))
      {
        *pptr()=traits_type::to_char_type(c);
        pbump(1);
      }
      // overflow() must return something other than eof() to signal success
      return traits_type::not_eof(c);
    }
    virtual int sync() override
    {
      // Wait for the last write to complete, propagating any exceptions
      lastwrite.get();
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer > 0)
      {
        // Detach the current buffer and replace with a fresh one to allow the kernel to steal the page
        lastbuffer=std::move(buffer);
        buffer.resize(lastbuffer.size());
        setp(buffer.data(), buffer.data() + buffer.size());
        // Schedule an extension of physical storage by an extra page
        lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer);
        // Schedule an asynchronous write of the buffer to storage
        lastwrite=afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset);
        offset+=thisbuffer;
      }
      return 0;
    }
  };
  std::unique_ptr<directstreambuf> buf;
  odirectstream(afio::handle_ptr h) : std::ostream(new directstreambuf(std::move(h))), buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~odirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};
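The write side combines three std::streambuf pieces: setp() to define a page sized put area, overflow() to handle a full page, and sync() to push the page to storage. The sketch below shows just that interplay, with a std::string standing in for the file and every write performed synchronously. The names pagebuf and write_through_pages are hypothetical, and none of AFIO's asynchronous scheduling or buffer swapping is modelled.

```cpp
#include <cstddef>
#include <ostream>
#include <streambuf>
#include <string>
#include <vector>

// Hypothetical miniature of odirectstream's streambuf
struct pagebuf : std::streambuf
{
  std::string &sink;       // stands in for the file
  std::vector<char> page;  // the put area; page_size must be non-zero
  pagebuf(std::string &s, std::size_t page_size) : sink(s), page(page_size)
  {
    // Set the put buffer this streambuf is to use
    setp(page.data(), page.data() + page.size());
  }
  ~pagebuf() override
  {
    // Flush the final, possibly partial, page
    sync();
  }
  int_type overflow(int_type c) override
  {
    // Called when the put area is full: write the page out, then store c
    sync();
    if(!traits_type::eq_int_type(c, traits_type::eof()))
    {
      *pptr() = traits_type::to_char_type(c);
      pbump(1);
    }
    return traits_type::not_eof(c);
  }
  int sync() override
  {
    std::size_t pending = static_cast<std::size_t>(pptr() - pbase());
    if(pending > 0)
    {
      sink.append(pbase(), pending);
      // Reset the put area so the page can be refilled from the start
      setp(page.data(), page.data() + page.size());
    }
    return 0;
  }
};

// Usage: stream text through pages of page_size bytes into a string
std::string write_through_pages(const std::string &text, std::size_t page_size)
{
  std::string stored;
  {
    pagebuf buf(stored, page_size);
    std::ostream out(&buf);
    out << text;
  }  // buf's destructor flushes whatever is left in the last page
  return stored;
}
```

As with odirectstream, data only reaches storage when a page fills or the buffer is destroyed, so the stream must go out of scope (or be flushed) before the written content is complete.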
