Boost C++ Libraries Home Libraries People FAQ More

PrevUpHomeNext

4. Second attempt at a key-value store: How to implement atomic value updates and therefore Atomicity, Consistency, Isolation and Durability

The performance of and problems with this second generation design

The naive key-value store simply stored each key in a single flat directory, so:

When inserting or updating an item, it simply opened the filename and wrote out the contents thus creating races for concurrent users and potentially writing out the entire directory inode if that filesystem rebalances its directory contents. So how do we implement ACID-ity of updates with constant time updates?

It is actually surprisingly simple thanks to AFIO. Firstly, we are going to reorganise the store's structure with a directory per key name to become this instead:

We are going to change the lookup semantics to this instead:

  1. For some key, open the file store/key/0 for reading.

The insertion semantics becomes this:

  1. For some key, open the file store/key/tmpXXXXXXXXXXXXXXXX for writing optionally using afio::file_flags::always_sync (this causes the OS to not report write completion until the write reaches physical storage), creating any store/key directory as necessary, where XXXXXXXXXXXXXXXX is a cryptographically strong 128-bit random number.
  2. Write the new value using the output stream as normal, keeping the handle open.
  3. Atomically rename store/key/tmpXXXXXXXXXXXXXXXX to store/key/0 using AFIO's special atomic_relink() function (which because it acts on an open file handle, is invariant to path relocations of itself). This function does as it says, and atomically makes visible at store/key/0 our new value for the key, unlinking any file previously there[6]. Never at any point is there not a complete and finished value file on physical storage at store/key/0[7]. On Linux only we then execute a sync on the directory handle for store/key because Linux is different from other operating systems in that you must explicitly request metadata updates to be sent to physical storage.

Not that we'll be implementing this here, but the deletion semantics would be:

  1. Atomically rename store/key to store/deadXXXXXXXXXXXXXXXX where the latter part is another 128-bit crypto strong random number. Note that Microsoft Windows does not permit you to rename a directory containing an open file, so you may need to move all the contents of the directory into a newly created deadXXXXXXXXXXXXXXXX directory instead, being careful of races caused by someone setting a new key-value into the directory you are trying to delete.
  2. Recursively delete the dead directory. On Microsoft Windows, AFIO tags the items for later deletion when the last handle to them is closed. Remember that someone doing a lookup may still have a handle open to the just deleted version of the handle, but on both POSIX and Windows this is fine — the physical storage will be deallocated on last handle close, even (usually) under unexpected power loss.

What these new semantics achieve:

  1. Atomicity: Values always appear to concurrent readers complete and finished.
  2. Consistency: If a writer fatal exits during a key update, the store is left in a consistent state.
  3. Isolation: The operating system provides strong ordering guarantees about atomic relinking, thus creating a sequentially consistent ordering of key-value update visibility.
  4. (Durability): If power loss suddenly occurs, the key-value store will always be internally consistent, even if afio::file_flags::always_sync was not specified (see below for why). It may not have all the key-values in an identical order to how they were written as this second generation design does not implement transactions, but there will always be a complete value for a key which at some point was stored to that key. Strictly speaking, this means that durability is not fulfilled — durability means when you are told a write completes, it is durably stored, however for full durability simply add afio::file_flags::always_sync.
  5. Much improved update complexity: New key insertion may still be as poor as O(N) complexity, but updates of existing keys is now no worse than lookup complexity, which is a big improvement over the naive design. If your filing system has O(N) insert complexity, you could use prefix directories to form a binary tree, thus reducing insert complexity to O(log N).

