The naive key-value store simply stored each key in a single flat directory:
store/dog
store/cat
store/horse
When inserting or updating an item, it simply opened the file and wrote out the contents, thus creating races for concurrent users and potentially rewriting the entire directory inode if the filesystem rebalances its directory contents. So how do we implement ACID-ity of updates while keeping updates constant time?
It is actually surprisingly simple thanks to AFIO. Firstly, we are going to reorganise the store's structure with a directory per key name to become this instead:
store/dog/0
store/cat/0
store/horse/0
We are going to change the lookup semantics to this instead: open store/key/0 for reading.
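To make the read path concrete, here is a minimal synchronous sketch of that lookup using only the standard library. This is not the AFIO API; `get_key` is a hypothetical helper for illustration only.

```cpp
#include <filesystem>
#include <fstream>
#include <iterator>
#include <optional>
#include <string>

namespace fs = std::filesystem;

// Hypothetical helper: read the current value of a key, i.e. store/key/0.
// Returns std::nullopt if the key has never been written.
std::optional<std::string> get_key(const fs::path &store, const std::string &key)
{
  std::ifstream f(store / key / "0", std::ios::binary);
  if(!f)
    return std::nullopt;
  return std::string(std::istreambuf_iterator<char>(f),
                     std::istreambuf_iterator<char>());
}
```

Because readers only ever touch the single fixed name `0`, they never observe a half-written temporary file.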
The insertion semantics becomes this:

1. Create store/key/tmpXXXXXXXXXXXXXXXX for writing, optionally using afio::file_flags::always_sync (this causes the OS to not report write completion until the write reaches physical storage), creating the store/key directory as necessary, where XXXXXXXXXXXXXXXX is a cryptographically strong 128-bit random number.
2. Rename store/key/tmpXXXXXXXXXXXXXXXX to store/key/0 using AFIO's special atomic_relink() function (which, because it acts on an open file handle, is invariant to path relocations of itself). This function does as it says, and atomically makes our new value for the key visible at store/key/0, unlinking any file previously there[6]. Never at any point is there not a complete and finished value file on physical storage at store/key/0[7].
3. On Linux only, we then execute a sync on the directory handle for store/key, because Linux is different from other operating systems in that you must explicitly request metadata updates to be sent to physical storage.
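Stripped of AFIO's asynchrony, the insertion sequence can be sketched with the standard library alone. This is a simplified illustration, not the AFIO implementation: `set_key` and `random_hex_name` are hypothetical helpers, `std::random_device` stands in for a proper CSPRNG, and `std::filesystem::rename` gives the atomic-replace behaviour on POSIX.

```cpp
#include <filesystem>
#include <fstream>
#include <random>
#include <string>

namespace fs = std::filesystem;

// Illustrative stand-in for a crypto-strong random name generator;
// production code should use a CSPRNG as the text describes.
static std::string random_hex_name(size_t bytes = 16)  // 16 bytes = 128 bits
{
  static const char digits[] = "0123456789abcdef";
  std::random_device rd;
  std::string out;
  for(size_t n = 0; n < bytes; n++)
  {
    unsigned v = rd();
    out.push_back(digits[v & 0xf]);
    out.push_back(digits[(v >> 4) & 0xf]);
  }
  return out;
}

// Write value via the write-new-then-rename pattern described above.
void set_key(const fs::path &store, const std::string &key, const std::string &value)
{
  fs::path dir = store / key;
  fs::create_directories(dir);                 // ensure store/key exists
  fs::path tmp = dir / ("tmp" + random_hex_name());
  {
    std::ofstream f(tmp, std::ios::binary | std::ios::trunc);
    f.write(value.data(), value.size());
  }                                            // close before renaming
  fs::rename(tmp, dir / "0");                  // atomically replaces any old value on POSIX
}
```

Note that this sketch omits the sync steps, so it illustrates only the visibility semantics, not the power-loss guarantees discussed below.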
Not that we'll be implementing this here, but the deletion semantics would be: rename store/key to store/deadXXXXXXXXXXXXXXXX, where the latter part is another 128-bit crypto strong random number. Note that Microsoft Windows does not permit you to rename a directory containing an open file, so you may need to move all the contents of the directory into a newly created deadXXXXXXXXXXXXXXXX directory instead, being careful of races caused by someone setting a new key-value into the directory you are trying to delete.
What these new semantics achieve:

1. The store now remains consistent even after sudden power loss, even if afio::file_flags::always_sync was not specified (see below for why). It may not have all the key-values in an identical order to how they were written, as this second generation design does not implement transactions, but there will always be a complete value for a key which at some point was stored to that key. Strictly speaking, this means that durability is not fulfilled: durability means that when you are told a write completed, it is durably stored. For full durability, simply add afio::file_flags::always_sync.
2. Insertion of new keys still has O(N) complexity on filing systems with O(N) directory lookup, but updates of existing keys are now no worse than lookup complexity, which is a big improvement over the naive design. If your filing system has O(N) insert complexity, you could use prefix directories to form a binary tree, thus reducing insert complexity to O(log N).
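One way to sketch such prefix directories, using a fixed fan-out of 256 buckets rather than a literal binary tree: `prefixed_key_path` is a hypothetical helper, and `std::hash` is merely a bucketing hash, not crypto-strong.

```cpp
#include <cstdio>
#include <filesystem>
#include <functional>
#include <string>

namespace fs = std::filesystem;

// Map a key to store/<2 hex digit prefix>/<key>, spreading keys over up to
// 256 subdirectories so no single directory's entry count grows unboundedly.
fs::path prefixed_key_path(const fs::path &store, const std::string &key)
{
  unsigned bucket = (unsigned)(std::hash<std::string>{}(key) & 0xff);
  char prefix[3];
  std::snprintf(prefix, sizeof prefix, "%02x", bucket);
  return store / prefix / key;
}
```

Deeper hierarchies (a prefix directory per hash nibble, say) trade more directory opens per lookup for smaller directories at each level.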
Many readers will be wondering why afio::file_flags::always_sync is not necessary for consistency even under power loss, and the answer is journalled filing systems combined with how we are modifying data. If you examine the table of power loss safety guarantees in the design rationale, you will note that journalled file systems (with write barriers enabled) make strong guarantees about newly created file content but not about rewritten file content: if a newly created file appears in the filing system, it is guaranteed to have correct contents, because journalled filing systems do not write an updated directory pointing to the new file until the file contents are on physical storage. Our new design of creating a brand new file per key-value update exploits this guarantee, so the sequentially consistent ordering of writes to physical storage, even without afio::file_flags::always_sync, is: the new value file's contents first, then the directory update which makes them visible.
If power is lost at any point, during journal replay on power restore a
journalled filing system will throw away any extents allocated for content
not referenced by a directory on physical storage. The filing system on
power restore therefore refers to a previously consistent filing system
i.e. how it was just before power was lost. This is why the store will
be consistent even under power loss[8], though without afio::file_flags::always_sync
you may lose up to 5-35 seconds of updates and there may be reordering
of updates by up to 5-35 seconds. Here is the table from the design rationale
once again, for your convenience:
Table 1.6. Power loss safety matrix: What non-trivially reconstructible data can you lose if power is suddenly lost? Any help which can be supplied in filling in the unknowns in this table would be hugely appreciated.
| Filing system | Newly created file content corruptable after close | File data content rewrite corruptable after close | Cosmic ray bitrot corruptable | Can punch holes into physical storage of files[a] | Default max seconds of writes reordered without using fsync() |
|---|---|---|---|---|---|
| FAT32 | ✔ | ✔ | ✔ | ✘ | ? |
| ext2 | ✔ | ✔ | ✔ | ✘ | 35 |
| ext3/4 data=writeback | ✔ | ✔ | ✔ | ext4 only | 35[b] |
| ext3/4 data=ordered | ✘ | ✔ | ✔ | ext4 only | 35 |
| UFS + soft updates[c] | ✘ | ✔ | ✔ | ✔[d] | 30 |
| HFS+ | ✘ | ✔ | ✔ | ✔ | ? |
| NTFS[e] | ✘ | ✔ | ✔ | ✔ | Until idle or write limit |
| ext3/4 data=journal | ✘ | ✘ | ✔ | ext4 only | 5 |
| BTRFS[f] | ✘ | ✘ | ✘ | ✔ | 30 |
| ReFS | ✘ | not if integrity streams enabled | not if integrity streams enabled | ✔ | Until idle or write limit |
| ZFS | ✘ | ✘ | ✘ | ✔ | 30 |

[a] This is where a filing system permits you to deallocate the physical storage of a region of a file, so a file claiming to occupy 8Mb could be reduced to 1Mb of actual storage consumption. This may sound like sparse file support, but transparent compression support also counts, as it would reduce a region written with all zeros to nearly zero physical storage.
[b] This is the
[d] BSD automatically detects extended regions of all bits zero, and eliminates their physical representation on storage.
[f] Source: https://wiki.archlinux.org/index.php/Btrfs
It goes, of course, without saying that if you are not
on a journalled filing system then you absolutely need afio::file_flags::always_sync
and on Linux you had best sync the directories containing newly written
files too.
So let's look at the code: you will be surprised at how few changes there are compared to the earlier asynchronous AFIO implementation. Firstly, the interface is unchanged from before:
```cpp
namespace afio = BOOST_AFIO_V2_NAMESPACE;
namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem;
using BOOST_OUTCOME_V1_NAMESPACE::lightweight_futures::shared_future;

class data_store
{
  afio::dispatcher_ptr _dispatcher;
  afio::handle_ptr _store;
public:
  // Type used for read streams
  using istream = std::shared_ptr<std::istream>;
  // Type used for write streams
  using ostream = std::shared_ptr<std::ostream>;
  // Type used for lookup
  using lookup_result_type = shared_future<istream>;
  // Type used for write
  using write_result_type = shared_future<ostream>;

  // Disposition flags
  static constexpr size_t writeable = (1<<0);

  // Open a data store at path
  data_store(size_t flags = 0, afio::path path = "store");

  // Look up item named name for reading, returning an istream for the item
  shared_future<istream> lookup(std::string name) noexcept;
  // Look up item named name for writing, returning an ostream for that item
  shared_future<ostream> write(std::string name) noexcept;
};
```
lookup() is almost identical to before; it now simply opens key/0 instead.
```cpp
namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
using BOOST_OUTCOME_V1_NAMESPACE::empty;
using BOOST_AFIO_V2_NAMESPACE::error_code;
using BOOST_AFIO_V2_NAMESPACE::generic_category;

static bool is_valid_name(const std::string &name) noexcept
{
  static const std::string banned("<>:\"/\\|?*\0", 10);
  if(std::string::npos!=name.find_first_of(banned))
    return false;
  // No leading period
  return name[0]!='.';
}

// Keep a cache of crypto strong random names
static std::string random_name()
{
  static struct random_names_type
  {
    std::vector<std::string> names;
    size_t idx;
    random_names_type(size_t count) : names(count), idx(0)
    {
      for(size_t n=0; n<count; n++)
        names[n]=afio::utils::random_string(16);  // 128 bits
    }
    std::string get()
    {
      if(idx==names.size())
        idx=0;
      return names[idx++];
    }
  } random_names(10000);
  return random_names.get();
}

data_store::data_store(size_t flags, afio::path path)
{
  // Make a dispatcher for the local filesystem URI, masking out write flags
  // on all operations if not writeable
  _dispatcher=afio::make_dispatcher("file:///", afio::file_flags::none,
    !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get();
  // Set the dispatcher for this thread, and open a handle to the store directory
  afio::current_dispatcher_guard h(_dispatcher);
  _store=afio::dir(std::move(path), afio::file_flags::create);  // throws if there was an error
  // Precalculate the cache of random names
  random_name();
}

shared_future<data_store::istream> data_store::lookup(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    name.append("/0");
    // Schedule the opening of the file for reading
    afio::future<> h(afio::async_file(_store, name, afio::file_flags::read));
    // When it completes, call this continuation
    return h.then([](afio::future<> &_h) -> shared_future<data_store::istream> {
      // If file didn't open, return the error or exception immediately
      BOOST_OUTCOME_PROPAGATE(_h);
      size_t length=(size_t) _h->lstat(afio::metadata_flags::size).st_size;
      // Is a memory map more appropriate?
      if(length>=128*1024)
      {
        afio::handle::mapped_file_ptr mfp;
        if((mfp=_h->map_file()))
        {
          data_store::istream ret(std::make_shared<idirectstream>(_h.get_handle(), std::move(mfp), length));
          return ret;
        }
      }
      // Schedule the reading of the file into a buffer
      auto buffer=std::make_shared<file_buffer_type>(length);
      afio::future<> h(afio::async_read(_h, buffer->data(), length, 0));
      // When the read completes call this continuation
      return h.then([buffer, length](const afio::future<> &h) -> shared_future<data_store::istream> {
        // If read failed, return the error or exception immediately
        BOOST_OUTCOME_PROPAGATE(h);
        data_store::istream ret(std::make_shared<idirectstream>(h.get_handle(), buffer, length));
        return ret;
      });
    });
  }
  catch(...)
  {
    return std::current_exception();
  }
}
```
write() is a bit different. We now open the key directory, and while that is happening asynchronously we generate a crypto strong random name, which tends to be slow. We then schedule creating that temporary file with the extra flag afio::file_flags::hold_parent_open. Incidentally, this flag can dramatically increase AFIO filesystem performance because a number of fast code paths can be taken; for example, fetching most metadata about a file on Windows will actually do a single glob directory enumeration on its parent directory when available, which is about 20x faster. The reason it is not enabled by default is process file descriptor limits on POSIX, especially the severely low default limit on Apple OS X. In this particular case, we just want our temporary file to retain knowledge of its parent because we will fetch that parent later.
```cpp
shared_future<data_store::ostream> data_store::write(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    // Schedule the opening of the directory
    afio::future<> dirh(afio::async_dir(_store, name, afio::file_flags::create));
#ifdef __linux__
    // Flush metadata on Linux only. This will be a noop unless we created a new
    // directory above, and if we don't flush the new key directory it and its
    // contents may not appear in the store directory after a sudden power loss,
    // even if it and its contents are all on physical storage.
    dirh.then([this](const afio::future<> &h) { async_sync(_store); });
#endif
    // Make a crypto strong random file name
    std::string randomname("tmp");
    randomname.append(random_name());
    // Schedule the opening of the file for writing
    afio::future<> h(afio::async_file(dirh, randomname, afio::file_flags::create
      | afio::file_flags::write
      | afio::file_flags::hold_parent_open  // handle should keep a handle_ptr to its parent dir
      /*| afio::file_flags::always_sync*/   // writes don't complete until upon physical storage
    ));
    // When it completes, call this continuation
    return h.then([](const afio::future<> &h) -> shared_future<data_store::ostream> {
      // If file didn't open, return the error or exception immediately
      BOOST_OUTCOME_PROPAGATE(h);
      // Create an ostream which directly uses the file.
      data_store::ostream ret(std::make_shared<odirectstream>(h.get_handle()));
      return std::move(ret);
    });
  }
  catch (...)
  {
    return std::current_exception();
  }
}
```
Finally the input and output streams. The input stream is unchanged, whilst the output stream destructor now simply atomically renames the temp file to "0" using the parent directory handle as the base for the rename target in order to ensure race freedom.
```cpp
namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
using BOOST_OUTCOME_V1_NAMESPACE::empty;
using BOOST_AFIO_V2_NAMESPACE::error_code;
using BOOST_AFIO_V2_NAMESPACE::generic_category;

// A special allocator of highly efficient file i/o memory
using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>;

// An iostream which reads directly from a memory mapped AFIO file
struct idirectstream : public std::istream
{
  struct directstreambuf : public std::streambuf
  {
    afio::handle_ptr h;  // Holds the file open
    std::shared_ptr<file_buffer_type> buffer;
    afio::handle::mapped_file_ptr mfp;
    // From a mmap
    directstreambuf(afio::handle_ptr _h, afio::handle::mapped_file_ptr _mfp, size_t length)
      : h(std::move(_h)), mfp(std::move(_mfp))
    {
      // Set the get buffer this streambuf is to use
      setg((char *) mfp->addr, (char *) mfp->addr, (char *) mfp->addr + length);
    }
    // From a malloc
    directstreambuf(afio::handle_ptr _h, std::shared_ptr<file_buffer_type> _buffer, size_t length)
      : h(std::move(_h)), buffer(std::move(_buffer))
    {
      // Set the get buffer this streambuf is to use
      setg(buffer->data(), buffer->data(), buffer->data() + length);
    }
  };
  std::unique_ptr<directstreambuf> buf;
  template<class U> idirectstream(afio::handle_ptr h, U &&buffer, size_t length)
    : std::istream(new directstreambuf(std::move(h), std::forward<U>(buffer), length)),
      buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~idirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};

// An iostream which writes to an AFIO file in 4Kb pages
struct odirectstream : public std::ostream
{
  struct directstreambuf : public std::streambuf
  {
    using int_type = std::streambuf::int_type;
    using traits_type = std::streambuf::traits_type;
    afio::future<> lastwrite;  // the last async write performed
    afio::off_t offset;        // offset of next write
    file_buffer_type buffer;   // a page size on this machine
    file_buffer_type lastbuffer;
    directstreambuf(afio::handle_ptr _h)
      : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front())
    {
      // Set the put buffer this streambuf is to use
      setp(buffer.data(), buffer.data() + buffer.size());
    }
    virtual ~directstreambuf() override
    {
      try
      {
        // Flush buffers and wait until last write completes
        // Schedule an asynchronous write of the buffer to storage
        size_t thisbuffer = pptr() - pbase();
        if(thisbuffer)
          lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset+thisbuffer),
                                        buffer.data(), thisbuffer, offset);
        lastwrite.get();
        // TODO: On Windows do I need to close and reopen the file to flush metadata
        // before the rename or does the rename do it for me?
        // Get handle to the parent directory
        auto dirh(lastwrite->container());
        // Atomically rename "tmpXXXXXXXXXXXXXXXX" to "0"
        lastwrite->atomic_relink(afio::path_req::relative(dirh, "0"));
#ifdef __linux__
        // Journalled Linux filing systems don't need this, but if you enabled
        // afio::file_flags::always_sync you might want to issue this too.
        afio::sync(dirh);
#endif
      }
      catch(...)
      {
      }
    }
    virtual int_type overflow(int_type c) override
    {
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer>=buffer.size())
        sync();
      if(c!=traits_type::eof())
      {
        *pptr()=(char)c;
        pbump(1);
        return traits_type::to_int_type(c);
      }
      return traits_type::eof();
    }
    virtual int sync() override
    {
      // Wait for the last write to complete, propagating any exceptions
      lastwrite.get();
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer > 0)
      {
        // Detach the current buffer and replace with a fresh one to allow the
        // kernel to steal the page
        lastbuffer=std::move(buffer);
        buffer.resize(lastbuffer.size());
        setp(buffer.data(), buffer.data() + buffer.size());
        // Schedule an extension of physical storage by an extra page
        lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer);
        // Schedule an asynchronous write of the buffer to storage
        lastwrite=afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset);
        offset+=thisbuffer;
      }
      return 0;
    }
  };
  std::unique_ptr<directstreambuf> buf;
  odirectstream(afio::handle_ptr h)
    : std::ostream(new directstreambuf(std::move(h))),
      buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~odirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};
```
Obviously enough, we don't have any garbage collection in here for failed or aborted writes. Thanks to the unique naming of those files, they are very easy to spot during a GC pass, and if their last modified date is several days old they would be ideal for purging. Implementing such a pass is easy using async_enumerate(), and is left as an exercise for the reader.
Table 1.7. This second generation solution will perform reasonably well under these conditions:
| | Condition |
|---|---|
| ✔ | On Microsoft Windows you can place the store deep in a directory hierarchy and use long key names. |
| ✔ | Third party threads and processes can rename the location of the store during use. |
| ✔ | The size of all the values being read at any given time fits into your virtual address space (which is at least 2Gb on 32 bit, 8Tb on 64 bit). |
| ✔ | As many processes and threads may read and write to the store concurrently as you like, including any mix of CIFS and NFS clients. |
| ✔ | Processes may unexpectedly exit during modifies with no consequence on consistency. |
| ✘ | Maximum performance isn't important to you. |
| ✔ | Sudden power loss may not maintain original write ordering across multiple key-value updates, however each key will have some complete value which it had at some point in history. |
| ✘ | You don't need to update more than one key-value at once. |
[6] This operation is not possible on Microsoft Windows using the Win32 API — nowhere in the Win32 API is this operation made available. However the NT kernel API does provide this operation, hence AFIO can provide POSIX atomic relink guarantees on Windows.
[7] This may not be true on older NFS mounts, or NFS mounts with the wrong configuration. See the NFS documentation. It is also possible to configure Samba in a way which breaks the atomicity of atomic renaming, see the Samba documentation.
[8] This is as yet untested by empirical testing, however AFIO does no magic tricks, it just thinly wraps the operating system APIs.