Sans Pareil Technologies, Inc.

Key To Your Business

Lab 7 - Circular Buffer


In this lab session, we will develop a simple circular buffer and perform some rudimentary tests on our implementation. We will also test the use of the circular buffer as a fixed-size message queue in a pub-sub (publisher-subscriber) model.
Circular Buffer 
// CircularBuffer.h
#pragma once

#include <array>
#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>

namespace csc280
{
  /**
   * Fixed-capacity FIFO ring buffer whose storage is a heap-allocated
   * std::array<T, S>.
   *
   * One slot is always left unused so that `head == tail` can only mean
   * "empty"; the buffer therefore holds at most S - 1 items (see size()).
   *
   * put(), pop() and reset() serialise on an internal mutex. empty() and
   * full() do not take the mutex; they read the two indices atomically, so
   * they may be called from any thread, but the answer is only a snapshot
   * and can be stale by the time the caller acts on it.
   *
   * @tparam T Element type; must be default constructible because pop()
   *           returns T() when the buffer is empty.
   * @tparam S Number of slots in the backing array; must be at least 2.
   */
  template <typename T, std::size_t S>
  class CircularBuffer
  {
    // S == 0 would divide by zero in the index arithmetic and S == 1 leaves
    // no usable slot, so reject both at compile time.
    static_assert( S > 1, "CircularBuffer needs at least two slots (one is always kept free)" );

    using Array = std::array<T, S>;
    using ArrayPtr = std::unique_ptr<Array>;

  public:
    /// Creates an empty buffer and allocates the backing array.
    CircularBuffer();
    ~CircularBuffer() = default;

    // Not copyable: the buffer owns a mutex and a unique_ptr.
    CircularBuffer( const CircularBuffer& ) = delete;
    CircularBuffer& operator= ( const CircularBuffer& ) = delete;

    /// Appends an item at the head; when the buffer is already full the
    /// oldest item is dropped to make room.
    void put( T item );

    /// Removes and returns the oldest item, or T() when the buffer is empty.
    T pop();

    /// Discards all stored items by moving the head back onto the tail.
    void reset()
    {
      std::lock_guard<std::mutex> lock( mutex );
      // std::atomic is not copy-assignable, so load the value explicitly.
      head = tail.load();
    }

    /// @return true when no items are stored.
    bool empty() const { return head.load() == tail.load(); }

    /// @return true when the next put() would overwrite the oldest item.
    bool full() const { return ( ( head.load() + 1 ) % S ) == tail.load(); }

    /// @return The maximum number of items the buffer can hold (S - 1).
    std::size_t size() const { return S - 1; }

  private:
    std::mutex mutex;
    ArrayPtr array;
    // Atomic so that the unlocked empty()/full() queries do not race with
    // writers that update the indices while holding the mutex.
    std::atomic<std::size_t> head{ 0 };  // index of the next slot to write
    std::atomic<std::size_t> tail{ 0 };  // index of the oldest stored item
  };

#include "private/CBImpl.h"
}
Circular Buffer Implementation 
// private/CBImpl.h
/**
 * Builds an empty buffer. The backing array is allocated on the heap via
 * std::make_unique, which value-initialises its elements.
 */
template <typename T, std::size_t S>
CircularBuffer<T, S>::CircularBuffer()
  : array( std::make_unique<Array>() )
{
}

/**
 * Stores an item in the slot at the head of the buffer.
 *
 * The mutex is held for the whole call. If advancing the head makes it
 * catch up with the tail, the tail is advanced as well, which discards the
 * oldest stored item instead of rejecting the new one.
 *
 * @param item Value to store; it is moved into the backing array.
 */
template <typename T, std::size_t S>
void CircularBuffer<T, S>::put( T item )
{
  std::lock_guard<std::mutex> guard( mutex );

  Array& slots = *array;
  slots[head] = std::move( item );

  head = ( head + 1 ) % S;

  // Head still trails the tail: nothing was overwritten.
  if ( head != tail ) return;

  // Head ran into the tail, so give up the oldest item.
  tail = ( tail + 1 ) % S;
}

/**
 * Removes and returns the oldest item in the buffer.
 *
 * The mutex is held for the whole call. When the buffer is empty a
 * value-initialised T is returned, so a caller that must distinguish
 * "nothing stored" from a stored default value has to check empty() first.
 *
 * @return The item at the tail of the buffer, or T() if the buffer is empty.
 */