Many readers will be wondering why afio::file_flags::always_sync is not necessary for consistency even under power loss, and the answer is because of journalled filing systems combined with how we are modifying data. If you examine the table of power loss safety guarantees in the design rationale, you will note that journalled file systems (with write barriers enabled) make strong guarantees about newly created file content but not about rewritten file content: if a newly created file appears in the filing system, it is guaranteed to have correct contents because journalled filing systems do not write an updated directory pointing to the new file until the file contents are on physical storage. Our naive design of creating a brand new file per key-value update exploits this guarantee, so the sequentially consistent ordering of writes to physical storage, even without afio::file_flags::always_sync, is this:

  1. Allocate extents for new file content. Write journal.
  2. Write new file content to physical storage. Write journal.
  3. Write new copy of directory containing atomic rename of randomly named file with deallocation of extents of previous file. Write journal.

If power is lost at any point, during journal replay on power restore a journalled filing system will throw away any extents allocated for content not referenced by a directory on physical storage. The filing system on power restore therefore refers to a previously consistent filing system i.e. how it was just before power was lost. This is why the store will be consistent even under power loss[8], though without afio::file_flags::always_sync you may lose up to 5-35 seconds of updates and there may be reordering of updates by up to 5-35 seconds. Here is the table from the design rationale once again, for your convenience:

Table 1.6. Power loss safety matrix: What non-trivially reconstructible data can you lose if power is suddenly lost? Any help which can be supplied in filling in the unknowns in this table would be hugely appreciated.

Newly created file content corruptable after close

File data content rewrite corruptable after close

Cosmic ray bitrot corruptable

Can punch holes into physical storage of files[a]

Default max seconds of writes reordered without using fsync()

FAT32

?

ext2

35

ext3/4 data=writeback

ext4 only

35[b]

ext3/4 data=ordered (default)

ext4 only

35

UFS + soft updates[c]

[d]

30

HFS+

?

NTFS[e]

Until idle or write limit

ext3/4 data=journal

ext4 only

5

BTRFS[f]

30

ReFS

not if integrity streams enabled

not if integrity streams enabled

Until idle or write limit

ZFS

30

[a] This is where a filing system permits you to deallocate the physical storage of a region of a file, so a file claiming to occupy 8Mb could be reduced to 1Mb of actual storage consumption. This may sound like sparse file support, but transparent compression support also counts as it would reduce a region written with all zeros to nearly zero physical storage

[b] This is the commit mount setting added to the /proc/sys/vm/dirty_expire_centiseconds value. Sources: https://www.kernel.org/doc/Documentation/filesystems/ext4.txt and http://www.westnet.com/~gsmith/content/linux-pdflush.htm

[d] BSD automatically detects extended regions of all bits zero, and eliminates their physical representation on storage.


It goes, of course, without saying that if you are not on a journalled filing system then you absolutely need afio::file_flags::always_sync and on Linux you had best sync the directories containing newly written files too.

So let's look at the code: you will be surprised at how few changes there are compared to the earlier asynchronous AFIO implementation. Firstly, the interface is unchanged from before:

namespace afio = BOOST_AFIO_V2_NAMESPACE;
namespace filesystem = BOOST_AFIO_V2_NAMESPACE::filesystem;
using BOOST_OUTCOME_V1_NAMESPACE::lightweight_futures::shared_future;

class data_store
{
  afio::dispatcher_ptr _dispatcher;
  afio::handle_ptr _store;
public:
  // Type used for read streams
  using istream = std::shared_ptr<std::istream>;
  // Type used for write streams
  using ostream = std::shared_ptr<std::ostream>;
  // Type used for lookup
  using lookup_result_type = shared_future<istream>;
  // Type used for write
  using write_result_type = shared_future<ostream>;

  // Disposition flags
  static constexpr size_t writeable = (1<<0);

  // Open a data store at path
  data_store(size_t flags = 0, afio::path path = "store");

  // Look up item named name for reading, returning an istream for the item
  shared_future<istream> lookup(std::string name) noexcept;
  // Look up item named name for writing, returning an ostream for that item
  shared_future<ostream> write(std::string name) noexcept;
};

lookup() is almost identical to before, now it simply opens key/0 instead.

namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
using BOOST_OUTCOME_V1_NAMESPACE::empty;
using BOOST_AFIO_V2_NAMESPACE::error_code;
using BOOST_AFIO_V2_NAMESPACE::generic_category;

static bool is_valid_name(const std::string &name) noexcept
{
  static const std::string banned("<>:\"/\\|?*\0", 10);
  if(std::string::npos!=name.find_first_of(banned))
    return false;
  // No leading period
  return name[0]!='.';
}

// Keep a cache of crypto strong random names
static std::string random_name()
{
  static struct random_names_type
  {
    std::vector<std::string> names;
    size_t idx;
    random_names_type(size_t count) : names(count), idx(0)
    {
      for(size_t n=0; n<count; n++)
        names[n]=afio::utils::random_string(16);  // 128 bits
    }
    std::string get()
    {
      if(idx==names.size())
        idx=0;
      return names[idx++];
    }
  } random_names(10000);
  return random_names.get();
}

data_store::data_store(size_t flags, afio::path path)
{
  // Make a dispatcher for the local filesystem URI, masking out write flags on all operations if not writeable
  _dispatcher=afio::make_dispatcher("file:///", afio::file_flags::none, !(flags & writeable) ? afio::file_flags::write : afio::file_flags::none).get();
  // Set the dispatcher for this thread, and open a handle to the store directory
  afio::current_dispatcher_guard h(_dispatcher);
  _store=afio::dir(std::move(path), afio::file_flags::create);  // throws if there was an error
  // Precalculate the cache of random names
  random_name();
}

shared_future<data_store::istream> data_store::lookup(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    name.append("/0");
    // Schedule the opening of the file for reading
    afio::future<> h(afio::async_file(_store, name, afio::file_flags::read));
    // When it completes, call this continuation
    return h.then([](afio::future<> &_h) -> shared_future<data_store::istream> {
      // If file didn't open, return the error or exception immediately
      BOOST_OUTCOME_PROPAGATE(_h);
      size_t length=(size_t) _h->lstat(afio::metadata_flags::size).st_size;
      // Is a memory map more appropriate?
      if(length>=128*1024)
      {
        afio::handle::mapped_file_ptr mfp;
        if((mfp=_h->map_file()))
        {
          data_store::istream ret(std::make_shared<idirectstream>(_h.get_handle(), std::move(mfp), length));
          return ret;
        }
      }
      // Schedule the reading of the file into a buffer
      auto buffer=std::make_shared<file_buffer_type>(length);
      afio::future<> h(afio::async_read(_h, buffer->data(), length, 0));
      // When the read completes call this continuation
      return h.then([buffer, length](const afio::future<> &h) -> shared_future<data_store::istream> {
        // If read failed, return the error or exception immediately
        BOOST_OUTCOME_PROPAGATE(h);
        data_store::istream ret(std::make_shared<idirectstream>(h.get_handle(), buffer, length));
        return ret;
      });
    });
  }
  catch(...)
  {
    return std::current_exception();
  }
}

write() is a bit different. We now open the key directory, and while that is happening asynchronously we generate a crypto strong random name which tends to be slow. We then schedule creating that temporary file with the extra flag afio::file_flags::hold_parent_open (which incidentally can dramatically increase AFIO filesystem performance because a number of fast code paths can be taken e.g. fetching most metadata about a file on Windows will actually do a single glob directory enumeration on its parent directory when available which is about 20x faster. The reason it is not enabled by default is because of process file descriptor limits on POSIX, especially the severely low default limit on Apple OS X). In this particular case, we just want our temporary file to retain knowledge of its parent because we will fetch that parent later.

