Achieving atomicity on the filing system

TripleGit, the forthcoming reliable graph database store, ideally needs to allow multiple processes to read and write the graph store concurrently, without mutual exclusion where possible. Each collection of hashes forming the tree that makes up a given version of a graph is itself a hashed object in the content addressable store, and the graph store can hold multiple named graphs which may or may not share nodes. One problem, as with all databases, is how to efficiently issue an atomic transaction which updates multiple graphs at once when concurrent writers may also be issuing write transactions which may or may not conflict with it.

The really naive solution is to keep a single lock file for the entire graph store, created using O_EXCL, i.e. the open fails if the file already exists, so whoever succeeds in creating the file holds the lock. This serialises all transactions, and therefore eliminates any problems with updates clashing. Usefully, this is well supported by all operating systems, and by NFS and Samba.
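As a minimal sketch of the single lock file idea, the following uses the raw POSIX open() call rather than AFIO. The class name lock_file and its member owns_lock() are hypothetical names chosen for illustration, and the sketch assumes a POSIX system.

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <string>

// Hypothetical RAII helper (not part of AFIO): holds the lock for as long as
// the object lives, provided the exclusive create succeeded.
class lock_file
{
  std::string path_;
  int fd_;

public:
  // O_CREAT | O_EXCL makes open() fail if the file already exists, so only
  // one contender can ever succeed in creating it.
  explicit lock_file(const std::string &path)
      : path_(path), fd_(::open(path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0600))
  {
  }
  lock_file(const lock_file &) = delete;
  lock_file &operator=(const lock_file &) = delete;
  ~lock_file()
  {
    if(fd_ != -1)
    {
      // Only the owner removes the file, which releases the lock
      ::unlink(path_.c_str());
      ::close(fd_);
    }
  }
  bool owns_lock() const { return fd_ != -1; }
};
```

A caller would construct a lock_file, test owns_lock(), and retry later if it returns false. Note that in this sketch a process which crashes while holding the lock leaves the file behind, so a real implementation needs some way of detecting stale locks.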

A less naive solution is to keep one lock file per graph, and to use a multiple lock and backoff strategy to lock the requisite number of graphs for a given transaction. The big problem with this approach is that especially large multi-graph transactions are unfairly penalised relative to smaller ones, as lock complexity is worse than linear. Nevertheless performance is actually not bad: these are results for my 3.9GHz i7-3770K workstation using AFIO to implement the lock file with various numbers of concurrent writers (note that Windows provides magic flags for telling it about lock files; if they are not used, expect a 40% performance reduction):

Table 1.11. Lock file performance on various operating systems:

writers  lock files  Win8.1 x64 NTFS HD  Win8.1 x64 NTFS SSD  Linux x64 ext4 SSD  FreeBSD 10.1 ZFS SSD
-------  ----------  ------------------  -------------------  ------------------  --------------------
1        1           2468                2295                 3590                9922
2        1           2507                2385                 3583                9903
4        1           1966                2161                 3664                9684
8        1           1400                1851                 3703                6537
16       1           742                 602                  3833                1251
1        8           826                 888                  1378                2455
2        8           508                 637                  576                 917
4        8           67                  167                  417                 63
8        8           37                  117                  106                 0.55
16       8           33                  77                   26                  0.5


As you can see, with a single lock file Linux does a very good job of O(1) to waiters complexity, but performance on Windows and FreeBSD collapses once you exceed CPU cores. Also, Windows is sensitive to device block size: the hard drive outperforms the SSD because it has 512 byte sectors and the SSD has 4096 byte sectors. I suspect that internally Windows calls memcpy() in device native sector sizes, or is actually sending data to physical storage despite us marking this file as temporary. One very striking observation is that, with one writer and one lock file, FreeBSD delivers a full 2.76x the performance of Linux and 4.32x that of Windows on the same SSD.
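To make the multiple lock and backoff strategy measured in Table 1.11 concrete, here is a minimal sketch using raw POSIX calls rather than AFIO. The function names try_lock_all, unlock_all and lock_all_with_backoff are hypothetical, and the backoff constants are arbitrary assumptions rather than tuned values.

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <random>
#include <string>
#include <thread>
#include <vector>

// Try to exclusively create every lock file. On the first failure, remove the
// ones already created so a partial acquisition never blocks other writers.
bool try_lock_all(const std::vector<std::string> &paths)
{
  for(std::size_t n = 0; n < paths.size(); n++)
  {
    int fd = ::open(paths[n].c_str(), O_CREAT | O_EXCL | O_WRONLY, 0600);
    if(fd == -1)
    {
      for(std::size_t m = 0; m < n; m++)
        ::unlink(paths[m].c_str());
      return false;
    }
    ::close(fd);  // the lock is the existence of the file, not the descriptor
  }
  return true;
}

// Release every lock by removing its file
void unlock_all(const std::vector<std::string> &paths)
{
  for(const auto &p : paths)
    ::unlink(p.c_str());
}

// Retry with randomised exponential backoff, giving up after max_attempts
bool lock_all_with_backoff(const std::vector<std::string> &paths,
                           unsigned max_attempts)
{
  std::mt19937 rng(std::random_device{}());
  unsigned ceiling_us = 100;  // assumed starting ceiling, in microseconds
  for(unsigned attempt = 0; attempt < max_attempts; attempt++)
  {
    if(try_lock_all(paths))
      return true;
    std::uniform_int_distribution<unsigned> dist(0, ceiling_us);
    std::this_thread::sleep_for(std::chrono::microseconds(dist(rng)));
    ceiling_us = std::min(ceiling_us * 2, 100000u);
  }
  return false;
}
```

The sketch also shows why large transactions are penalised: the more lock files a transaction needs, the more likely it is that at least one of them is held by someone else, in which case the whole attempt is thrown away and retried.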

A much more intelligent way of solving this problem is to figure out which graphs are common across the transactions currently pending, and to strictly order the transactions in the sequence which maximises throughput without updates clashing. One way for many distributed writers to construct a shared graph of dependencies is to append messages to a shared file, on top of which one can deploy a distributed mutual exclusion algorithm, of which those by Suzuki and Kasami, by Maekawa, and by Ricart and Agrawala are the most famous. This requires the ability to atomically append to the shared file, something guaranteed on all operating systems, but unfortunately not guaranteed by NFS nor Samba (though when combined with advisory file locking those do work as expected, albeit with poor performance). This means that on an all-Windows setup, or on POSIX without NFS or Samba, the atomic append method could be valuable, especially as the cost of locking multiple actors is pretty much the same as locking a single actor, so you get eight graphs locked for the same cost as locking one.

Table 1.12. Atomic append lock performance on various operating systems:

writers  lock files  Win8.1 x64 NTFS HD  Win8.1 x64 NTFS SSD  Linux x64 ext4 SSD  FreeBSD 10.1 ZFS SSD
-------  ----------  ------------------  -------------------  ------------------  --------------------
1        1           2592                2875                 1198                29
2        1           1284                2565                 1344                25
4        1           1420                2384                 1327                35
8        1           1262                1764                 1254                55
16       1           428                 520                  1260                37


Linux once again does a great job of O(1) to waiters complexity, but at a third of the speed of a simple lock file and up to half the speed of Windows. Windows does better than Linux here, especially on SSDs where it is faster than a simple lock file, but it doesn't scale to waiters once they pass the CPU core count. FreeBSD is two orders of magnitude slower because ZFS checksums and copy-on-writes file changes, so every time we append 16 bytes we force a full copy of the 128KB extent to be issued. It would appear that ZFS syncs its internal buffers when a different file descriptor atomically appends to the same file, which unfortunately produces the pathological performance seen above.

This introduces the final potential solution, which is the quagmire of advisory file locking. This is an area where Windows and POSIX diverge very significantly, and the interactions between the two are full of quirks, races and other nasties, whether Windows is locking regions in a file on a Samba share on a POSIX machine or POSIX is doing byte range locking at all (there is a very fun stanza in the POSIX standard which basically releases all your locks on the first file descriptor close). For this reason you should avoid the very temporary and experimental code currently in AFIO which implements Samba and NFS safe file range locking, where theoretically both Windows and POSIX code can safely lock ranges in files concurrently. Those APIs are not documented for good reason! Still, performance with these is impressive, despite the hoop jumping inefficiencies AFIO leaps through to implement relatively sane semantics.

Table 1.13. Advisory lock file performance on various operating systems:

writers  lock files  Win8.1 x64 NTFS HD  Win8.1 x64 NTFS SSD  Linux x64 ext4 SSD  FreeBSD 10.1 ZFS SSD
-------  ----------  ------------------  -------------------  ------------------  --------------------
1        1           5799                5166                 3466                21536
2        1           5788                6656                 2215                11654
4        1           5775                7020                 1073                5384
8        1           5773                6738                 518                 2584
16       1           5695                5617                 360                 1326


Fascinatingly the tables turn here: Windows sees O(1) to waiters complexity, whilst Linux and FreeBSD see a mostly O(N) to waiters drop in performance. FreeBSD, as with simple lock files, again blows all others out of the water in advisory lock performance. I should add here that because POSIX advisory locks are per process, the Linux and FreeBSD benchmarks were generated by running N copies of the benchmark program. The NT kernel, by contrast, inherits the really excellent and well thought through file byte range locking model of DEC VMS and treats locks as effectively reference counted byte ranges, and therefore works as expected from a single process. I have yet to add process-local byte range locking to simulate sane range locking for POSIX, so expect the numbers above to worsen.
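The POSIX behaviour mentioned earlier, where closing any descriptor for a file drops every lock the process holds on that file, can be observed with the following sketch. It uses the standard fcntl() record locking calls directly rather than AFIO, and the helper names lock_whole_file and locked_by_another_process are hypothetical. Because fcntl() locks are per process, the check has to be made from a forked child.

```cpp
#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>

// Take a non-blocking exclusive advisory lock over the whole file
bool lock_whole_file(int fd)
{
  struct flock fl = {};
  fl.l_type = F_WRLCK;
  fl.l_whence = SEEK_SET;
  fl.l_start = 0;
  fl.l_len = 0;  // zero length means "through to the end of the file"
  return ::fcntl(fd, F_SETLK, &fl) != -1;
}

// Fork a child which asks the kernel whether a write lock on the file would
// conflict with a lock held by some other process (here, the parent).
bool locked_by_another_process(const char *path)
{
  pid_t pid = ::fork();
  if(pid == 0)
  {
    int fd = ::open(path, O_RDWR);
    struct flock fl = {};
    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;
    bool ok = fd != -1 && ::fcntl(fd, F_GETLK, &fl) != -1;
    // F_GETLK sets l_type to F_UNLCK when nothing would block the lock
    ::_exit(ok && fl.l_type != F_UNLCK ? 1 : 0);
  }
  int status = 0;
  ::waitpid(pid, &status, 0);
  return WIFEXITED(status) && WEXITSTATUS(status) == 1;
}
```

If a process calls lock_whole_file() on one descriptor, then opens the same file a second time and closes that second descriptor, locked_by_another_process() goes from true to false even though the descriptor used to take the lock is still open. This is the reason any library code which happens to open and close the same file can silently release your locks.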

After all of that, we are left with this locking strategy matrix for TripleGit:

Table 1.14. Fastest file system locking strategy for various operating systems:

Operating system  Best locking policy
----------------  -------------------------------------------------------------------------------
Win8.1 x64 NTFS   Advisory locks fastest, then atomic append locks, finally lock files
Linux x64 ext4    Lock files fastest, then advisory locks, finally atomic append locks
FreeBSD 10.1 ZFS  Advisory locks fastest, then lock files, avoid atomic append locks at all costs


I should emphasise once again that the advisory locking code is riddled with bugs and you should not use it in your code at this time. Once I have CI testing all possible combinations of locking with nothing erroring out, I'll release that code for production use, probably in v1.4.

All these benchmarks came from this benchmarking program I wrote using AFIO which illustrates how you might implement the techniques used above:

int main(int argc, const char *argv[])
{
  using namespace BOOST_AFIO_V2_NAMESPACE;
  using BOOST_AFIO_V2_NAMESPACE::off_t;
  typedef chrono::duration<double, ratio<1, 1>> secs_type;
  double traditional_locks = 0, atomic_log_locks = 0;
  try
  {
    filesystem::remove_all("testdir");
  }
  catch(...)
  {
  }

  size_t totalwriters = 2, writers = totalwriters;
  if(argc > 1)
    writers = totalwriters = atoi(argv[1]);
  {
    auto dispatcher = make_dispatcher().get();
    auto mkdir(dispatcher->dir(path_req("testdir", file_flags::create)));
    auto mkdir1(dispatcher->dir(path_req::relative(mkdir, "1", file_flags::create)));
    auto mkdir2(dispatcher->dir(path_req::relative(mkdir, "2", file_flags::create)));
    auto mkdir3(dispatcher->dir(path_req::relative(mkdir, "3", file_flags::create)));
    auto mkdir4(dispatcher->dir(path_req::relative(mkdir, "4", file_flags::create)));
    auto mkdir5(dispatcher->dir(path_req::relative(mkdir, "5", file_flags::create)));
    auto mkdir6(dispatcher->dir(path_req::relative(mkdir, "6", file_flags::create)));
    auto mkdir7(dispatcher->dir(path_req::relative(mkdir, "7", file_flags::create)));
    auto mkdir8(dispatcher->dir(path_req::relative(mkdir, "8", file_flags::create)));
    auto statfs_(dispatcher->statfs(mkdir, fs_metadata_flags::All));
    auto statfs(statfs_.get());
    std::cout << "The filing system holding our test directory is "
              << statfs.f_fstypename << " and has features:" << std::endl;
#define PRINT_FIELD(field, ...)                                                     \
  std::cout << "  f_flags." #field ": ";                                            \
  std::cout << statfs.f_flags.field __VA_ARGS__ << std::endl
    PRINT_FIELD(rdonly);
    PRINT_FIELD(noexec);
    PRINT_FIELD(nosuid);
    PRINT_FIELD(acls);
    PRINT_FIELD(xattr);
    PRINT_FIELD(compression);
    PRINT_FIELD(extents);
    PRINT_FIELD(filecompression);
#undef PRINT_FIELD
#define PRINT_FIELD(field, ...)                                                     \
  std::cout << "  f_" #field ": ";                                                  \
  std::cout << statfs.f_##field __VA_ARGS__ << std::endl
    PRINT_FIELD(bsize);
    PRINT_FIELD(iosize);
    PRINT_FIELD(blocks, << " (" << (statfs.f_blocks * statfs.f_bsize / 1024.0 /
                                    1024.0 / 1024.0) << " Gb)");
    PRINT_FIELD(bfree, << " (" << (statfs.f_bfree * statfs.f_bsize / 1024.0 /
                                   1024.0 / 1024.0) << " Gb)");
    PRINT_FIELD(bavail, << " (" << (statfs.f_bavail * statfs.f_bsize / 1024.0 /
                                    1024.0 / 1024.0) << " Gb)");
#undef PRINT_FIELD
  }
  if(1)
  {
    std::cout << "\nBenchmarking a single traditional lock file with " << writers
              << " concurrent writers ...\n";
    std::vector<thread> threads;
    atomic<bool> done(true);
    atomic<size_t> attempts(0), successes(0);
    for(size_t n = 0; n < writers; n++)
    {
      threads.push_back(thread(
      [&done, &attempts, &successes, n]
      {
        try
        {
          // Create a dispatcher
          auto dispatcher = make_dispatcher().get();
          // Schedule opening the log file for writing log entries
          auto logfile(dispatcher->file(
          path_req("testdir/log", file_flags::create | file_flags::read_write)));
          // Retrieve any errors which occurred
          logfile.get();
          // Wait until all threads are ready
          while(done)
          {
            this_thread::yield();
          }
          while(!done)
          {
            // Traditional file locks are very simple: try to exclusively create the
            // lock file.
            // If you succeed, you have the lock.
            auto lockfile(dispatcher->file(
            path_req("testdir/log.lock",
                     file_flags::create_only_if_not_exist | file_flags::write |
                     file_flags::temporary_file | file_flags::delete_on_close)));
            attempts.fetch_add(1, memory_order_relaxed);
            // v1.4 of the AFIO engine will return error_code instead of exceptions
            // for this
            try
            {
              lockfile.get();
            }
            catch(const system_error &e)
            {
              continue;
            }
            std::string logentry("I am log writer "), mythreadid(to_string(n)),
            logentryend("!\n");
            // Fetch the size
            off_t where = logfile->lstat().st_size,
                  entrysize =
                  logentry.size() + mythreadid.size() + logentryend.size();
            // Schedule extending the log file
            auto extendlog(dispatcher->truncate(logfile, where + entrysize));
            // Schedule writing the new entry
            auto writetolog(dispatcher->write(
            make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
            writetolog.get();
            extendlog.get();
            successes.fetch_add(1, memory_order_relaxed);
          }
        }
        catch(const system_error &e)
        {
          std::cerr << "ERROR: test exits via system_error code " << e.code().value()
                    << "(" << e.what() << ")" << std::endl;
          abort();
        }
        catch(const std::exception &e)
        {
          std::cerr << "ERROR: test exits via exception (" << e.what() << ")"
                    << std::endl;
          abort();
        }
        catch(...)
        {
          std::cerr << "ERROR: test exits via unknown exception" << std::endl;
          abort();
        }
      }));
    }
    auto begin = chrono::high_resolution_clock::now();
    done = false;
    std::this_thread::sleep_for(std::chrono::seconds(20));
    done = true;
    std::cout << "Waiting for threads to exit ..." << std::endl;
    for(auto &i : threads)
      i.join();
    auto end = chrono::high_resolution_clock::now();
    auto diff = chrono::duration_cast<secs_type>(end - begin);
    std::cout << "For " << writers << " concurrent writers, achieved "
              << (attempts / diff.count()) << " attempts per second with a "
                                              "success rate of "
              << (successes / diff.count()) << " writes per second which is a "
              << (100.0 * successes / attempts) << "% success rate." << std::endl;
    traditional_locks = successes / diff.count();
  }

  if(1)
  {
    std::cout << "\nBenchmarking eight traditional lock files with " << writers
              << " concurrent writers ...\n";
    std::vector<thread> threads;
    atomic<bool> done(true);
    atomic<size_t> attempts(0), successes(0);
    for(size_t n = 0; n < writers; n++)
    {
      threads.push_back(thread(
      [&done, &attempts, &successes, n]
      {
        try
        {
          // Create a dispatcher
          auto dispatcher = make_dispatcher().get();
          // Schedule opening the log file for writing log entries
          auto logfile(dispatcher->file(
          path_req("testdir/log", file_flags::create | file_flags::read_write)));
          // Retrieve any errors which occurred
          logfile.get();
          // Wait until all threads are ready
          while(done)
          {
            this_thread::yield();
          }
          while(!done)
          {
            // Parallel try to exclusively create all eight lock files
            std::vector<path_req> lockfiles;
            lockfiles.reserve(8);
            lockfiles.push_back(
            path_req("testdir/1/log.lock",
                     file_flags::create_only_if_not_exist | file_flags::write |
                     file_flags::temporary_file | file_flags::delete_on_close));
            lockfiles.push_back(
            path_req("testdir/2/log.lock",
                     file_flags::create_only_if_not_exist | file_flags::write |
                     file_flags::temporary_file | file_flags::delete_on_close));
            lockfiles.push_back(
            path_req("testdir/3/log.lock",
                     file_flags::create_only_if_not_exist | file_flags::write |
                     file_flags::temporary_file | file_flags::delete_on_close));
            lockfiles.push_back(
            path_req("testdir/4/log.lock",
                     file_flags::create_only_if_not_exist | file_flags::write |
                     file_flags::temporary_file | file_flags::delete_on_close));
            lockfiles.push_back(
            path_req("testdir/5/log.lock",
                     file_flags::create_only_if_not_exist | file_flags::write |
                     file_flags::temporary_file | file_flags::delete_on_close));
            lockfiles.push_back(
            path_req("testdir/6/log.lock",
                     file_flags::create_only_if_not_exist | file_flags::write |
                     file_flags::temporary_file | file_flags::delete_on_close));
            lockfiles.push_back(
            path_req("testdir/7/log.lock",
                     file_flags::create_only_if_not_exist | file_flags::write |
                     file_flags::temporary_file | file_flags::delete_on_close));
            lockfiles.push_back(
            path_req("testdir/8/log.lock",
                     file_flags::create_only_if_not_exist | file_flags::write |
                     file_flags::temporary_file | file_flags::delete_on_close));
            auto lockfile(dispatcher->file(lockfiles));
            attempts.fetch_add(1, memory_order_relaxed);
#if 1
            // v1.4 of the AFIO engine will return error_code instead of exceptions
            // for this
            try
            {
              lockfile[7].get();
            }
            catch(const system_error &e)
            {
              continue;
            }
            try
            {
              lockfile[6].get();
            }
            catch(const system_error &e)
            {
              continue;
            }
            try
            {
              lockfile[5].get();
            }
            catch(const system_error &e)
            {
              continue;
            }
            try
            {
              lockfile[4].get();
            }
            catch(const system_error &e)
            {
              continue;
            }
            try
            {
              lockfile[3].get();
            }
            catch(const system_error &e)
            {
              continue;
            }
            try
            {
              lockfile[2].get();
            }
            catch(const system_error &e)
            {
              continue;
            }
            try
            {
              lockfile[1].get();
            }
            catch(const system_error &e)
            {
              continue;
            }
            try
            {
              lockfile[0].get();
            }
            catch(const system_error &e)
            {
              continue;
            }
#else  /*_ 1 _*/
            try
            {
              auto barrier(dispatcher->barrier(lockfile));
              // v1.4 of the AFIO engine will return error_code instead of exceptions
              // for this
              for(size_t n = 0; n < 8; n++)
                barrier[n].get();
            }
            catch(const system_error &e)
            {
              continue;
            }
#endif  /*_ 1 _*/
            std::string logentry("I am log writer "), mythreadid(to_string(n)),
            logentryend("!\n");
            // Fetch the size
            off_t where = logfile->lstat().st_size,
                  entrysize =
                  logentry.size() + mythreadid.size() + logentryend.size();
            // Schedule extending the log file
            auto extendlog(dispatcher->truncate(logfile, where + entrysize));
            // Schedule writing the new entry
            auto writetolog(dispatcher->write(
            make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
            // Fetch errors from the last operation first to avoid sleep-wake cycling
            writetolog.get();
            extendlog.get();
            successes.fetch_add(1, memory_order_relaxed);
          }
        }
        catch(const system_error &e)
        {
          std::cerr << "ERROR: test exits via system_error code " << e.code().value()
                    << "(" << e.what() << ")" << std::endl;
          abort();
        }
        catch(const std::exception &e)
        {
          std::cerr << "ERROR: test exits via exception (" << e.what() << ")"
                    << std::endl;
          abort();
        }
        catch(...)
        {
          std::cerr << "ERROR: test exits via unknown exception" << std::endl;
          abort();
        }
      }));
    }
    auto begin = chrono::high_resolution_clock::now();
    done = false;
    std::this_thread::sleep_for(std::chrono::seconds(20));
    done = true;
    std::cout << "Waiting for threads to exit ..." << std::endl;
    for(auto &i : threads)
      i.join();
    auto end = chrono::high_resolution_clock::now();
    auto diff = chrono::duration_cast<secs_type>(end - begin);
    std::cout << "For " << writers << " concurrent writers, achieved "
              << (attempts / diff.count()) << " attempts per second with a "
                                              "success rate of "
              << (successes / diff.count()) << " writes per second which is a "
              << (100.0 * successes / attempts) << "% success rate." << std::endl;
  }

  // **** WARNING UNSUPPORTED UNDOCUMENTED API DO NOT USE IN YOUR CODE ****
  if(1)
  {
    std::cout << "\nBenchmarking a ranged file lock with " << writers
              << " concurrent writers ...\n";
    std::vector<thread> threads;
    atomic<bool> done(true);
    atomic<size_t> attempts(0), successes(0);
    for(size_t n = 0; n < writers; n++)
    {
      threads.push_back(thread(
      [&done, &attempts, &successes, n]
      {
        try
        {
          // Create a dispatcher
          auto dispatcher = make_dispatcher().get();
          // Schedule opening the log file for writing log entries
          auto logfile(dispatcher->file(
          path_req("testdir/log", file_flags::create | file_flags::read_write |
                                  file_flags::os_lockable)));
          // Retrieve any errors which occurred
          logfile.get();
          // Wait until all threads are ready
          while(done)
          {
            this_thread::yield();
          }
          while(!done)
          {
            attempts.fetch_add(1, memory_order_relaxed);
            // **** WARNING UNSUPPORTED UNDOCUMENTED API DO NOT USE IN YOUR CODE ****
            dispatcher->lock({logfile}).front().get();
            std::string logentry("I am log writer "), mythreadid(to_string(n)),
            logentryend("!\n");
            // Fetch the size
            off_t where = logfile->lstat().st_size,
                  entrysize =
                  logentry.size() + mythreadid.size() + logentryend.size();
            // Schedule extending the log file
            auto extendlog(dispatcher->truncate(logfile, where + entrysize));
            // Schedule writing the new entry
            auto writetolog(dispatcher->write(
            make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
            writetolog.get();
            extendlog.get();
            successes.fetch_add(1, memory_order_relaxed);
            dispatcher->lock({{logfile, nullptr}}).front().get();
          }
        }
        catch(const system_error &e)
        {
          std::cerr << "ERROR: test exits via system_error code " << e.code().value()
                    << "(" << e.what() << ")" << std::endl;
          abort();
        }
        catch(const std::exception &e)
        {
          std::cerr << "ERROR: test exits via exception (" << e.what() << ")"
                    << std::endl;
          abort();
        }
        catch(...)
        {
          std::cerr << "ERROR: test exits via unknown exception" << std::endl;
          abort();
        }
      }));
    }
    auto begin = chrono::high_resolution_clock::now();
    done = false;
    std::this_thread::sleep_for(std::chrono::seconds(20));
    done = true;
    std::cout << "Waiting for threads to exit ..." << std::endl;
    for(auto &i : threads)
      i.join();
    auto end = chrono::high_resolution_clock::now();
    auto diff = chrono::duration_cast<secs_type>(end - begin);
    std::cout << "For " << writers << " concurrent writers, achieved "
              << (attempts / diff.count()) << " attempts per second with a "
                                              "success rate of "
              << (successes / diff.count()) << " writes per second which is a "
              << (100.0 * successes / attempts) << "% success rate." << std::endl;
  }

  if(1)
  {
    std::cout << "\nBenchmarking file locks via atomic append with " << writers
              << " concurrent writers ...\n";
    std::vector<thread> threads;
    atomic<bool> done(true);
    atomic<size_t> attempts(0), successes(0);
    for(size_t thread = 0; thread < writers; thread++)
    {
      threads.push_back(std::thread(
      [&done, &attempts, &successes, thread]
      {
        try
        {
          // Create a dispatcher
          auto dispatcher = make_dispatcher().get();
          // Schedule opening the log file for writing log entries
          auto logfile(dispatcher->file(
          path_req("testdir/log", file_flags::create | file_flags::read_write)));
          // Schedule opening the lock file for scanning and hole punching
          auto lockfilez(dispatcher->file(path_req(
          "testdir/log.lock", file_flags::create | file_flags::read_write)));
          // Schedule opening the lock file for atomic appending
          auto lockfilea(dispatcher->file(
          path_req("testdir/log.lock",
                   file_flags::create | file_flags::write | file_flags::append)));
          // Retrieve any errors which occurred
          lockfilea.get();
          lockfilez.get();
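          logfile.get();
          // Wait until all threads are ready, as the other benchmarks do;
          // otherwise a thread reaching this point before done is cleared
          // would skip the loop below entirely
          while(done)
          {
            this_thread::yield();
          }
          while(!done)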
          {
            // Each lock log entry is 16 bytes in length.
            enum class message_code_t : uint8_t
            {
              unlock = 0,
              havelock = 1,
              rescind = 2,
              interest = 3,
              nominate = 5
            };
#pragma pack(push, 1)
            union message_t
            {
              char bytes[16];
              struct
              {
                message_code_t code;
                char __padding1[3];
                uint32_t timestamp;  // time_t
                uint64_t uniqueid;
              };
            };
#pragma pack(pop)
            static_assert(sizeof(message_t) == 16,
                          "message_t is not 16 bytes long!");
            auto gettime = []
            {
              return (uint32_t)(std::time(nullptr) - 1420070400UL /* 1st Jan 2015*/);
            };
            message_t temp, buffers[256];
            off_t buffersoffset;
            uint32_t nowtimestamp = gettime();
            // TODO FIXME: If multiple machines are all accessing the lock file,
            // nowtimestamp
            //             ought to be corrected for drift

            // Step 1: Register my interest
            memset(temp.bytes, 0, sizeof(temp));
            temp.code = message_code_t::interest;
            temp.timestamp = nowtimestamp;
            temp.uniqueid = thread;  // TODO FIXME: Needs to be a VERY random number
                                     // to prevent collision.
            dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0))
            .get();

            // Step 2: Wait until my interest message appears, also figure out what
            // interests precede
            //         mine and where my start of interest begins, and if someone
            //         currently has the lock
            off_t startofinterest =
            dispatcher->extents(lockfilez).get().front().first;
            off_t myuniqueid = (off_t) -1;
            bool findPreceding = true;
            std::vector<std::pair<bool, off_t>> preceding;
            std::pair<bool, off_t> lockid;
            auto iterate = [&]
            {
              size_t validPrecedingCount = 0;
              off_t lockfilesize = lockfilez->lstat(metadata_flags::size).st_size;
              buffersoffset =
              lockfilesize > sizeof(buffers) ? lockfilesize - sizeof(buffers) : 0;
              // buffersoffset-=buffersoffset % sizeof(buffers[0]);
              for(; !validPrecedingCount && buffersoffset >= startofinterest &&
                    buffersoffset < lockfilesize;
                  buffersoffset -= sizeof(buffers))
              {
                size_t amount = (size_t)(lockfilesize - buffersoffset);
                if(amount > sizeof(buffers))
                  amount = sizeof(buffers);
                dispatcher->read(make_io_req(lockfilez, (void *) buffers, amount,
                                             buffersoffset)).get();
                for(size_t n = amount / sizeof(buffers[0]) - 1;
                    !validPrecedingCount && n < amount / sizeof(buffers[0]); n--)
                {
                  // Early exit if messages have become stale
                  if(!buffers[n].timestamp ||
                     (buffers[n].timestamp < nowtimestamp &&
                      nowtimestamp - buffers[n].timestamp > 20))
                  {
                    startofinterest = buffersoffset + n * sizeof(buffers[0]);
                    break;
                  }
                  // Find if he is locked or unlocked
                  if(lockid.second == (off_t) -1)
                  {
                    if(buffers[n].code == message_code_t::unlock)
                      lockid = std::make_pair(false, buffers[n].uniqueid);
                    else if(buffers[n].code == message_code_t::havelock)
                      lockid = std::make_pair(true, buffers[n].uniqueid);
                  }
                  // Am I searching for my interest?
                  if(myuniqueid == (off_t) -1)
                  {
                    if(!memcmp(buffers + n, &temp, sizeof(temp)))
                      myuniqueid = buffersoffset + n * sizeof(buffers[0]);
                  }
                  else if(findPreceding &&
                          (buffers[n].uniqueid < myuniqueid ||
                           buffersoffset + n * sizeof(buffers[0]) < myuniqueid))
                  {
                    // We are searching for preceding claims now
                    if(buffers[n].code == message_code_t::rescind ||
                       buffers[n].code == message_code_t::unlock)
                      preceding.push_back(
                      std::make_pair(false, buffers[n].uniqueid));
                    else if(buffers[n].code == message_code_t::nominate ||
                            buffers[n].code == message_code_t::havelock)
                    {
                      if(buffers[n].uniqueid < myuniqueid &&
                         preceding.end() ==
                         std::find(
                         preceding.begin(), preceding.end(),
                         std::make_pair(false, (off_t) buffers[n].uniqueid)))
                      {
                        preceding.push_back(
                        std::make_pair(true, buffers[n].uniqueid));
                        validPrecedingCount++;
                      }
                    }
                    else if(buffers[n].code == message_code_t::interest)
                    {
                      if(buffersoffset + n * sizeof(buffers[0]) < myuniqueid &&
                         preceding.end() ==
                         std::find(preceding.begin(), preceding.end(),
                                   std::make_pair(false, buffersoffset +
                                                         n * sizeof(buffers[0]))))
                      {
                        preceding.push_back(std::make_pair(
                        true, buffersoffset + n * sizeof(buffers[0])));
                        validPrecedingCount++;
                      }
                    }
                  }
                }
              }
#if 0
                std::cout << thread << ": myuniqueid=" << myuniqueid << " startofinterest=" << startofinterest << " size=" << lockfilez->lstat(metadata_flags::size).st_size << " lockid=" << lockid.first << "," << lockid.second << " preceding=";
                for(auto &i : preceding)
                  std::cout << i.first << "," << i.second << ";";
                std::cout << std::endl;
#endif  /*_ 0 _*/
              if(findPreceding)
              {
                // Remove all rescinded interests preceding ours
                preceding.erase(std::remove_if(preceding.begin(), preceding.end(),
                                               [](const std::pair<bool, off_t> &i)
                                               {
                                                 return !i.first;
                                               }),
                                preceding.end());
                std::sort(preceding.begin(), preceding.end());
                findPreceding = false;
              }
            };
            do
            {
              lockid = std::make_pair(false, (off_t) -1);
              iterate();
              // Didn't find it, so sleep and retry, maybe st_size will have updated
              // by then
              if(myuniqueid == (off_t) -1)
                this_thread::sleep_for(chrono::milliseconds(1));
            } while(myuniqueid == (off_t) -1);

            // Step 3: If there is no lock and no interest precedes mine, claim the
            // mutex. Otherwise issue a nominate for myself, at most once every
            // ten seconds.
            {
              nowtimestamp = 0;
              mutex m;
              condition_variable c;
              unique_lock<decltype(m)> l(m);
              atomic<bool> fileChanged(false);
              for(;;)
              {
                attempts.fetch_add(1, memory_order_relaxed);
                temp.timestamp = gettime();
                temp.uniqueid = myuniqueid;
                if(preceding.empty())
                {
                  temp.code = message_code_t::havelock;
                  dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp),
                                                0)).get();
                  // Zero the range between startofinterest and myuniqueid
                  if(startofinterest < myuniqueid)
                  {
                    std::vector<std::pair<off_t, off_t>> range = {
                    {startofinterest, myuniqueid - startofinterest}};
                    dispatcher->zero(lockfilez, range).get();
                    // std::cout << thread << ": lock taken for myuniqueid="
                    //           << myuniqueid << ", zeroing " << range.front().first
                    //           << ", " << range.front().second << std::endl;
                  }
                  break;
                }
                else
                {
                  auto lockfilechanged = [&]
                  {
                    fileChanged = true;
                    c.notify_all();
                  };
                  // TODO FIXME: Put a modify watch on the lockfile instead of
                  // spinning
                  if(temp.timestamp - nowtimestamp >= 10)
                  {
                    temp.code = message_code_t::nominate;
                    dispatcher->write(make_io_req(lockfilea, temp.bytes,
                                                  sizeof(temp), 0)).get();
                    nowtimestamp = temp.timestamp;
                  }
                  // c.wait_for(l, chrono::milliseconds(1),
                  //            [&fileChanged] { return fileChanged == true; });
                  fileChanged = false;
                  preceding.clear();
                  findPreceding = true;
                  lockid = std::make_pair(false, (off_t) -1);
                  iterate();
                }
              }
            }

            // Step 4: I now have the lock, so do my thing
            std::string logentry("I am log writer "), mythreadid(to_string(thread)),
            logentryend("!\n");
            // Fetch the size
            off_t where = logfile->lstat().st_size,
                  entrysize =
                  logentry.size() + mythreadid.size() + logentryend.size();
            // Schedule extending the log file
            auto extendlog(dispatcher->truncate(logfile, where + entrysize));
            // Schedule writing the new entry
            auto writetolog(dispatcher->write(
            make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
            // Fetch errors from the last operation first to avoid sleep-wake cycling
            writetolog.get();
            extendlog.get();
            successes.fetch_add(1, memory_order_relaxed);
            // std::cout << thread << ": doing work for myuniqueid=" << myuniqueid
            //           << std::endl;
            // this_thread::sleep_for(chrono::milliseconds(250));

            // Step 5: Release the lock
            temp.code = message_code_t::unlock;
            temp.timestamp = gettime();
            temp.uniqueid = myuniqueid;
            // std::cout << thread << ": lock released for myuniqueid="
            //           << myuniqueid << std::endl;
            dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0))
            .get();
          }
        }
        catch(const system_error &e)
        {
          std::cerr << "ERROR: test exits via system_error code " << e.code().value()
                    << " (" << e.what() << ")" << std::endl;
          abort();
        }
        catch(const std::exception &e)
        {
          std::cerr << "ERROR: test exits via exception (" << e.what() << ")"
                    << std::endl;
          abort();
        }
        catch(...)
        {
          std::cerr << "ERROR: test exits via unknown exception" << std::endl;
          abort();
        }
      }));
    }
    auto begin = chrono::high_resolution_clock::now();
    done = false;
    std::this_thread::sleep_for(std::chrono::seconds(20));
    done = true;
    std::cout << "Waiting for threads to exit ..." << std::endl;
    for(auto &i : threads)
      i.join();
    auto end = chrono::high_resolution_clock::now();
    auto diff = chrono::duration_cast<secs_type>(end - begin);
    std::cout << "For " << writers << " concurrent writers, achieved "
              << (attempts / diff.count())
              << " attempts per second with a success rate of "
              << (successes / diff.count()) << " writes per second which is a "
              << (100.0 * successes / attempts) << "% success rate." << std::endl;
    atomic_log_locks = successes / diff.count();
  }
  filesystem::remove_all("testdir");
  std::cout << "Traditional locks were " << (traditional_locks / atomic_log_locks)
            << " times faster." << std::endl;
  return 0;
}
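
The bookkeeping of preceding claims in the listing above is easy to lose among the file I/O, so here is a minimal standalone sketch of just that logic. It rests on assumptions not confirmed by this part of the listing: records are visited newest first (so a rescind or unlock is seen before the older claim it cancels), every record carries the id of the claim it refers to (the listing identifies interest records by file offset instead), and the names `message` and `live_preceding` are hypothetical and are not part of AFIO.

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for a record in the lock file.
enum class message_code_t { interest, nominate, havelock, rescind, unlock };
struct message
{
  message_code_t code;
  std::int64_t uniqueid;
};

// Returns the sorted ids of claims which precede myuniqueid and have not been
// cancelled. ASSUMPTION: newest_first is ordered newest record first.
std::vector<std::int64_t> live_preceding(const std::vector<message> &newest_first,
                                         std::int64_t myuniqueid)
{
  // (true, id) marks a live claim, (false, id) marks a cancellation
  std::vector<std::pair<bool, std::int64_t>> preceding;
  for(const message &m : newest_first)
  {
    if(m.uniqueid >= myuniqueid)
      continue;  // only claims older than mine can block me
    if(m.code == message_code_t::rescind || m.code == message_code_t::unlock)
    {
      preceding.push_back({false, m.uniqueid});
      continue;
    }
    const std::pair<bool, std::int64_t> cancelled(false, m.uniqueid);
    const std::pair<bool, std::int64_t> live(true, m.uniqueid);
    // Skip claims already cancelled by a newer record. Unlike the listing,
    // this sketch also skips ids it has already recorded as live.
    if(std::find(preceding.begin(), preceding.end(), cancelled) == preceding.end() &&
       std::find(preceding.begin(), preceding.end(), live) == preceding.end())
      preceding.push_back(live);
  }
  // Drop the cancellation markers, as the listing does with erase/remove_if
  preceding.erase(std::remove_if(preceding.begin(), preceding.end(),
                                 [](const std::pair<bool, std::int64_t> &i)
                                 {
                                   return !i.first;
                                 }),
                  preceding.end());
  std::sort(preceding.begin(), preceding.end());
  std::vector<std::int64_t> ids;
  for(const auto &i : preceding)
    ids.push_back(i.second);
  return ids;
}
```

A writer may claim the lock only when the returned list is empty. In every other case the listing's writer rescans, nominating itself at most once every ten seconds.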