template <typename T, std::size_t S>
T CircularBuffer<T, S>::pop()
{
  std::lock_guard<std::mutex> lock( mutex );
  if ( empty() ) return T();

  // Move the element out instead of copying it: the slot is never read again
  // before put() overwrites it, and moving also allows move-only element
  // types. tail is always reduced modulo S, so no bounds check is needed.
  T val = std::move( ( *array )[tail] );
  tail = ( tail + 1 ) % S;

  return val;
}
Test 
// CircularBuffer.cpp
#include "catch.hpp"
#include "../main/CircularBuffer.h"

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

// Single-threaded checks of put/pop, the empty/full queries and the behaviour
// of a ten-slot buffer when more items are written than it can hold.
SCENARIO( "Circular buffer operations on numbers" )
{
  GIVEN( "A circular buffer of numbers" )
  {
    csc280::CircularBuffer<uint32_t, 10> buffer;

    // Writes the numbers 0 .. total-1 into the buffer, one put() per number.
    const auto fill = [&buffer]( const std::size_t total )
    {
      std::size_t i = 0;
      while ( i < total )
      {
        buffer.put( i );
        ++i;
      }
    };

    // Pops until the buffer reports empty; returns how many items came out.
    const auto drain = [&buffer]()
    {
      std::size_t popped = 0;
      for ( ; !buffer.empty(); ++popped ) buffer.pop();
      return popped;
    };

    WHEN( "Adding and consuming a value in the buffer" )
    {
      const uint32_t value = 100;
      buffer.put( value );

      const uint32_t read = buffer.pop();

      REQUIRE( read == value );
      REQUIRE( buffer.empty() );
      REQUIRE( !buffer.full() );
    }

    WHEN( "Adding maximum number of values to buffer" )
    {
      fill( buffer.size() );
      REQUIRE( buffer.full() );

      drain();
      REQUIRE( buffer.empty() );
      REQUIRE( !buffer.full() );
    }

    WHEN( "Adding double the number of values to buffer" )
    {
      fill( 2 * buffer.size() );
      REQUIRE( buffer.full() );

      // Only the most recent size() items are expected to remain.
      const std::size_t count = drain();

      REQUIRE( count == buffer.size() );
    }

    WHEN( "Adding 5 more than number of values to buffer" )
    {
      fill( buffer.size() + 5 );
      REQUIRE( buffer.full() );

      const std::size_t count = drain();

      REQUIRE( count == buffer.size() );
    }
  }
}

// Uses the buffer as a bounded message queue shared between one publisher
// thread and one subscriber thread. The publisher writes twice as many
// numbers as the buffer can hold, sleeping sleepInterval ms between writes;
// the subscriber sleeps twice as long between reads while publishing is in
// progress and half as long once it has finished.
SCENARIO( "Using circular buffer in a pub-sub model" )
{
  GIVEN( "A circular buffer of numbers" )
  {
    csc280::CircularBuffer<uint32_t, 1000> buffer;
    const std::size_t sleepInterval = 10;
    std::atomic_bool publishing = true;

    WHEN( "Publishing numbers at twice the speed of subscribers" )
    {
      auto publisher = std::thread( [&buffer, &publishing, sleepInterval]()
      {
        for ( std::size_t i = 0; i < 2 * buffer.size(); ++i )
        {
          // i stays below 2 * 999, so the narrowing to uint32_t is lossless.
          buffer.put( static_cast<uint32_t>( i ) );
          if ( !( i % 100 ) ) std::cout << "Published " << i << " numbers\n";
          std::this_thread::sleep_for( std::chrono::milliseconds( sleepInterval ) );
        }

        // Tells the subscriber it may speed up and stop waiting for data.
        publishing = false;
      } );

      auto subscriber = std::thread( [&buffer, &publishing, sleepInterval]()
      {
        // Poll until the publisher has stored its first item. A single short
        // wait is not enough: if the publisher thread is scheduled late the
        // consume loop below would see an empty buffer and exit immediately.
        while ( publishing && buffer.empty() )
        {
          std::this_thread::sleep_for( std::chrono::microseconds( sleepInterval ) );
        }

        std::size_t count = 0;
        while ( !buffer.empty() )
        {
          buffer.pop();
          if ( !( ++count % 100 ) ) std::cout << "Consumed " << count << " numbers\n";
          const std::size_t interval = publishing ? 2 * sleepInterval : sleepInterval / 2;
          std::this_thread::sleep_for( std::chrono::milliseconds( interval ) );
        }
      } );

      publisher.join();
      subscriber.join();

      // Remove anything the subscriber left behind before the final check.
      while ( !buffer.empty() ) buffer.pop();
      REQUIRE( buffer.empty() );
    }
  }
}