shared_future<data_store::ostream> data_store::write(std::string name) noexcept
{
  if(!is_valid_name(name))
    return error_code(EINVAL, generic_category());
  try
  {
    // Schedule the opening of the directory
    afio::future<> dirh(afio::async_dir(_store, name, afio::file_flags::create));
#ifdef __linux__
    // Flush metadata on Linux only. This will be a noop unless we created a new directory
    // above, and if we don't flush the new key directory it and its contents may not appear
    // in the store directory after a suddenly power loss, even if it and its contents are
    // all on physical storage.
    dirh.then([this](const afio::future<> &h) { async_sync(_store); });
#endif
    // Make a crypto strong random file name
    std::string randomname("tmp");
    randomname.append(random_name());
    // Schedule the opening of the file for writing
    afio::future<> h(afio::async_file(dirh, randomname, afio::file_flags::create | afio::file_flags::write
      | afio::file_flags::hold_parent_open    // handle should keep a handle_ptr to its parent dir
      /*| afio::file_flags::always_sync*/     // writes don't complete until upon physical storage
      ));
    // When it completes, call this continuation
    return h.then([](const afio::future<> &h) -> shared_future<data_store::ostream> {
      // If file didn't open, return the error or exception immediately
      BOOST_OUTCOME_PROPAGATE(h);
      // Create an ostream which directly uses the file.
      data_store::ostream ret(std::make_shared<odirectstream>(h.get_handle()));
      return std::move(ret);
    });
  }
  catch (...)
  {
    return std::current_exception();
  }
}

Finally the input and output streams. The input stream is unchanged, whilst the output stream destructor now simply atomically renames the temp file to "0" using the parent directory handle as the base for the rename target in order to ensure race freedom.

namespace asio = BOOST_AFIO_V2_NAMESPACE::asio;
using BOOST_OUTCOME_V1_NAMESPACE::empty;
using BOOST_AFIO_V2_NAMESPACE::error_code;
using BOOST_AFIO_V2_NAMESPACE::generic_category;

// A special allocator of highly efficient file i/o memory
using file_buffer_type = std::vector<char, afio::utils::page_allocator<char>>;

// An iostream which reads directly from a memory mapped AFIO file
struct idirectstream : public std::istream
{
  struct directstreambuf : public std::streambuf
  {
    afio::handle_ptr h;  // Holds the file open
    std::shared_ptr<file_buffer_type> buffer;
    afio::handle::mapped_file_ptr mfp;
    // From a mmap
    directstreambuf(afio::handle_ptr _h, afio::handle::mapped_file_ptr _mfp, size_t length) : h(std::move(_h)), mfp(std::move(_mfp))
    {
      // Set the get buffer this streambuf is to use
      setg((char *) mfp->addr, (char *) mfp->addr, (char *) mfp->addr + length);
    }
    // From a malloc
    directstreambuf(afio::handle_ptr _h, std::shared_ptr<file_buffer_type> _buffer, size_t length) : h(std::move(_h)), buffer(std::move(_buffer))
    {
      // Set the get buffer this streambuf is to use
      setg(buffer->data(), buffer->data(), buffer->data() + length);
    }
  };
  std::unique_ptr<directstreambuf> buf;
  template<class U> idirectstream(afio::handle_ptr h, U &&buffer, size_t length) : std::istream(new directstreambuf(std::move(h), std::forward<U>(buffer), length)), buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~idirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};

