Bucket Hash Table
A concurrent (thread safe) implementation of the bucket hashing principle. Uses a configured number of partitions/shards to store a subset of the items stored in the table. Each partition/shard is implemented similar to an open addressing hash table, while all the overflow items are stored in a common binary search tree (
std::set
).Note: The implementation is incomplete, and the iterators are not thread safe.
HashTable
Header file that declares the interface for the hash table. Note that since the overflow is stored in a binary search tree, the elements being stored must also define the less than operator (
<
) or provide a custom functor.// ParallelHashTable.h #pragma once #include <array> #include <memory> #include <mutex> #include <optional> #include <set> #include <shared_mutex> #include <sstream> namespace csc280 { template <typename T, std::size_t P, std::size_t B, typename Hash = std::hash<T>, typename Equals = std::equal_to<T>, typename Less = std::less<T>> class ParallelHashTable final { #include "private/PHTTable.h" using Table = PHTTable<T, B, Equals>; using Array = std::array<Table, P>; using ArrayPtr = std::unique_ptr<Array>; using Overflow = std::set<T, Less>; public: #include "private/PHTIterator.h" using key_type = T; using value_type = T; using iterator = PHTIterator<T, P, B, Hash, Equals, Less>; using const_iterator = PHTIterator<T, P, B, Hash, Equals, Less>; ParallelHashTable() : tables{ std::make_unique<Array>() } {} ~ParallelHashTable() = default; ParallelHashTable( const ParallelHashTable& ) = delete; ParallelHashTable& operator=( const ParallelHashTable& rhs ) = delete; auto begin() const -> iterator; auto cbegin() const -> const_iterator { return begin(); } auto end() const -> iterator; auto cend() const -> const_iterator { return end(); } auto insert( T data ) -> iterator; template <typename... Args> iterator emplace( Args&&... args ) { return insert( { args } ); } auto find( const T& data ) const -> iterator; auto erase( const T& data ) -> iterator; void clear(); bool empty() const { return !size(); } std::size_t size() const { return stats.occupied + stats.overflow; } constexpr std::size_t capacity() const { return P * B; } std::string getStats() const { return static_cast<std::string>( stats ); } private: #include "private/OAStats.h" std::size_t tableIndex( const std::size_t hashCode ) const; ArrayPtr tables; Overflow overflow; OAStats<P*B> stats; Hash hash; Equals equals; const std::hash<std::size_t> tableHash{}; mutable std::shared_mutex mutex; }; #include "private/PHTImpl.h" }
Iterator
Iterator implementation of the hash table. Note that the iterator is not thread safe at present.
// private/PHTIterator.h template <typename T, std::size_t P, std::size_t B, typename Hash, typename Equals, typename Less> struct PHTIterator final { using difference_type = std::size_t; using value_type = T; using reference = T& ; using const_reference = const T&; using pointer = T* ; using const_pointer = const T*; using iterator_category = std::bidirectional_iterator_tag; PHTIterator() = delete; PHTIterator( const PHTIterator& ) = default; PHTIterator& operator++() { if ( titer == tables->cend() && oiter == overflow->cend() ) return *this; if ( titer == tables->cend() ) { ++oiter; return *this; } if ( biter != titer->cend() ) ++biter; if ( biter == titer->cend() ) { ++titer; if ( titer != tables->cend() ) { biter = titer->cbegin(); return *this; } --titer; biter = titer->cend(); ++titer; } return *this; } PHTIterator& operator--() { if ( titer == tables->cbegin() && biter == titer->cbegin() ) return *this; if ( oiter != overflow->cbegin() ) { --oiter; return *this; } if ( biter != titer->cbegin() ) { --biter; return *this; } if ( titer != tables->cbegin() ) { --titer; biter = --( titer->cend() ); } return *this; } bool operator==( const PHTIterator& rhs ) const { return tables == rhs.tables && overflow == rhs.overflow && titer == rhs.titer && biter == rhs.biter && oiter == rhs.oiter; } bool operator!=( const PHTIterator& rhs ) const { return !operator==( rhs ); } const_reference operator*() const { return ( titer == tables->cend() ) ? oiter.operator*() : biter.operator*()->value(); } const_pointer operator->() const { return ( titer == tables->cend() ) ? oiter.operator->() : *(biter.operator->()->value); } private: explicit PHTIterator( const Array* tables, const Overflow* overflow ) : tables{ tables }, overflow{ overflow }, titer{ tables->cbegin() }, biter{ titer->cbegin() }, oiter { overflow->cbegin() } {} static PHTIterator createBegin( const Array* tables, const Overflow* overflow ) { return PHTIterator( tables, overflow ); } static PHTIterator createEnd( const Array* tables, const Overflow* overflow ) { auto iter = PHTIterator( tables, overflow ); iter.titer = tables->cend(); --( iter.titer ); iter.biter = iter.titer->cend(); ++( iter.titer ); iter.oiter = overflow->cend(); return iter; } static PHTIterator createTable( const Array* tables, const Overflow* overflow, const std::size_t tableIndex, const std::size_t bucketIndex ) { auto iter = PHTIterator( tables, overflow ); std::advance( iter.titer, tableIndex ); iter.biter = iter.titer->cbegin(); std::advance( iter.biter, bucketIndex ); return iter; } static PHTIterator createOverflow( const Array* tables, const Overflow* overflow, typename const Overflow::const_iterator oiter ) { auto iter = PHTIterator( tables, overflow ); iter.titer = --( tables->cend() ); iter.biter = iter.titer->cend(); ++( iter.titer ); iter.oiter = oiter; return iter; } const Array* tables; const Overflow* overflow; typename Array::const_iterator titer; typename Table::const_iterator biter; typename Overflow::const_iterator oiter; friend class ParallelHashTable<T, P, B, Hash, Equals, Less>; };
Table
Implementation of the open address hash table used as a single partition within the parallel hash table implementation.
// private/PHTTable.h template <typename T, std::size_t B, typename Equals = std::equal_to<T>> struct PHTTable final { using Data = std::optional<T>; using Ptr = std::unique_ptr<Data>; using Array = std::array<Ptr, B>; using ArrayPtr = std::unique_ptr<Array>; using const_iterator = typename Array::const_iterator; PHTTable() : buckets{ std::make_unique<Array>() } {} std::size_t insertIndex( const T& data, const std::size_t hashCode ) { if ( stats.occupied == B ) return B; return probe( data, hashCode ); } void insert( T data, const std::size_t index ) { std::unique_lock<std::shared_mutex> lock( mutex ); buckets->at( index ) = std::make_unique<Data>( Data{ std::move( data ) } ); ++stats.occupied; } std::size_t find( const T& data, const std::size_t hashCode ) { const auto hasValue = [this, &data]( const std::size_t index ) -> bool { const auto optional = buckets->at( index ).get(); return ( optional->has_value() && equals( data, optional->value() ) ); }; auto index = hashCode % B; std::shared_lock<std::shared_mutex> lock( mutex ); if ( !buckets->at( index ) ) return B + 1; if ( hasValue( index ) ) return index; std::size_t i = 0; while ( true ) { if ( i > 10 ) return B; index = ( hashCode + ( i * i ) ) % B; if ( !buckets->at( index ) ) return B + 1; if ( hasValue( index ) ) return index; ++i; } return B; } std::size_t erase( const T& data, const std::size_t hashCode ) { auto index = find( data, hashCode ); if ( index >= B ) return index; std::unique_lock<std::shared_mutex> lock( mutex ); buckets->at( index )->reset(); --stats.occupied; return index; } const Data* at( const std::size_t index ) const { std::shared_lock<std::shared_mutex> lock( mutex ); return buckets->at( index ).get(); } void clear() { std::unique_lock<std::shared_mutex> lock( mutex ); for ( auto iter = buckets->begin(); iter != buckets->end(); ++iter ) { iter->reset(); } stats.clear(); count = 0; } const_iterator cbegin() const { return buckets->cbegin(); } const_iterator cend() const { return buckets->cend(); } std::size_t probe( const T& data, const std::size_t hashCode ) const { if ( stats.occupied == B ) return B; auto index = hashCode % B; std::size_t i = 0; std::shared_lock<std::shared_mutex> lock( mutex ); while ( !isEmpty( index ) ) { if ( equals( data, buckets->at( index )->value() ) ) return index; ++stats.collisions; if ( ++i > 10 ) return B; index = ( hashCode + ( i * i ) ) % B; } return index; } bool isEmpty( const std::size_t index ) const { auto ptr = buckets->at( index ).get(); return ( !ptr ) ? true : !ptr->has_value(); } #include "private/OAStats.h" ArrayPtr buckets; mutable OAStats<B> stats; mutable std::shared_mutex mutex; Equals equals; };
Implementation
The implementation of the functions for the hash table.
// private/PHTImpl.h template<typename T, std::size_t P, std::size_t B, typename Hash, typename Equals, typename Less> auto ParallelHashTable<T, P, B, Hash, Equals, Less>::begin() const -> iterator { return iterator::createBegin( tables.get(), &overflow ); } template<typename T, std::size_t P, std::size_t B, typename Hash, typename Equals, typename Less> auto ParallelHashTable<T, P, B, Hash, Equals, Less>::end() const -> iterator { return iterator::createEnd( tables.get(), &overflow ); } template <typename T, std::size_t P, std::size_t B, typename Hash, typename Equals, typename Less> auto ParallelHashTable<T, P, B, Hash, Equals, Less>::insert( T data ) -> iterator { const auto hashCode = hash( data ); const auto partition = tableIndex( hashCode ); auto& table = tables->at( partition ); const auto begin = table.stats.collisions; const auto index = table.insertIndex( data, hashCode ); const auto end = table.stats.collisions; stats.collisions += end - begin; if ( index == B ) { std::unique_lock<std::shared_mutex> lock( mutex ); const auto pair = overflow.insert( std::move( data ) ); if ( pair.second ) ++stats.overflow; return iterator::createOverflow( tables.get(), &overflow, pair.first ); } auto ptr = table.at( index ); if ( !ptr || !ptr->has_value() ) { table.insert( std::move( data ), index ); std::unique_lock<std::shared_mutex> lock( mutex ); ++stats.occupied; } return iterator::createTable( tables.get(), &overflow, partition, index ); } template <typename T, std::size_t P, std::size_t B, typename Hash, typename Equals, typename Less> auto ParallelHashTable<T, P, B, Hash, Equals, Less>::find( const T& data ) const -> iterator { const auto hashCode = hash( data ); const auto partition = tableIndex( hashCode ); const auto index = tables->at( partition ).find( data, hashCode ); if ( index == B + 1 ) return end(); // does not exist if ( index == B ) { std::shared_lock<std::shared_mutex> lock( mutex ); const auto iter = overflow.find( std::move( data ) ); return iterator::createOverflow( tables.get(), &overflow, iter ); } return iterator::createTable( tables.get(), &overflow, partition, index ); } template <typename T, std::size_t P, std::size_t B, typename Hash, typename Equals, typename Less> auto ParallelHashTable<T, P, B, Hash, Equals, Less>::erase( const T& data ) -> iterator { const auto hashCode = hash( data ); const auto partition = tableIndex( hashCode ); const auto index = tables->at( partition ).erase( data, hashCode ); std::unique_lock<std::shared_mutex> lock( mutex ); if ( index >= B ) { const auto iter = overflow.find( data ); if ( iter != overflow.cend() ) { const auto oiter = overflow.erase( iter ); --stats.overflow; return iterator::createOverflow( tables.get(), &overflow, oiter ); } } --stats.occupied; return iterator::createTable( tables.get(), &overflow, partition, index ); } template <typename T, std::size_t P, std::size_t B, typename Hash, typename Equals, typename Less> void ParallelHashTable<T, P, B, Hash, Equals, Less>::clear() { for ( auto iter = tables->begin(); iter != tables->end(); ++iter ) iter->clear(); std::unique_lock<std::shared_mutex> lock( mutex ); overflow.clear(); stats.clear(); } template <typename T, std::size_t P, std::size_t B, typename Hash, typename Equals, typename Less> std::size_t ParallelHashTable<T, P, B, Hash, Equals, Less>::tableIndex( const std::size_t hashCode ) const { const auto phash = tableHash( hashCode ); const auto partition = phash % P; return partition; }
Test1
A simple test using the parallel hash table as a regular hash table in the same manner as an open addressing based hash table implementation.
// ParallelHashTable.cpp #include "catch.hpp" #include "functions.h" #include "../main/ParallelHashTable.h" #include <chrono> #include <iostream> SCENARIO( "Generating statistics for parallel hash tables with varying capacity" ) { using std::chrono::duration_cast; using std::chrono::high_resolution_clock; using std::chrono::microseconds; GIVEN( "A hash table of strings with capacity 10000" ) { constexpr std::size_t size = 10000; csc280::ParallelHashTable<std::string, 10, 1000> hash; WHEN( "Adding same number of random strings as table size" ) { const auto t1 = high_resolution_clock::now(); for ( std::size_t i = 0; i < size; ++i ) { auto value = csc280::test::generateRandomString(); while ( hash.find( value ) != hash.cend() ) value = csc280::test::generateRandomString(); hash.insert( std::move( value ) ); } REQUIRE( size == hash.size() ); csc280::test::ofs << "Parallel hash table of strings stats for capacity 10000\n" << hash.getStats(); const auto t2 = high_resolution_clock::now(); const auto duration = std::chrono::duration_cast<microseconds>( t2 - t1 ).count(); std::cout << "Time to add " << size << " strings to parallel hash table: " << duration << " microseconds\n"; } AND_WHEN( "Finding each string added to table" ) { std::vector<std::string> values; values.reserve( size ); for ( std::size_t i = 0; i < size; ++i ) { auto value = csc280::test::generateRandomString(); while ( hash.find( value ) != hash.cend() ) value = csc280::test::generateRandomString(); values.push_back( value ); hash.insert( std::move( value ) ); } REQUIRE( size == hash.size() ); const auto t1 = high_resolution_clock::now(); const auto iter = hash.cend(); for ( const auto& value : values ) { REQUIRE( hash.find( value ) != iter ); } const auto t2 = high_resolution_clock::now(); const auto duration = duration_cast<microseconds>( t2 - t1 ).count(); std::cout << "Time to find " << size << " strings in parallel hash table: " << duration << " microseconds\n"; } AND_WHEN( "Finding each string added to table of double capacity" ) { std::vector<std::string> values; values.reserve( size / 2 ); for ( std::size_t i = 0; i < size / 2; ++i ) { auto value = csc280::test::generateRandomString(); while ( hash.find( value ) != hash.cend() ) value = csc280::test::generateRandomString(); values.push_back( value ); hash.insert( std::move( value ) ); } const auto t1 = high_resolution_clock::now(); const auto iter = hash.cend(); for ( const auto& value : values ) { REQUIRE( hash.find( value ) != iter ); } const auto t2 = high_resolution_clock::now(); const auto duration = duration_cast<microseconds>( t2 - t1 ).count(); std::cout << "Time to find " << size / 2 << " strings in parallel hash table of capacity " << size << ": " << duration << " microseconds\n"; } } csc280::test::ofs.flush(); }
Test2
A simple test suite for the parallel hash table when used concurrently using multiple threads to read and write to the table.
// MultiThreaded.cpp #include "catch.hpp" #include "functions.h" #include "../main/ParallelHashTable.h" #include <chrono> #include <execution> #include <future> #include <iostream> #include <iostream> using HashTable = csc280::ParallelHashTable<std::string, 10, 1000>; using Vector = std::vector<std::string>; constexpr std::size_t size = 10000; namespace csc280::mt { Vector createStrings( HashTable& hash ) { Vector values; values.reserve( 10000 ); const auto func = [&hash]() { Vector values; values.reserve( 2000 ); for ( std::size_t i = 0; i < 2000; ++i ) { auto value = csc280::test::generateRandomString(); while ( hash.find( value ) != hash.cend() ) value = csc280::test::generateRandomString(); values.push_back( value ); hash.insert( std::move( value ) ); } return values; }; auto fut1 = std::async( std::launch::async, func); auto fut2 = std::async( std::launch::async, func); auto fut3 = std::async( std::launch::async, func); auto fut4 = std::async( std::launch::async, func); auto fut5 = std::async( std::launch::async, func); const auto&& v1 = fut1.get(); values.insert( values.end(), v1.begin(), v1.end() ); const auto&& v2 = fut2.get(); values.insert( values.end(), v2.begin(), v2.end() ); const auto&& v3 = fut3.get(); values.insert( values.end(), v3.begin(), v3.end() ); const auto&& v4 = fut4.get(); values.insert( values.end(), v4.begin(), v4.end() ); const auto&& v5 = fut5.get(); values.insert( values.end(), v5.begin(), v5.end() ); return values; } void findValues( const HashTable& hash, const Vector& values ) { const auto iter = hash.cend(); std::for_each( std::execution::par_unseq, values.begin(), values.end(), [&hash, &iter] ( auto&& value ) { const auto f = hash.find( value ); REQUIRE( f != iter ); } ); } } SCENARIO( "Generating statistics for parallel hash tables with varying capacity using multiple threads" ) { using std::chrono::duration_cast; using std::chrono::high_resolution_clock; using std::chrono::microseconds; GIVEN( "A hash table of strings with capacity 10000" ) { HashTable hash; WHEN( "Adding same number of random strings as table size using multiple threads" ) { const auto t1 = high_resolution_clock::now(); csc280::mt::createStrings( hash ); REQUIRE( size == hash.size() ); csc280::test::ofs << "Parallel hash table of strings stats for capacity 10000 using multiple threads\n" << hash.getStats(); const auto t2 = high_resolution_clock::now(); const auto duration = std::chrono::duration_cast<microseconds>( t2 - t1 ).count(); std::cout << "Time to add " << size << " strings to parallel hash table using multiple threads: " << duration << " microseconds\n"; } AND_WHEN( "Finding each string added to table" ) { const auto values = csc280::mt::createStrings( hash ); const auto t1 = high_resolution_clock::now(); csc280::mt::findValues( hash, values ); const auto t2 = high_resolution_clock::now(); const auto duration = duration_cast<microseconds>( t2 - t1 ).count(); std::cout << "Time to find " << size << " strings in parallel hash table using multiple threads: " << duration << " microseconds\n"; } AND_WHEN( "Finding each string added to table of double capacity" ) { Vector values; values.reserve( size / 2 ); for ( std::size_t i = 0; i < size / 2; ++i ) { auto value = csc280::test::generateRandomString(); while ( hash.find( value ) != hash.cend() ) value = csc280::test::generateRandomString(); values.push_back( value ); hash.insert( std::move( value ) ); } const auto t1 = high_resolution_clock::now(); csc280::mt::findValues( hash, values ); const auto t2 = high_resolution_clock::now(); const auto duration = duration_cast<microseconds>( t2 - t1 ).count(); std::cout << "Time to find " << size / 2 << " strings in parallel hash table of capacity " << size << " using multiple threads: " << duration << " microseconds\n"; } } csc280::test::ofs.flush(); }