Let's see the same thing as in the last section, but written using AFIO. First, the interface is identical to before, just with different private member variables:
```cpp
namespace afio = BOOST_AFIO_V2_NAMESPACE;
namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem;
using BOOST_OUTCOME_V1_NAMESPACE::outcome;

class data_store
{
  afio::dispatcher_ptr _dispatcher;
  afio::handle_ptr _store;
public:
  // Type used for read streams
  using istream = std::shared_ptr<std::istream>;
  // Type used for write streams
  using ostream = std::shared_ptr<std::ostream>;
  // Type used for lookup
  using lookup_result_type = outcome<istream>;
  // Type used for write
  using write_result_type = outcome<ostream>;

  // Disposition flags
  static constexpr size_t writeable = (1<<0);

  // Open a data store at path
  data_store(size_t flags = 0, afio::path path = "store");

  // Look up item named name for reading, returning a std::istream for the item if it exists
  outcome<istream> lookup(std::string name) noexcept;
  // Look up item named name for writing, returning an ostream for that item
  outcome<ostream> write(std::string name) noexcept;
};
```
We now have a dispatcher_ptr and a handle_ptr to the store directory, which is created/opened during construction of data_store. Instead of constructing to a filesystem path, we now construct to an AFIO path. Note that AFIO paths are always either absolute or relative to some open file handle, so in this situation the current working directory is fetched at the point of construction and an absolute path is constructed. This differs from filesystem, which passes through relative paths and lets the OS resolve them against the current working directory; AFIO does it this way because the current working directory setting at some later point of asynchronous execution is inherently unpredictable.
As the data_store interface is identical, so is the use case from the previous page. The implementation, however, is rather different:
```cpp
namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
using BOOST_OUTCOME_V1_NAMESPACE::empty;
using BOOST_AFIO_V2_NAMESPACE::error_code;
using BOOST_AFIO_V2_NAMESPACE::generic_category;

static bool is_valid_name(const std::string &name) noexcept
{
  static const std::string banned("<>:\"/\\|?*\0", 10);
  if(std::string::npos != name.find_first_of(banned))
    return false;
  // No leading period
  return name[0] != '.';
}

data_store::data_store(size_t flags, afio::path path)
{
  // Make a dispatcher for the local filesystem URI, masking out write flags
  // on all operations if not writeable
  _dispatcher = afio::make_dispatcher("file:///", afio::file_flags::none,
    !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get();
  // Set the dispatcher for this thread, and open a handle to the store directory
  afio::current_dispatcher_guard h(_dispatcher);
  _store = afio::dir(std::move(path), afio::file_flags::create);  // throws if there was an error
}

outcome<data_store::istream> data_store::lookup(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    error_code ec;
    // Open the file using the handle to the store directory as the base.
    // The store directory can be freely renamed by any third party process
    // and everything here will work perfectly.
    afio::handle_ptr h(afio::file(ec, _store, name, afio::file_flags::read));
    if(ec)
    {
      // If the file was not found, return empty else the error
      if(ec == error_code(ENOENT, generic_category()))
        return empty;
      return ec;
    }
    // Create an istream which directly uses the mapped file.
    data_store::istream ret(std::make_shared<idirectstream>(ec, std::move(h)));
    if(ec)
      return ec;
    return ret;
  }
  catch(...)
  {
    return std::current_exception();
  }
}

outcome<data_store::ostream> data_store::write(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    error_code ec;
    // Open the file using the handle to the store directory as the base.
    // The store directory can be freely renamed by any third party process
    // and everything here will work perfectly. You could enable direct
    // buffer writing - this sends 4Kb pages directly to the physical hardware
    // bypassing the kernel file page cache, however this is not optimal if
    // reads of the value are likely to occur soon.
    afio::handle_ptr h(afio::file(ec, _store, name,
      afio::file_flags::create | afio::file_flags::write /*| afio::file_flags::os_direct*/));
    if(ec)
      return ec;
    // Create an ostream which directly uses the mapped file.
    return outcome<data_store::ostream>(std::make_shared<odirectstream>(std::move(h)));
  }
  catch(...)
  {
    return std::current_exception();
  }
}
```
This is a good bit longer and looks more complex, but there is already one big, if not obvious, gain: third party processes can happily rename and move around the store directory once it has been opened, and this implementation will keep working perfectly. This is because we open a handle to the store directory in the data_store constructor, and thereafter use that open handle as a base location for all leaf path operations. Except on OS X, which currently doesn't support the POSIX race-free filesystem extensions, this makes all data store operations invariant to store path mutation on all supported platforms.
Another big gain is that we are now using memory mapped files for the lookup, which avoids any memory copying, and there is a hint that we are also avoiding memory copies in the write too.
You will also note that in the constructor we specify a URI to make_dispatcher(). This lets you open different kinds of filesystem, e.g. ZIP archives, HTTP sites etc. AFIO currently doesn't provide any backend other than file:///, i.e. the local filesystem, however future versions will. Note that a dispatcher can force file_flags on or off for all operations scheduled against that dispatcher; here we forcibly mask out any attempt to open anything for writing if we are opening the store read-only.
Finally, you may wonder why we only use current_dispatcher_guard once, in the constructor. This is because if AFIO can deduce the dispatcher to use from any precondition you supply, you need not set the thread-local dispatcher. As the opening of the store directory is done as an absolute path lookup, and therefore takes no inputs specifying a dispatcher, you need to set the current dispatcher in that one situation alone.
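The guard itself is nothing more exotic than RAII set-and-restore of a thread-local pointer. A standalone sketch of that pattern (the `dispatcher` type and guard class here are illustrative stand-ins, not AFIO's actual implementation):

```cpp
// Stand-in for afio::dispatcher (illustrative only)
struct dispatcher {};

thread_local dispatcher *current_dispatcher = nullptr;

// RAII guard: installs a dispatcher as this thread's current one and
// restores the previous value on scope exit, the same shape as
// afio::current_dispatcher_guard.
struct current_dispatcher_guard_sketch
{
  dispatcher *previous;
  explicit current_dispatcher_guard_sketch(dispatcher *d)
    : previous(current_dispatcher)
  {
    current_dispatcher = d;
  }
  ~current_dispatcher_guard_sketch()
  {
    current_dispatcher = previous;
  }
};

bool guard_demo()
{
  dispatcher d;
  if(current_dispatcher != nullptr) return false;
  {
    current_dispatcher_guard_sketch g(&d);
    if(current_dispatcher != &d) return false;  // visible to calls in this scope
  }
  return current_dispatcher == nullptr;         // restored afterwards
}
```

Any API called inside the guarded scope can consult the thread-local to find its dispatcher without being passed one explicitly, which is why the constructor needs the guard only around the absolute-path `afio::dir()` call.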
The remaining magic is in the custom iostreams implementations idirectstream and odirectstream:
```cpp
namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
using BOOST_OUTCOME_V1_NAMESPACE::empty;
using BOOST_AFIO_V2_NAMESPACE::error_code;
using BOOST_AFIO_V2_NAMESPACE::generic_category;

// A special allocator of highly efficient file i/o memory
using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>;

// An iostream which reads directly from a memory mapped AFIO file
struct idirectstream : public std::istream
{
  struct directstreambuf : public std::streambuf
  {
    afio::handle_ptr h;  // Holds the file open and therefore mapped
    file_buffer_type buffer;
    afio::handle::mapped_file_ptr mfp;
    directstreambuf(error_code &ec, afio::handle_ptr _h) : h(std::move(_h))
    {
      // Get the size of the file. If greater than 128Kb mmap it
      size_t length = (size_t) h->lstat(afio::metadata_flags::size).st_size;
      char *p = nullptr;
      if(length >= 128*1024)
      {
        if((mfp = h->map_file()))
          p = (char *) mfp->addr;
      }
      if(!p)
      {
        buffer.resize(length);
        afio::read(ec, h, buffer.data(), length, 0);
        p = buffer.data();
      }
      // Set the get buffer this streambuf is to use
      setg(p, p, p + length);
    }
  };
  std::unique_ptr<directstreambuf> buf;
  idirectstream(error_code &ec, afio::handle_ptr h)
    : std::istream(new directstreambuf(ec, std::move(h))),
      buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~idirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};

// An iostream which writes to an AFIO file in 4Kb pages
struct odirectstream : public std::ostream
{
  struct directstreambuf : public std::streambuf
  {
    using int_type = std::streambuf::int_type;
    using traits_type = std::streambuf::traits_type;
    afio::future<> lastwrite;  // the last async write performed
    afio::off_t offset;        // offset of next write
    file_buffer_type buffer;   // a page size on this machine
    file_buffer_type lastbuffer;
    directstreambuf(afio::handle_ptr _h)
      : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front())
    {
      // Set the put buffer this streambuf is to use
      setp(buffer.data(), buffer.data() + buffer.size());
    }
    virtual ~directstreambuf() override
    {
      try
      {
        // Flush buffers and wait until last write completes
        // Schedule an asynchronous write of the buffer to storage
        size_t thisbuffer = pptr() - pbase();
        if(thisbuffer)
          lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset + thisbuffer),
                                        buffer.data(), thisbuffer, offset);
        lastwrite.get();
      }
      catch(...)
      {
      }
    }
    virtual int_type overflow(int_type c) override
    {
      size_t thisbuffer = pptr() - pbase();
      if(thisbuffer >= buffer.size())
        sync();
      if(c != traits_type::eof())
      {
        *pptr() = (char) c;
        pbump(1);
        return traits_type::to_int_type(c);
      }
      return traits_type::eof();
    }
    virtual int sync() override
    {
      // Wait for the last write to complete, propagating any exceptions
      lastwrite.get();
      size_t thisbuffer = pptr() - pbase();
      if(thisbuffer > 0)
      {
        // Detach the current buffer and replace with a fresh one to allow
        // the kernel to steal the page
        lastbuffer = std::move(buffer);
        buffer.resize(lastbuffer.size());
        setp(buffer.data(), buffer.data() + buffer.size());
        // Schedule an extension of physical storage by an extra page
        lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer);
        // Schedule an asynchronous write of the buffer to storage
        lastwrite = afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset);
        offset += thisbuffer;
      }
      return 0;
    }
  };
  std::unique_ptr<directstreambuf> buf;
  odirectstream(afio::handle_ptr h)
    : std::ostream(new directstreambuf(std::move(h))),
      buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~odirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};
```
These are not hugely conforming iostreams implementations, in order to keep them brief for the purposes of this tutorial. The read stream is very straightforward: we simply have the streambuf directly use the memory map of the file.
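The same setg() trick can be seen in miniature with a plain std::streambuf over an in-memory buffer (standard C++ only; `membuf`, `read_all` and the sample data are illustrative, not AFIO code):

```cpp
#include <istream>
#include <streambuf>
#include <string>

// A minimal read-only streambuf that exposes an existing buffer directly,
// with no copying: the same idea idirectstream applies to the memory map.
struct membuf : std::streambuf
{
  membuf(char *p, size_t length)
  {
    setg(p, p, p + length);  // start, current position, end of the get area
  }
};

std::string read_all(char *p, size_t length)
{
  membuf buf(p, length);
  std::istream is(&buf);
  std::string word;
  is >> word;   // extraction reads straight out of the caller's buffer
  return word;
}
```

Usage: given `char data[] = "hello world";`, `read_all(data, 11)` extracts `"hello"` without the streambuf ever owning or copying the underlying storage.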
The write stream is a bit more complicated: we fill a 4Kb page and asynchronously write it out using a page-stealing-friendly algorithm, which in the usual case means the kernel simply takes possession of the 4Kb page as-is with no memory copying at all; at some future point it will be flushed onto storage. The reason this works is the special STL allocator utils::page_allocator<>, which returns whole kernel page cache pages. Most kernels mark whole pages scheduled for write as copy-on-write so that they can be safely DMAed by the kernel DMA engine (any subsequent write would force a copy), and because we never write to the page between the write request and freeing the page, most kernels simply transfer ownership of the page from the user space process to the file page cache with no further processing. Hence the asynchronous write of the page tends to complete very quickly, indeed far faster than copying 4Kb of memory and often quicker than the time to fill another page, and so we wait for any previous write to complete before scheduling the next write in order to report any errors which occurred during the write.
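The overflow()/sync() interplay can be sketched without AFIO by flushing fixed-size pages into an in-memory sink instead of scheduling asynchronous writes (standard C++ only; `pagingbuf`, `write_demo` and the tiny 8-byte "page" are assumptions for the demo):

```cpp
#include <ostream>
#include <streambuf>
#include <string>
#include <vector>

// A put-area streambuf that accumulates a fixed-size "page" and flushes it
// whole, mimicking odirectstream's page-at-a-time strategy. Here the flush
// is a plain copy into `sink`, where odirectstream schedules an async write
// of the detached page instead.
struct pagingbuf : std::streambuf
{
  std::vector<char> page;
  std::string sink;   // stands in for the file
  explicit pagingbuf(size_t pagesize) : page(pagesize)
  {
    setp(page.data(), page.data() + page.size());
  }
  int_type overflow(int_type c) override
  {
    if(pptr() == epptr())
      sync();                       // page full: flush it whole
    if(c != traits_type::eof())
    {
      *pptr() = (char) c;
      pbump(1);
      return traits_type::to_int_type(c);
    }
    return traits_type::eof();
  }
  int sync() override
  {
    sink.append(pbase(), pptr() - pbase());        // "write out" the page
    setp(page.data(), page.data() + page.size());  // reset the put area
    return 0;
  }
};

std::string write_demo(const std::string &text, size_t pagesize)
{
  pagingbuf buf(pagesize);
  std::ostream os(&buf);
  os << text;
  os.flush();   // drives sync(), flushing the partial last page
  return buf.sink;
}
```

In odirectstream the reset in sync() installs a *fresh* page rather than reusing the old one, which is what lets the kernel steal the just-written page instead of copying it.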
This is the first use of asynchronous i/o in this tutorial. AFIO provides a custom future<T> type extending the lightweight monadic futures framework in Boost.Outcome, so you get all the C++ 1z Concurrency TS extensions, C++ 1z coroutines support and the Boost.Thread future extensions in the AFIO custom future<>. There are also many additional extensions beyond what Boost.Thread or the Concurrency TS provide, including foreign future composable waits.
Table 1.4. This solution will perform reasonably well under these conditions:

| | Condition |
|---|---|
| ✔ | On Microsoft Windows you can place the store deep in a directory hierarchy and use long key names. |
| ✔ | Third party threads and processes can rename the location of the store during use. |
| ✔ | The size of all the values being read at any given time fits into your virtual address space (which is at least 2Gb on 32 bit, 8Tb on 64 bit). |
| ✘ | Only one thread or process will ever interact with the key-value store at a time. |
| ✘ | You don't care what happens if your process unexpectedly exits during a modify. |
| ✘ | Maximum performance isn't important to you. |
| ✘ | You don't care what happens under power loss. |
| ✘ | You don't need to update more than one key-value at once. |