// An iostream which writes to an AFIO file in 4Kb pages
struct odirectstream : public std::ostream
{
  struct directstreambuf : public std::streambuf
  {
    using int_type = std::streambuf::int_type;
    using traits_type = std::streambuf::traits_type;
    afio::future<> lastwrite; // the last async write performed
    afio::off_t offset;       // offset of next write
    file_buffer_type buffer;  // a page size on this machine
    file_buffer_type lastbuffer;
    directstreambuf(afio::handle_ptr _h) : lastwrite(std::move(_h)), offset(0), buffer(afio::utils::page_sizes().front())
    {
      // Set the put buffer this streambuf is to use
      setp(buffer.data(), buffer.data() + buffer.size());
    }
    virtual ~directstreambuf() override
    {
      try
      {
        // Flush buffers and wait until last write completes
        // Schedule an asynchronous write of the buffer to storage
        size_t thisbuffer = pptr() - pbase();
        if(thisbuffer)
          lastwrite = afio::async_write(afio::async_truncate(lastwrite, offset+thisbuffer), buffer.data(), thisbuffer, offset);
        lastwrite.get();
        // TODO: On Windows do I need to close and reopen the file to flush metadata before
        //       the rename or does the rename do it for me?
        // Get handle to the parent directory
        auto dirh(lastwrite->container());
        // Atomically rename "tmpXXXXXXXXXXXXXXXX" to "0"
        lastwrite->atomic_relink(afio::path_req::relative(dirh, "0"));
#ifdef __linux__
        // Journalled Linux filing systems don't need this, but if you enabled afio::file_flags::always_sync
        // you might want to issue this too.
        afio::sync(dirh);
#endif
      }
      catch(...)
      {
      }
    }
    virtual int_type overflow(int_type c) override
    {
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer>=buffer.size())
        sync();
      if(c!=traits_type::eof())
      {
        *pptr()=(char)c;
        pbump(1);
        return traits_type::to_int_type(c);
      }
      return traits_type::eof();
    }
    virtual int sync() override
    {
      // Wait for the last write to complete, propagating any exceptions
      lastwrite.get();
      size_t thisbuffer=pptr()-pbase();
      if(thisbuffer > 0)
      {
        // Detach the current buffer and replace with a fresh one to allow the kernel to steal the page
        lastbuffer=std::move(buffer);
        buffer.resize(lastbuffer.size());
        setp(buffer.data(), buffer.data() + buffer.size());
        // Schedule an extension of physical storage by an extra page
        lastwrite = afio::async_truncate(lastwrite, offset + thisbuffer);
        // Schedule an asynchronous write of the buffer to storage
        lastwrite=afio::async_write(lastwrite, lastbuffer.data(), thisbuffer, offset);
        offset+=thisbuffer;
      }
      return 0;
    }
  };
  std::unique_ptr<directstreambuf> buf;
  odirectstream(afio::handle_ptr h) : std::ostream(new directstreambuf(std::move(h))), buf(static_cast<directstreambuf *>(rdbuf()))
  {
  }
  virtual ~odirectstream() override
  {
    // Reset the stream before deleting the buffer
    rdbuf(nullptr);
  }
};

Obviously enough we don't have any garbage collection in here for failed or aborted writes. Thanks to the unique naming of those files, these are very easy to spot during a GC pass and if they had a last modified date several days ago they would ideal for purging. Implementing one of those is easy using async_enumerate(), and is left as an exercise for the reader.

Table 1.7. This second generation solution will perform reasonably well under these conditions:

Condition

On Microsoft Windows you can place the store deep in a directory hierarchy and use long key names.

Third party threads and processes can rename the location of the store during use.

The size of all the values being read at any given time fits into your virtual address space (which is at least 2Gb on 32 bit, 8Tb on 64 bit).

As many processes and threads may read and write to the store concurrently as you like, including any mix of CIFS and NFS clients.

Processes may unexpectedly exit during modifies with no consequence on consistency.

Maximum performance isn't important to you.

Sudden power loss may not maintain original write ordering across multiple key-value updates, however each key will have some complete value which it had at some point in history.

You don't need to update more than one key-value at once.




[6] This operation is not possible on Microsoft Windows using the Win32 API — nowhere in the Win32 API is this operation made available. However the NT kernel API does provide this operation, hence AFIO can provide POSIX atomic relink guarantees on Windows.

[7] This may not be true on older NFS mounts, or NFS mounts with the wrong configuration. See the NFS documentation. It is also possible to configure Samba in a way which breaks the atomicity of atomic renaming, see the Samba documentation.

[8] This is as yet untested by empirical testing, however AFIO does no magic tricks, it just thinly wraps the operating system APIs.


PrevUpHomeNext