Lab 7 - Circular Buffer
In this lab session we will develop a simple circular buffer and perform some rudimentary tests around our implementation. We will also test the use of the circular buffer as a fixed size message queue in a pub-sub (publisher-subscriber) model.
Circular Buffer
// CircularBuffer.h #pragma once #include <array> #include <memory> #include <mutex> namespace csc280 { template <typename T, std::size_t S> class CircularBuffer { using Array = std::array<T, S>; using ArrayPtr = std::unique_ptr<Array>; public: CircularBuffer(); ~CircularBuffer() = default; CircularBuffer( const CircularBuffer& ) = delete; CircularBuffer& operator= ( const CircularBuffer& ) = delete; void put( T item ); T pop(); void reset() { std::lock_guard<std::mutex> lock( mutex ); head = tail; } bool empty() const { return head == tail; } bool full() const { return ( ( head + 1 ) % S ) == tail; } std::size_t size() const { return S - 1; } private: std::mutex mutex; ArrayPtr array; std::size_t head = 0; std::size_t tail = 0; }; #include "private/CBImpl.h" }
Circular Buffer Implementation
// private/CBImpl.h template <typename T, std::size_t S> CircularBuffer<T, S>::CircularBuffer() : array{ std::make_unique<Array>() } {} template <typename T, std::size_t S> void CircularBuffer<T, S>::put( T item ) { std::lock_guard<std::mutex> lock( mutex ); ( *array.get() )[head] = std::move( item ); head = ( head + 1 ) % S; if ( head == tail ) tail = ( tail + 1 ) % S; } template <typename T, std::size_t S> T CircularBuffer<T, S>::pop() { std::lock_guard<std::mutex> lock( mutex ); if ( empty() ) return T(); auto val = array->at( tail ); tail = ( tail + 1 ) % S; return val; }
Test
// CircularBuffer.cpp #include "catch.hpp" #include "../main/CircularBuffer.h" #include <atomic> #include <chrono> #include <iostream> #include <thread> SCENARIO( "Circular buffer operations on numbers" ) { GIVEN( "A circular buffer of numbers" ) { csc280::CircularBuffer<uint32_t, 10> buffer; WHEN( "Adding and consuming a value in the buffer" ) { const uint32_t value = 100; buffer.put( value ); const uint32_t read = buffer.pop(); REQUIRE( read == value ); REQUIRE( buffer.empty() ); REQUIRE( !buffer.full() ); } WHEN( "Adding maximum number of values to buffer" ) { for ( std::size_t i = 0; i < buffer.size(); ++i ) buffer.put( i ); REQUIRE( buffer.full() ); while ( !buffer.empty() ) buffer.pop(); REQUIRE( buffer.empty() ); REQUIRE( !buffer.full() ); } WHEN( "Adding double the number of values to buffer" ) { for ( std::size_t i = 0; i < 2 * buffer.size(); ++i ) buffer.put( i ); REQUIRE( buffer.full() ); std::size_t count = 0; while ( !buffer.empty() ) { buffer.pop(); ++count; } REQUIRE( count == buffer.size() ); } WHEN( "Adding 5 more than number of values to buffer" ) { for ( std::size_t i = 0; i < buffer.size() + 5; ++i ) buffer.put( i ); REQUIRE( buffer.full() ); std::size_t count = 0; while ( !buffer.empty() ) { buffer.pop(); ++count; } REQUIRE( count == buffer.size() ); } } } SCENARIO( "Using circular buffer in a pub-sub model" ) { GIVEN( "A circular buffer of numbers" ) { csc280::CircularBuffer<uint32_t, 1000> buffer; const std::size_t sleepInterval = 10; std::atomic_bool publishing = true; WHEN( "Publishing numbers at twice the speed of subscribers" ) { auto publisher = std::thread( [&buffer, &publishing, sleepInterval]() { for ( std::size_t i = 0; i < 2 * buffer.size(); ++i ) { buffer.put( i ); if ( !( i % 100 ) ) std::cout << "Published " << i << " numbers\n"; std::this_thread::sleep_for( std::chrono::milliseconds( sleepInterval ) ); } publishing = false; } ); auto subscriber = std::thread( [&buffer, &publishing, sleepInterval]() { if ( buffer.empty() ) { std::this_thread::sleep_for( std::chrono::microseconds( sleepInterval ) ); } std::size_t count = 0; while ( !buffer.empty() ) { buffer.pop(); if ( !( ++count % 100 ) ) std::cout << "Consumed " << count << " numbers\n"; const std::size_t interval = publishing ? 2 * sleepInterval : sleepInterval / 2; std::this_thread::sleep_for( std::chrono::milliseconds( interval ) ); } } ); publisher.join(); subscriber.join(); while ( !buffer.empty() ) buffer.pop(); REQUIRE( buffer.empty() ); } } }