Home | Libraries | People | FAQ | More |
Let's see the same exact thing as in the last section, but this time with
a fully asynchronous public interface. Instead of returning outcome<>
,
we now return shared_future<>
:
namespace afio = BOOST_AFIO_V2_NAMESPACE; namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem; using BOOST_OUTCOME_V1_NAMESPACE::lightweight_futures::shared_future; class data_store { afio::dispatcher_ptr _dispatcher; afio::handle_ptr _store; public: // Type used for read streams using istream = std::shared_ptr<std::istream>; // Type used for write streams using ostream = std::shared_ptr<std::ostream>; // Type used for lookup using lookup_result_type = shared_future<istream>; // Type used for write using write_result_type = shared_future<ostream>; // Disposition flags static constexpr size_t writeable = (1<<0); // Open a data store at path data_store(size_t flags = 0, afio::path path = "store"); // Look up item named name for reading, returning an istream for the item shared_future<istream> lookup(std::string name) noexcept; // Look up item named name for writing, returning an ostream for that item shared_future<ostream> write(std::string name) noexcept; };
An extension to the Concurrency TS futures is that Boost.Monad futures
are also monads, and therefore implicitly convert from their allowed monadic
input types without you needing to write make_ready_future(T)
as with the Concurrency TS.
You may be interested to know that the benchmarking harness code is 100%
identical for all three of these implementations. This is because outcome<>
is sufficiently API-identical to future<>
that identical code can drive all
three implementations.
write()
is now implemented by scheduling the file to be opened for write access
and attaching a continuation which will propagate any error, and if no
error returns via the shared future the shared pointer to the output stream.
shared_future<data_store::ostream> data_store::write(std::string name) noexcept { if(!is_valid_name(name)) return error_code(EINVAL, generic_category()); try { // Schedule the opening of the file for writing afio::future<> h(afio::async_file(_store, name, afio::file_flags::create | afio::file_flags::write)); // When it completes, call this continuation return h.then([](const afio::future<> &h) -> shared_future<data_store::ostream> { // If file didn't open, return the error or exception immediately BOOST_OUTCOME_PROPAGATE(h); // Create an ostream which directly uses the file. data_store::ostream ret(std::make_shared<odirectstream>(h.get_handle())); return std::move(ret); }); } catch (...) { // Boost.Monad futures are also monads, so this implies a make_ready_future() return std::current_exception(); } }
lookup()
takes the chaining of continuations one sequence further. It schedules
the file to be opened for reading and attaches a continuation which firstly
propagates any error, and then if the file is big enough it memory maps
the file and returns the input stream. If the file is small enough it schedules
a read of the entire file, attaching a second continuation to return the
input stream.
The below pattern is 100% pure soon-to-be-standardised Concurrency TS future
continuations which can be easily coroutinised automatically on a C++ 1z
Coroutines supporting compiler (simply insert await
before any AFIO async_XXX
function). Apart from the fact that Boost.Outcome
lightweight futures also provide an error_code
transport, there is nothing below which wouldn't work with an upcoming
standard C++ library (unless the TS changes substantially, which is very
unlikely at this late stage).
Note | |
---|---|
This peer review edition of AFIO v1.40 provides a shim future type standing in for an eventual lightweight future implementation. C++ 1z coroutine support is not implemented for the shim future type. |
shared_future<data_store::istream> data_store::lookup(std::string name) noexcept { if(!is_valid_name(name)) return error_code(EINVAL, generic_category()); try { // Schedule the opening of the file for reading afio::future<> h(afio::async_file(_store, name, afio::file_flags::read)); // When it completes, call this continuation return h.then([](afio::future<> &_h) -> shared_future<data_store::istream> { // If file didn't open, return the error or exception immediately BOOST_OUTCOME_PROPAGATE(_h); size_t length=(size_t) _h->lstat(afio::metadata_flags::size).st_size; // Is a memory map more appropriate? if(length>=128*1024) { afio::handle::mapped_file_ptr mfp; if((mfp=_h->map_file())) { data_store::istream ret(std::make_shared<idirectstream>(_h.get_handle(), std::move(mfp), length)); return ret; } } // Schedule the reading of the file into a buffer auto buffer=std::make_shared<file_buffer_type>(length); afio::future<> h(afio::async_read(_h, buffer->data(), length, 0)); // When the read completes call this continuation return h.then([buffer, length](const afio::future<> &h) -> shared_future<data_store::istream> { // If read failed, return the error or exception immediately BOOST_OUTCOME_PROPAGATE(h); data_store::istream ret(std::make_shared<idirectstream>(h.get_handle(), buffer, length)); return ret; }); }); } catch(...) { // Boost.Monad futures are also monads, so this implies a make_ready_future() return std::current_exception(); } }
The input and output stream implementations are pretty similar to before, but here they are for reference:
namespace asio = BOOST_AFIO_V2_NAMESPACE::asio; using BOOST_AFIO_V2_NAMESPACE::error_code; using BOOST_AFIO_V2_NAMESPACE::generic_category; // A special allocator of highly efficient file i/o memory using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>; // An iostream which reads directly from a memory mapped AFIO file struct idirectstream : public std::istream { struct directstreambuf : public std::streambuf { afio::handle_ptr h; // Holds the file open std::shared_ptr<file_buffer_type> buffer; afio::handle::mapped_file_ptr mfp; // From a mmap directstreambuf(afio::handle_ptr _h, afio::handle::mapped_file_ptr _mfp, size_t length) : h(std::move(_h)), mfp(std::move(_mfp)) { // Set the get buffer this streambuf is to use setg((char *) mfp->addr, (char *) mfp->addr, (char *) mfp->addr + length); } // From a malloc directstreambuf(afio::handle_ptr _h, std::shared_ptr<file_buffer_type> _buffer, size_t length) : h(std::move(_h)), buffer(std::move(_buffer)) { // Set the get buffer this streambuf is to use setg(buffer->data(), buffer->data(), buffer->data() + length); } }; std::unique_ptr<directstreambuf> buf; template<class U> idirectstream(afio::handle_ptr h, U &&buffer, size_t length) : std::istream(new directstreambuf(std::move(h), std::forward<U>(buffer), length)), buf(static_cast<directstreambuf *>(rdbuf())) { } virtual ~idirectstream() override { // Reset the stream before deleting the buffer rdbuf(nullptr); } }; // An iostream which writes to an AFIO file in 4Kb pages struct odirectstream : public std::ostream { struct directstreambuf : public std::streambuf { using int_type = std::streambuf::int_type; using traits_type = std::streambuf::traits_type; afio::future<> lastwrite; // the last async write performed afio::off_t offset; // offset of next write file_buffer_type buffer; // a page size on this machine file_buffer_type lastbuffer; directstreambuf(afio::handle_ptr _h) : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front()) { // Set the put buffer this streambuf is to use setp(buffer.data(), buffer.data() + buffer.size()); } virtual ~directstreambuf() override { try { // Flush buffers and wait until last write completes // Schedule an asynchronous write of the buffer to storage size_t thisbuffer = pptr() - pbase(); if(thisbuffer) lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset+thisbuffer), buffer.data(), thisbuffer, offset); lastwrite.get(); } catch(...) { } } virtual int_type overflow(int_type c) override { size_t thisbuffer=pptr()-pbase(); if(thisbuffer>=buffer.size()) sync(); if(c!=traits_type::eof()) { *pptr()=(char)c; pbump(1); return traits_type::to_int_type(c); } return traits_type::eof(); } virtual int sync() override { // Wait for the last write to complete, propagating any exceptions lastwrite.get(); size_t thisbuffer=pptr()-pbase(); if(thisbuffer > 0) { // Detach the current buffer and replace with a fresh one to allow the kernel to steal the page lastbuffer=std::move(buffer); buffer.resize(lastbuffer.size()); setp(buffer.data(), buffer.data() + buffer.size()); // Schedule an extension of physical storage by an extra page lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer); // Schedule an asynchronous write of the buffer to storage lastwrite=afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset); offset+=thisbuffer; } return 0; } }; std::unique_ptr<directstreambuf> buf; odirectstream(afio::handle_ptr h) : std::ostream(new directstreambuf(std::move(h))), buf(static_cast<directstreambuf *>(rdbuf())) { } virtual ~odirectstream() override { // Reset the stream before deleting the buffer rdbuf(nullptr); } };