TripleGit, the forthcoming reliable graph database store, ideally needs to let multiple processes concurrently read and write the graph store without mutual exclusion where possible. Each collection of hashes forming the tree which makes up a given version of a graph is itself a hashed object in the content addressable store, and the graph store can hold multiple named graphs which may or may not share nodes. One problem, as with all databases, is how to efficiently issue an atomic transaction which updates multiple graphs at once when concurrent writers may also be issuing write transactions which may, or may not, conflict with it.
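To make the storage model concrete, here is a minimal sketch of a content addressable store in which a graph version is just another hashed object listing its node hashes. The names `content_store`, `put` and `put_graph` are hypothetical and not TripleGit APIs, and `std::hash` is only a stand-in for a real cryptographic content hash.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Assumption: std::hash stands in for a real content hash. It is NOT cryptographic.
using content_id = std::size_t;

struct content_store
{
  std::map<content_id, std::string> objects;

  // Store a blob under the hash of its bytes and return that hash.
  content_id put(const std::string &bytes)
  {
    content_id id = std::hash<std::string>{}(bytes);
    objects.emplace(id, bytes);  // identical content is only stored once
    return id;
  }

  // A graph version is itself an object: the serialised list of its node hashes.
  content_id put_graph(const std::vector<content_id> &nodes)
  {
    std::string serialised;
    for(content_id n : nodes)
      serialised += std::to_string(n) + "\n";
    return put(serialised);
  }
};
```

Two graphs built from `{a, b}` and `{b, c}` share node `b` without storing it twice, and storing the same node list again yields the same graph identifier.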
The really naive solution is to keep a single lock file for the entire graph store, created using O_EXCL, i.e. the open fails if you did not create the file. This serialises all transactions, and therefore eliminates any problem of updates clashing. It has the useful property of being well supported by all operating systems, and by NFS and Samba.
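As a minimal POSIX-only sketch of that idea (not the AFIO implementation, which goes through its own dispatcher), exclusive creation of a lock file can look like the following. The helper names `try_lock_store` and `unlock_store` are hypothetical.

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <string>

// Try to take the store-wide lock by exclusively creating the lock file.
// Returns a descriptor >= 0 if we created the file and so own the lock,
// or -1 if the file already exists (someone else holds it) or on error.
int try_lock_store(const std::string &lockpath)
{
  return ::open(lockpath.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0600);
}

// Release the lock by removing the lock file, then close our descriptor.
void unlock_store(const std::string &lockpath, int fd)
{
  ::unlink(lockpath.c_str());
  ::close(fd);
}
```

A writer would loop on `try_lock_store()` until it succeeds, perform its transaction, then call `unlock_store()`. Note that a crashed writer leaves the lock file behind, which this sketch makes no attempt to handle.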
A less naive solution is to keep one lock file per graph, and to use a multiple lock and backoff strategy to lock the requisite graphs for a given transaction. The big problem with this approach is that especially large multi-graph transactions are unfairly penalised relative to smaller ones, as lock complexity is worse than linear. Nevertheless performance is actually not bad: these are results for my 3.9GHz i7-3770K workstation using AFIO to implement the lock file with various numbers of concurrent writers (note that Windows provides magic flags for telling it about lock files; if they are not used, expect a 40% performance reduction):
Table 1.11. Lock file performance on various operating systems:
writers | lock files | Win8.1 x64 NTFS HD | Win8.1 x64 NTFS SSD | Linux x64 ext4 SSD | FreeBSD 10.1 ZFS SSD |
---|---|---|---|---|---|
1 | 1 | 2468 | 2295 | 3590 | 9922 |
2 | 1 | 2507 | 2385 | 3583 | 9903 |
4 | 1 | 1966 | 2161 | 3664 | 9684 |
8 | 1 | 1400 | 1851 | 3703 | 6537 |
16 | 1 | 742 | 602 | 3833 | 1251 |
1 | 8 | 826 | 888 | 1378 | 2455 |
2 | 8 | 508 | 637 | 576 | 917 |
4 | 8 | 67 | 167 | 417 | 63 |
8 | 8 | 37 | 117 | 106 | 0.55 |
16 | 8 | 33 | 77 | 26 | 0.5 |
As you can see, Linux does a very good job of keeping complexity O(1) to waiters, but performance on Windows and FreeBSD collapses once the number of writers exceeds the number of CPU cores. Also, Windows is sensitive to device block size: the hard drive can outperform the SSD because it has 512 byte sectors and the SSD has 4096 byte sectors. I suspect that internally Windows memcpy()'s in device native sector sizes, or is actually sending data to physical storage despite us marking this file as temporary. One very striking observation is that, with a single writer and a single lock file on SSD, FreeBSD delivers a full 2.76x the performance of Linux and 4.32x that of Windows.
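The per-graph lock file strategy with backoff described earlier can be sketched roughly as follows. This is a POSIX-only illustration rather than the benchmark's code: the benchmark simply retries immediately, whereas this sketch assumes a randomised exponential backoff, and all function names and timing constants here are hypothetical.

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <chrono>
#include <cstddef>
#include <random>
#include <string>
#include <thread>
#include <vector>

// Try to exclusively create every lock file. On the first failure, remove the
// lock files we created ourselves and report failure so the caller can back off.
bool try_lock_all(const std::vector<std::string> &paths, std::vector<int> &fds)
{
  fds.clear();
  for(const auto &p : paths)
  {
    int fd = ::open(p.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0600);
    if(fd < 0)
    {
      for(std::size_t n = 0; n < fds.size(); n++)
      {
        ::unlink(paths[n].c_str());
        ::close(fds[n]);
      }
      fds.clear();
      return false;
    }
    fds.push_back(fd);
  }
  return true;
}

// Release every lock taken by a successful try_lock_all().
void unlock_all(const std::vector<std::string> &paths, std::vector<int> &fds)
{
  for(std::size_t n = 0; n < fds.size(); n++)
  {
    ::unlink(paths[n].c_str());
    ::close(fds[n]);
  }
  fds.clear();
}

// Retry with a randomised, doubling wait between attempts (values are assumptions).
bool lock_all_with_backoff(const std::vector<std::string> &paths, std::vector<int> &fds,
                           unsigned max_attempts = 16)
{
  std::mt19937 rng(std::random_device{}());
  unsigned ceiling_us = 100;
  for(unsigned attempt = 0; attempt < max_attempts; attempt++)
  {
    if(try_lock_all(paths, fds))
      return true;
    std::uniform_int_distribution<unsigned> wait(0, ceiling_us);
    std::this_thread::sleep_for(std::chrono::microseconds(wait(rng)));
    if(ceiling_us < 100000)
      ceiling_us *= 2;
  }
  return false;
}
```

Because every failed attempt throws away all the locks already taken, a transaction needing many graphs loses to transactions needing few, which is the unfairness noted above.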
A much more intelligent way of solving this problem is to work out which graphs are common to the transactions currently pending, and to strictly order the transactions in the sequence which maximises throughput without updates clashing. One way for many distributed writers to construct a shared graph of dependencies is to append messages to a shared file, after which one can deploy a distributed mutual exclusion algorithm, of which those by Suzuki and Kasami, Maekawa, and Ricart and Agrawala are the most famous. This requires the ability to atomically append to the shared file, something guaranteed on all operating systems, but unfortunately guaranteed by neither NFS nor Samba (though when combined with advisory file locking those do work as expected, albeit with poor performance). This means that on an all-Windows setup, or on POSIX without NFS or Samba, the atomic append method could be valuable, especially as the cost of locking multiple actors is pretty much the same as locking a single actor, so you get eight graphs locked for the same cost as locking one.
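The building block of this approach is appending a small fixed-size message with a single write and then locating it in the shared file. Below is a minimal POSIX sketch of just that step, assuming a local filesystem. The `lock_message` layout and the helper names are hypothetical, loosely modelled on a 16 byte log entry, and no mutual exclusion algorithm is implemented here.

```cpp
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical 16 byte message. The explicit padding keeps the layout free of
// compiler-inserted holes so that memcmp() comparisons are meaningful.
struct lock_message
{
  std::uint8_t code;
  char padding[3];
  std::uint32_t timestamp;
  std::uint64_t uniqueid;
};
static_assert(sizeof(lock_message) == 16, "lock_message must be 16 bytes");

// Append one message using a single write() on a descriptor opened with O_APPEND.
bool append_message(int fd, const lock_message &m)
{
  return ::write(fd, &m, sizeof(m)) == (ssize_t) sizeof(m);
}

// Scan the file from the start for a message byte-identical to ours.
// Returns its byte offset, or -1 if it is not (yet) visible.
long long find_message(int fd, const lock_message &m)
{
  lock_message probe;
  for(off_t offset = 0; ::pread(fd, &probe, sizeof(probe), offset) == (ssize_t) sizeof(probe);
      offset += sizeof(probe))
  {
    if(!std::memcmp(&probe, &m, sizeof(m)))
      return (long long) offset;
  }
  return -1;
}
```

The offset at which a writer's message lands gives every participant the same total order of requests, which is what an ordering or mutual exclusion protocol can then be built on. Messages should be value-initialised (`lock_message m{};`) before filling in fields so the padding bytes compare equal.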
Table 1.12. Atomic append lock performance on various operating systems:
writers | lock files | Win8.1 x64 NTFS HD | Win8.1 x64 NTFS SSD | Linux x64 ext4 SSD | FreeBSD 10.1 ZFS SSD |
---|---|---|---|---|---|
1 | 1 | 2592 | 2875 | 1198 | 29 |
2 | 1 | 1284 | 2565 | 1344 | 25 |
4 | 1 | 1420 | 2384 | 1327 | 35 |
8 | 1 | 1262 | 1764 | 1254 | 55 |
16 | 1 | 428 | 520 | 1260 | 37 |
Linux once again does a great job of O(1) to waiters complexity, but at a third of the speed of a simple lock file and up to half the speed of Windows. Windows does better than Linux here, especially on SSDs where it is faster than a simple lock file, but doesn't scale to waiters once they pass the CPU core count. FreeBSD is two orders of magnitude slower, which is because ZFS checksums and copy-on-writes file changes, so every time we append 16 bytes we are forcing a full copy of the 128KB extent to be issued. It would appear that ZFS syncs its internal buffers when a different file descriptor atomic appends to the same file; unfortunately this has the above pathological performance outcome.
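To put a rough number on that copy-on-write cost: assuming the extent in question is the default 128 KiB ZFS record (an assumption, as the record size of the benchmarked pool is not stated), each 16 byte append rewrites

```latex
\frac{128 \times 1024\ \text{bytes copied}}{16\ \text{bytes appended}} = \frac{131072}{16} = 8192
```

times as much data as the message itself carries.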
This introduces the final potential solution: the quagmire of advisory file locking. This is an area where Windows and POSIX diverge very significantly, and the interactions between them are full of quirks, races and other nasties, both when Windows locks regions in a file on a Samba share hosted on a POSIX machine and when POSIX does byte range locking at all (there is a very fun stanza in the POSIX standard which basically releases all your locks on the first file descriptor close). For this reason you should avoid the very temporary and experimental code currently in AFIO which implements Samba and NFS safe file range locking, where theoretically both Windows and POSIX code can safely lock ranges in files concurrently. Those APIs are undocumented for good reason! Still, performance with these is impressive, despite the hoop jumping inefficiencies AFIO leaps through to implement relatively sane semantics.
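The POSIX quirk mentioned above can be demonstrated with plain `fcntl()` record locks. The following is a minimal sketch for POSIX systems only. The function names are hypothetical, and a child process is forked purely to observe the lock from outside the locking process.

```cpp
#include <fcntl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <string>

// Try to take a non-blocking exclusive lock on byte 0 of the file behind fd.
bool try_write_lock(int fd)
{
  struct flock fl = {};
  fl.l_type = F_WRLCK;
  fl.l_whence = SEEK_SET;
  fl.l_start = 0;
  fl.l_len = 1;
  return ::fcntl(fd, F_SETLK, &fl) == 0;
}

// Fork a child which reports, via its exit status, whether it could lock the file.
bool another_process_can_lock(const std::string &path)
{
  pid_t pid = ::fork();
  if(pid < 0)
    return false;
  if(pid == 0)
  {
    int fd = ::open(path.c_str(), O_RDWR);
    ::_exit((fd >= 0 && try_write_lock(fd)) ? 0 : 1);
  }
  int status = 0;
  ::waitpid(pid, &status, 0);
  return WIFEXITED(status) && WEXITSTATUS(status) == 0;
}

// Returns true if our lock was held at first, then silently dropped after we
// closed a *different* descriptor which referred to the same file.
bool lock_lost_after_unrelated_close(const std::string &path)
{
  int fd1 = ::open(path.c_str(), O_CREAT | O_RDWR, 0600);
  if(fd1 < 0 || !try_write_lock(fd1))
    return false;
  bool held = !another_process_can_lock(path);
  int fd2 = ::open(path.c_str(), O_RDONLY);  // unrelated second descriptor
  ::close(fd2);                              // this releases the lock taken via fd1
  bool lost = another_process_can_lock(path);
  ::close(fd1);
  return held && lost;
}
```

This is why any library code in the same process which opens and closes the lock file, even just to read it, can destroy a lock held elsewhere in that process.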
Table 1.13. Advisory lock file performance on various operating systems:
writers | lock files | Win8.1 x64 NTFS HD | Win8.1 x64 NTFS SSD | Linux x64 ext4 SSD | FreeBSD 10.1 ZFS SSD |
---|---|---|---|---|---|
1 | 1 | 5799 | 5166 | 3466 | 21536 |
2 | 1 | 5788 | 6656 | 2215 | 11654 |
4 | 1 | 5775 | 7020 | 1073 | 5384 |
8 | 1 | 5773 | 6738 | 518 | 2584 |
16 | 1 | 5695 | 5617 | 360 | 1326 |
Fascinatingly the tables suddenly turn here: Windows sees O(1) to waiters complexity, whilst Linux and FreeBSD see a mostly O(N) to waiters drop in performance. FreeBSD, as with simple lock files, blows all others out of the water in advisory lock performance with one or two writers. I should add here that because POSIX advisory locks are per process, the Linux and FreeBSD benchmarks were generated by running N copies of the benchmark program, whereas the NT kernel inherits the really excellent and well thought through file byte range locking model of DEC VMS and treats locks as effectively reference counted byte ranges, and therefore works as expected from a single process. I have yet to add process-local byte range locking to simulate sane range locking for POSIX, so expect the numbers above to worsen.
After all of that, we are left with this locking strategy matrix for TripleGit:
Table 1.14. Fastest file system locking strategy for various operating systems:
Operating system | Best locking policy |
---|---|
Win8.1 x64 NTFS | Advisory locks fastest, then atomic append locks, finally lock files |
Linux x64 ext4 | Lock files fastest, then advisory locks, finally atomic append locks |
FreeBSD 10.1 ZFS | Advisory locks fastest, then lock files, avoid atomic append locks at all costs |
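If you wanted to encode this matrix in code, one possible shape is a simple lookup returning strategies in order of preference. This is only a sketch: the platform keys, the `lock_strategy` enum and the fallback choice are all hypothetical, and it ignores the fact that the advisory locking code is not yet fit for production use.

```cpp
#include <string>
#include <vector>

enum class lock_strategy { lock_file, atomic_append, advisory_lock };

// Hypothetical lookup mirroring the strategy table: fastest strategy first.
std::vector<lock_strategy> preferred_strategies(const std::string &platform)
{
  if(platform == "windows-ntfs")
    return {lock_strategy::advisory_lock, lock_strategy::atomic_append, lock_strategy::lock_file};
  if(platform == "linux-ext4")
    return {lock_strategy::lock_file, lock_strategy::advisory_lock, lock_strategy::atomic_append};
  if(platform == "freebsd-zfs")
    return {lock_strategy::advisory_lock, lock_strategy::lock_file};  // atomic append deliberately left out
  // Assumption: on an unmeasured platform fall back to plain lock files, the most widely supported option.
  return {lock_strategy::lock_file};
}
```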
I should emphasise once again that the advisory locking code is riddled with bugs and you should not use it in your code at this time. Once I have a CI testing all possible combinations of locking and nothing is erroring out I'll release that code for production use, probably in v1.4.
All these benchmarks came from this benchmarking program I wrote using AFIO which illustrates how you might implement the techniques used above:
    int main(int argc, const char *argv[])
    {
      using namespace BOOST_AFIO_V2_NAMESPACE;
      using BOOST_AFIO_V2_NAMESPACE::off_t;
      typedef chrono::duration<double, ratio<1, 1>> secs_type;
      double traditional_locks = 0, atomic_log_locks = 0;
      try { filesystem::remove_all("testdir"); } catch(...) { }

      size_t totalwriters = 2, writers = totalwriters;
      if(argc > 1)
        writers = totalwriters = atoi(argv[1]);
      {
        auto dispatcher = make_dispatcher().get();
        auto mkdir(dispatcher->dir(path_req("testdir", file_flags::create)));
        auto mkdir1(dispatcher->dir(path_req::relative(mkdir, "1", file_flags::create)));
        auto mkdir2(dispatcher->dir(path_req::relative(mkdir, "2", file_flags::create)));
        auto mkdir3(dispatcher->dir(path_req::relative(mkdir, "3", file_flags::create)));
        auto mkdir4(dispatcher->dir(path_req::relative(mkdir, "4", file_flags::create)));
        auto mkdir5(dispatcher->dir(path_req::relative(mkdir, "5", file_flags::create)));
        auto mkdir6(dispatcher->dir(path_req::relative(mkdir, "6", file_flags::create)));
        auto mkdir7(dispatcher->dir(path_req::relative(mkdir, "7", file_flags::create)));
        auto mkdir8(dispatcher->dir(path_req::relative(mkdir, "8", file_flags::create)));
        auto statfs_(dispatcher->statfs(mkdir, fs_metadata_flags::All));
        auto statfs(statfs_.get());
        std::cout << "The filing system holding our test directory is " << statfs.f_fstypename
                  << " and has features:" << std::endl;
    #define PRINT_FIELD(field, ...)                                  \
      std::cout << " f_flags." #field ": ";                          \
      std::cout << statfs.f_flags.field __VA_ARGS__ << std::endl
        PRINT_FIELD(rdonly);
        PRINT_FIELD(noexec);
        PRINT_FIELD(nosuid);
        PRINT_FIELD(acls);
        PRINT_FIELD(xattr);
        PRINT_FIELD(compression);
        PRINT_FIELD(extents);
        PRINT_FIELD(filecompression);
    #undef PRINT_FIELD
    #define PRINT_FIELD(field, ...)                                  \
      std::cout << " f_" #field ": ";                                \
      std::cout << statfs.f_##field __VA_ARGS__ << std::endl
        PRINT_FIELD(bsize);
        PRINT_FIELD(iosize);
        PRINT_FIELD(blocks, << " (" << (statfs.f_blocks * statfs.f_bsize / 1024.0 / 1024.0 / 1024.0) << " Gb)");
        PRINT_FIELD(bfree, << " (" << (statfs.f_bfree * statfs.f_bsize / 1024.0 / 1024.0 / 1024.0) << " Gb)");
        PRINT_FIELD(bavail, << " (" << (statfs.f_bavail * statfs.f_bsize / 1024.0 / 1024.0 / 1024.0) << " Gb)");
    #undef PRINT_FIELD
      }

      if(1)
      {
        std::cout << "\nBenchmarking a single traditional lock file with " << writers
                  << " concurrent writers ...\n";
        std::vector<thread> threads;
        atomic<bool> done(true);
        atomic<size_t> attempts(0), successes(0);
        for(size_t n = 0; n < writers; n++)
        {
          threads.push_back(thread([&done, &attempts, &successes, n] {
            try
            {
              // Create a dispatcher
              auto dispatcher = make_dispatcher().get();
              // Schedule opening the log file for writing log entries
              auto logfile(dispatcher->file(
              path_req("testdir/log", file_flags::create | file_flags::read_write)));
              // Retrieve any errors which occurred
              logfile.get();
              // Wait until all threads are ready
              while(done) { this_thread::yield(); }
              while(!done)
              {
                // Traditional file locks are very simple: try to exclusively create the lock file.
                // If you succeed, you have the lock.
                auto lockfile(dispatcher->file(
                path_req("testdir/log.lock", file_flags::create_only_if_not_exist | file_flags::write |
                                             file_flags::temporary_file | file_flags::delete_on_close)));
                attempts.fetch_add(1, memory_order_relaxed);
                // v1.4 of the AFIO engine will return error_code instead of exceptions for this
                try { lockfile.get(); } catch(const system_error &e) { continue; }
                std::string logentry("I am log writer "), mythreadid(to_string(n)), logentryend("!\n");
                // Fetch the size
                off_t where = logfile->lstat().st_size,
                      entrysize = logentry.size() + mythreadid.size() + logentryend.size();
                // Schedule extending the log file
                auto extendlog(dispatcher->truncate(logfile, where + entrysize));
                // Schedule writing the new entry
                auto writetolog(dispatcher->write(
                make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
                writetolog.get();
                extendlog.get();
                successes.fetch_add(1, memory_order_relaxed);
              }
            }
            catch(const system_error &e)
            {
              std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "("
                        << e.what() << ")" << std::endl;
              abort();
            }
            catch(const std::exception &e)
            {
              std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl;
              abort();
            }
            catch(...)
            {
              std::cerr << "ERROR: test exits via unknown exception" << std::endl;
              abort();
            }
          }));
        }
        auto begin = chrono::high_resolution_clock::now();
        done = false;
        std::this_thread::sleep_for(std::chrono::seconds(20));
        done = true;
        std::cout << "Waiting for threads to exit ..." << std::endl;
        for(auto &i : threads)
          i.join();
        auto end = chrono::high_resolution_clock::now();
        auto diff = chrono::duration_cast<secs_type>(end - begin);
        std::cout << "For " << writers << " concurrent writers, achieved " << (attempts / diff.count())
                  << " attempts per second with a "
                     "success rate of "
                  << (successes / diff.count()) << " writes per second which is a "
                  << (100.0 * successes / attempts) << "% success rate." << std::endl;
        traditional_locks = successes / diff.count();
      }

      if(1)
      {
        std::cout << "\nBenchmarking eight traditional lock files with " << writers
                  << " concurrent writers ...\n";
        std::vector<thread> threads;
        atomic<bool> done(true);
        atomic<size_t> attempts(0), successes(0);
        for(size_t n = 0; n < writers; n++)
        {
          threads.push_back(thread([&done, &attempts, &successes, n] {
            try
            {
              // Create a dispatcher
              auto dispatcher = make_dispatcher().get();
              // Schedule opening the log file for writing log entries
              auto logfile(dispatcher->file(
              path_req("testdir/log", file_flags::create | file_flags::read_write)));
              // Retrieve any errors which occurred
              logfile.get();
              // Wait until all threads are ready
              while(done) { this_thread::yield(); }
              while(!done)
              {
                // Parallel try to exclusively create all eight lock files
                std::vector<path_req> lockfiles;
                lockfiles.reserve(8);
                lockfiles.push_back(
                path_req("testdir/1/log.lock", file_flags::create_only_if_not_exist | file_flags::write |
                                               file_flags::temporary_file | file_flags::delete_on_close));
                lockfiles.push_back(
                path_req("testdir/2/log.lock", file_flags::create_only_if_not_exist | file_flags::write |
                                               file_flags::temporary_file | file_flags::delete_on_close));
                lockfiles.push_back(
                path_req("testdir/3/log.lock", file_flags::create_only_if_not_exist | file_flags::write |
                                               file_flags::temporary_file | file_flags::delete_on_close));
                lockfiles.push_back(
                path_req("testdir/4/log.lock", file_flags::create_only_if_not_exist | file_flags::write |
                                               file_flags::temporary_file | file_flags::delete_on_close));
                lockfiles.push_back(
                path_req("testdir/5/log.lock", file_flags::create_only_if_not_exist | file_flags::write |
                                               file_flags::temporary_file | file_flags::delete_on_close));
                lockfiles.push_back(
                path_req("testdir/6/log.lock", file_flags::create_only_if_not_exist | file_flags::write |
                                               file_flags::temporary_file | file_flags::delete_on_close));
                lockfiles.push_back(
                path_req("testdir/7/log.lock", file_flags::create_only_if_not_exist | file_flags::write |
                                               file_flags::temporary_file | file_flags::delete_on_close));
                lockfiles.push_back(
                path_req("testdir/8/log.lock", file_flags::create_only_if_not_exist | file_flags::write |
                                               file_flags::temporary_file | file_flags::delete_on_close));
                auto lockfile(dispatcher->file(lockfiles));
                attempts.fetch_add(1, memory_order_relaxed);
    #if 1
                // v1.4 of the AFIO engine will return error_code instead of exceptions for this
                try { lockfile[7].get(); } catch(const system_error &e) { continue; }
                try { lockfile[6].get(); } catch(const system_error &e) { continue; }
                try { lockfile[5].get(); } catch(const system_error &e) { continue; }
                try { lockfile[4].get(); } catch(const system_error &e) { continue; }
                try { lockfile[3].get(); } catch(const system_error &e) { continue; }
                try { lockfile[2].get(); } catch(const system_error &e) { continue; }
                try { lockfile[1].get(); } catch(const system_error &e) { continue; }
                try { lockfile[0].get(); } catch(const system_error &e) { continue; }
    #else  /*_ 1 _*/
                try
                {
                  auto barrier(dispatcher->barrier(lockfile));
                  // v1.4 of the AFIO engine will return error_code instead of exceptions for this
                  for(size_t n = 0; n < 8; n++)
                    barrier[n].get();
                }
                catch(const system_error &e)
                {
                  continue;
                }
    #endif /*_ 1 _*/
                std::string logentry("I am log writer "), mythreadid(to_string(n)), logentryend("!\n");
                // Fetch the size
                off_t where = logfile->lstat().st_size,
                      entrysize = logentry.size() + mythreadid.size() + logentryend.size();
                // Schedule extending the log file
                auto extendlog(dispatcher->truncate(logfile, where + entrysize));
                // Schedule writing the new entry
                auto writetolog(dispatcher->write(
                make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
                // Fetch errors from the last operation first to avoid sleep-wake cycling
                writetolog.get();
                extendlog.get();
                successes.fetch_add(1, memory_order_relaxed);
              }
            }
            catch(const system_error &e)
            {
              std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "("
                        << e.what() << ")" << std::endl;
              abort();
            }
            catch(const std::exception &e)
            {
              std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl;
              abort();
            }
            catch(...)
            {
              std::cerr << "ERROR: test exits via unknown exception" << std::endl;
              abort();
            }
          }));
        }
        auto begin = chrono::high_resolution_clock::now();
        done = false;
        std::this_thread::sleep_for(std::chrono::seconds(20));
        done = true;
        std::cout << "Waiting for threads to exit ..." << std::endl;
        for(auto &i : threads)
          i.join();
        auto end = chrono::high_resolution_clock::now();
        auto diff = chrono::duration_cast<secs_type>(end - begin);
        std::cout << "For " << writers << " concurrent writers, achieved " << (attempts / diff.count())
                  << " attempts per second with a "
                     "success rate of "
                  << (successes / diff.count()) << " writes per second which is a "
                  << (100.0 * successes / attempts) << "% success rate." << std::endl;
      }

      // **** WARNING UNSUPPORTED UNDOCUMENTED API DO NOT USE IN YOUR CODE ****
      if(1)
      {
        std::cout << "\nBenchmarking a ranged file lock with " << writers << " concurrent writers ...\n";
        std::vector<thread> threads;
        atomic<bool> done(true);
        atomic<size_t> attempts(0), successes(0);
        for(size_t n = 0; n < writers; n++)
        {
          threads.push_back(thread([&done, &attempts, &successes, n] {
            try
            {
              // Create a dispatcher
              auto dispatcher = make_dispatcher().get();
              // Schedule opening the log file for writing log entries
              auto logfile(dispatcher->file(path_req(
              "testdir/log", file_flags::create | file_flags::read_write | file_flags::os_lockable)));
              // Retrieve any errors which occurred
              logfile.get();
              // Wait until all threads are ready
              while(done) { this_thread::yield(); }
              while(!done)
              {
                attempts.fetch_add(1, memory_order_relaxed);
                // **** WARNING UNSUPPORTED UNDOCUMENTED API DO NOT USE IN YOUR CODE ****
                dispatcher->lock({logfile}).front().get();
                std::string logentry("I am log writer "), mythreadid(to_string(n)), logentryend("!\n");
                // Fetch the size
                off_t where = logfile->lstat().st_size,
                      entrysize = logentry.size() + mythreadid.size() + logentryend.size();
                // Schedule extending the log file
                auto extendlog(dispatcher->truncate(logfile, where + entrysize));
                // Schedule writing the new entry
                auto writetolog(dispatcher->write(
                make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
                writetolog.get();
                extendlog.get();
                successes.fetch_add(1, memory_order_relaxed);
                dispatcher->lock({{logfile, nullptr}}).front().get();
              }
            }
            catch(const system_error &e)
            {
              std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "("
                        << e.what() << ")" << std::endl;
              abort();
            }
            catch(const std::exception &e)
            {
              std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl;
              abort();
            }
            catch(...)
            {
              std::cerr << "ERROR: test exits via unknown exception" << std::endl;
              abort();
            }
          }));
        }
        auto begin = chrono::high_resolution_clock::now();
        done = false;
        std::this_thread::sleep_for(std::chrono::seconds(20));
        done = true;
        std::cout << "Waiting for threads to exit ..." << std::endl;
        for(auto &i : threads)
          i.join();
        auto end = chrono::high_resolution_clock::now();
        auto diff = chrono::duration_cast<secs_type>(end - begin);
        std::cout << "For " << writers << " concurrent writers, achieved " << (attempts / diff.count())
                  << " attempts per second with a "
                     "success rate of "
                  << (successes / diff.count()) << " writes per second which is a "
                  << (100.0 * successes / attempts) << "% success rate." << std::endl;
      }

      if(1)
      {
        std::cout << "\nBenchmarking file locks via atomic append with " << writers
                  << " concurrent writers ...\n";
        std::vector<thread> threads;
        atomic<bool> done(true);
        atomic<size_t> attempts(0), successes(0);
        for(size_t thread = 0; thread < writers; thread++)
        {
          threads.push_back(std::thread([&done, &attempts, &successes, thread] {
            try
            {
              // Create a dispatcher
              auto dispatcher = make_dispatcher().get();
              // Schedule opening the log file for writing log entries
              auto logfile(dispatcher->file(
              path_req("testdir/log", file_flags::create | file_flags::read_write)));
              // Schedule opening the lock file for scanning and hole punching
              auto lockfilez(dispatcher->file(
              path_req("testdir/log.lock", file_flags::create | file_flags::read_write)));
              // Schedule opening the lock file for atomic appending
              auto lockfilea(dispatcher->file(path_req(
              "testdir/log.lock", file_flags::create | file_flags::write | file_flags::append)));
              // Retrieve any errors which occurred
              lockfilea.get();
              lockfilez.get();
              logfile.get();
              while(!done)
              {
                // Each lock log entry is 16 bytes in length.
                enum class message_code_t : uint8_t
                {
                  unlock = 0,
                  havelock = 1,
                  rescind = 2,
                  interest = 3,
                  nominate = 5
                };
    #pragma pack(push, 1)
                union message_t {
                  char bytes[16];
                  struct
                  {
                    message_code_t code;
                    char __padding1[3];
                    uint32_t timestamp;  // time_t
                    uint64_t uniqueid;
                  };
                };
    #pragma pack(pop)
                static_assert(sizeof(message_t) == 16, "message_t is not 16 bytes long!");
                auto gettime = [] {
                  return (uint32_t)(std::time(nullptr) - 1420070400UL /* 1st Jan 2015*/);
                };
                message_t temp, buffers[256];
                off_t buffersoffset;
                uint32_t nowtimestamp = gettime();
                // TODO FIXME: If multiple machines are all accessing the lock file, nowtimestamp
                // ought to be corrected for drift

                // Step 1: Register my interest
                memset(temp.bytes, 0, sizeof(temp));
                temp.code = message_code_t::interest;
                temp.timestamp = nowtimestamp;
                temp.uniqueid = thread;  // TODO FIXME: Needs to be a VERY random number to prevent collision.
                dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get();

                // Step 2: Wait until my interest message appears, also figure out what interests
                // precede mine and where my start of interest begins, and if someone currently
                // has the lock
                off_t startofinterest = dispatcher->extents(lockfilez).get().front().first;
                off_t myuniqueid = (off_t) -1;
                bool findPreceding = true;
                std::vector<std::pair<bool, off_t>> preceding;
                std::pair<bool, off_t> lockid;
                auto iterate = [&] {
                  size_t validPrecedingCount = 0;
                  off_t lockfilesize = lockfilez->lstat(metadata_flags::size).st_size;
                  buffersoffset = lockfilesize > sizeof(buffers) ? lockfilesize - sizeof(buffers) : 0;
                  // buffersoffset-=buffersoffset % sizeof(buffers[0]);
                  for(; !validPrecedingCount && buffersoffset >= startofinterest &&
                        buffersoffset < lockfilesize;
                      buffersoffset -= sizeof(buffers))
                  {
                    size_t amount = (size_t)(lockfilesize - buffersoffset);
                    if(amount > sizeof(buffers))
                      amount = sizeof(buffers);
                    dispatcher->read(make_io_req(lockfilez, (void *) buffers, amount, buffersoffset)).get();
                    for(size_t n = amount / sizeof(buffers[0]) - 1;
                        !validPrecedingCount && n < amount / sizeof(buffers[0]); n--)
                    {
                      // Early exit if messages have become stale
                      if(!buffers[n].timestamp ||
                         (buffers[n].timestamp < nowtimestamp && nowtimestamp - buffers[n].timestamp > 20))
                      {
                        startofinterest = buffersoffset + n * sizeof(buffers[0]);
                        break;
                      }
                      // Find if he is locked or unlocked
                      if(lockid.second == (off_t) -1)
                      {
                        if(buffers[n].code == message_code_t::unlock)
                          lockid = std::make_pair(false, buffers[n].uniqueid);
                        else if(buffers[n].code == message_code_t::havelock)
                          lockid = std::make_pair(true, buffers[n].uniqueid);
                      }
                      // Am I searching for my interest?
                      if(myuniqueid == (off_t) -1)
                      {
                        if(!memcmp(buffers + n, &temp, sizeof(temp)))
                          myuniqueid = buffersoffset + n * sizeof(buffers[0]);
                      }
                      else if(findPreceding && (buffers[n].uniqueid < myuniqueid ||
                                                buffersoffset + n * sizeof(buffers[0]) < myuniqueid))
                      {
                        // We are searching for preceding claims now
                        if(buffers[n].code == message_code_t::rescind ||
                           buffers[n].code == message_code_t::unlock)
                          preceding.push_back(std::make_pair(false, buffers[n].uniqueid));
                        else if(buffers[n].code == message_code_t::nominate ||
                                buffers[n].code == message_code_t::havelock)
                        {
                          if(buffers[n].uniqueid < myuniqueid &&
                             preceding.end() ==
                             std::find(preceding.begin(), preceding.end(),
                                       std::make_pair(false, (off_t) buffers[n].uniqueid)))
                          {
                            preceding.push_back(std::make_pair(true, buffers[n].uniqueid));
                            validPrecedingCount++;
                          }
                        }
                        else if(buffers[n].code == message_code_t::interest)
                        {
                          if(buffersoffset + n * sizeof(buffers[0]) < myuniqueid &&
                             preceding.end() ==
                             std::find(preceding.begin(), preceding.end(),
                                       std::make_pair(false, buffersoffset + n * sizeof(buffers[0]))))
                          {
                            preceding.push_back(
                            std::make_pair(true, buffersoffset + n * sizeof(buffers[0])));
                            validPrecedingCount++;
                          }
                        }
                      }
                    }
                  }
    #if 0
                  std::cout << thread << ": myuniqueid=" << myuniqueid
                            << " startofinterest=" << startofinterest
                            << " size=" << lockfilez->lstat(metadata_flags::size).st_size
                            << " lockid=" << lockid.first << "," << lockid.second << " preceding=";
                  for(auto &i : preceding)
                    std::cout << i.first << "," << i.second << ";";
                  std::cout << std::endl;
    #endif /*_ 0 _*/
                  if(findPreceding)
                  {
                    // Remove all rescinded interests preceding ours
                    preceding.erase(std::remove_if(preceding.begin(), preceding.end(),
                                                   [](const std::pair<bool, off_t> &i) { return !i.first; }),
                                    preceding.end());
                    std::sort(preceding.begin(), preceding.end());
                    findPreceding = false;
                  }
                };
                do
                {
                  lockid = std::make_pair(false, (off_t) -1);
                  iterate();
                  // Didn't find it, so sleep and retry, maybe st_size will have updated by then
                  if(myuniqueid == (off_t) -1)
                    this_thread::sleep_for(chrono::milliseconds(1));
                } while(myuniqueid == (off_t) -1);

                // Step 3: If there is no lock and no interest precedes mine, claim the mutex.
                // Else issue a nominate for myself, once per ten seconds.
                {
                  nowtimestamp = 0;
                  mutex m;
                  condition_variable c;
                  unique_lock<decltype(m)> l(m);
                  atomic<bool> fileChanged(false);
                  for(;;)
                  {
                    attempts.fetch_add(1, memory_order_relaxed);
                    temp.timestamp = gettime();
                    temp.uniqueid = myuniqueid;
                    if(preceding.empty())
                    {
                      temp.code = message_code_t::havelock;
                      dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get();
                      // Zero the range between startofinterest and myuniqueid
                      if(startofinterest < myuniqueid)
                      {
                        std::vector<std::pair<off_t, off_t>> range = {
                        {startofinterest, myuniqueid - startofinterest}};
                        dispatcher->zero(lockfilez, range).get();
                        // std::cout << thread << ": lock taken for myuniqueid=" << myuniqueid
                        //           << ", zeroing " << range.front().first << ", "
                        //           << range.front().second << std::endl;
                      }
                      break;
                    }
                    else
                    {
                      auto lockfilechanged = [&] {
                        fileChanged = true;
                        c.notify_all();
                      };
                      // TODO FIXME: Put a modify watch on the lockfile instead of spinning
                      if(temp.timestamp - nowtimestamp >= 10)
                      {
                        temp.code = message_code_t::nominate;
                        dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get();
                        nowtimestamp = temp.timestamp;
                      }
                      // c.wait_for(l, chrono::milliseconds(1), [&fileChanged]{ return fileChanged==true; });
                      fileChanged = false;
                      preceding.clear();
                      findPreceding = true;
                      lockid = std::make_pair(false, (off_t) -1);
                      iterate();
                    }
                  }
                }

                // Step 4: I now have the lock, so do my thing
                std::string logentry("I am log writer "), mythreadid(to_string(thread)),
                logentryend("!\n");
                // Fetch the size
                off_t where = logfile->lstat().st_size,
                      entrysize = logentry.size() + mythreadid.size() + logentryend.size();
                // Schedule extending the log file
                auto extendlog(dispatcher->truncate(logfile, where + entrysize));
                // Schedule writing the new entry
                auto writetolog(dispatcher->write(
                make_io_req(extendlog, {logentry, mythreadid, logentryend}, where)));
                // Fetch errors from the last operation first to avoid sleep-wake cycling
                writetolog.get();
                extendlog.get();
                successes.fetch_add(1, memory_order_relaxed);
                // std::cout << thread << ": doing work for myuniqueid=" << myuniqueid << std::endl;
                // this_thread::sleep_for(chrono::milliseconds(250));

                // Step 5: Release the lock
                temp.code = message_code_t::unlock;
                temp.timestamp = gettime();
                temp.uniqueid = myuniqueid;
                // std::cout << thread << ": lock released for myuniqueid=" << myuniqueid << std::endl;
                dispatcher->write(make_io_req(lockfilea, temp.bytes, sizeof(temp), 0)).get();
              }
            }
            catch(const system_error &e)
            {
              std::cerr << "ERROR: test exits via system_error code " << e.code().value() << "("
                        << e.what() << ")" << std::endl;
              abort();
            }
            catch(const std::exception &e)
            {
              std::cerr << "ERROR: test exits via exception (" << e.what() << ")" << std::endl;
              abort();
            }
            catch(...)
            {
              std::cerr << "ERROR: test exits via unknown exception" << std::endl;
              abort();
            }
          }));
        }
        auto begin = chrono::high_resolution_clock::now();
        done = false;
        std::this_thread::sleep_for(std::chrono::seconds(20));
        done = true;
        std::cout << "Waiting for threads to exit ..." << std::endl;
        for(auto &i : threads)
          i.join();
        auto end = chrono::high_resolution_clock::now();
        auto diff = chrono::duration_cast<secs_type>(end - begin);
        std::cout << "For " << writers << " concurrent writers, achieved " << (attempts / diff.count())
                  << " attempts per second with a "
                     "success rate of "
                  << (successes / diff.count()) << " writes per second which is a "
                  << (100.0 * successes / attempts) << "% success rate." << std::endl;
        atomic_log_locks = successes / diff.count();
      }
      filesystem::remove_all("testdir");
      std::cout << "Traditional locks were " << (traditional_locks / atomic_log_locks)
                << " times faster." << std::endl;
      return 0;
    }
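The heart of the atomic append benchmark is the rule deciding when a writer may claim the lock. The following is a much simplified in-memory model of that rule, not a replacement for the code above: it assumes the log is held in a vector, that `unlock` and `rescind` messages name the log index of the interest they cancel, and it ignores timestamps, staleness, nomination and hole punching entirely. The `message` struct and `may_claim` function are hypothetical.

```cpp
#include <cstddef>
#include <cstdint>
#include <set>
#include <vector>

enum class message_code : std::uint8_t { unlock = 0, havelock = 1, rescind = 2, interest = 3, nominate = 5 };

// Simplified message: for unlock and rescind, uniqueid names the log index of
// the interest being cancelled (the real code uses byte offsets in the lock file).
struct message
{
  message_code code;
  std::uint64_t uniqueid;
};

// Returns true if the writer whose interest sits at log index `mine` may claim
// the lock, i.e. every interest appended before it has since been cancelled.
bool may_claim(const std::vector<message> &log, std::size_t mine)
{
  std::set<std::uint64_t> finished;
  for(const message &m : log)
  {
    if(m.code == message_code::unlock || m.code == message_code::rescind)
      finished.insert(m.uniqueid);
  }
  for(std::size_t n = 0; n < mine; n++)
  {
    if(log[n].code == message_code::interest && !finished.count(n))
      return false;  // an earlier writer is still waiting for, or holding, the lock
  }
  return true;
}
```

Because append order is the same for every reader of the file, all writers evaluating this rule reach the same conclusion about whose turn it is.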