Sans Pareil Technologies, Inc.

Key To Your Business

Akumuli Client


Akumuli is a high-performance time series database. It supports a simple TCP/UDP protocol as well as the OpenTSDB telnet-style protocol. There is no official C++ client library for the protocol; however, the protocol is extremely simple and easy to implement. Data is written to Akumuli using the Redis serialisation protocol (RESP) format.

Here we show a very simple data model and client, built with Boost.Asio, for storing data in Akumuli.

Data Model

// timeseries.h
#pragma once

#include <chrono>
#include <cstdint>
#include <limits>
#include <locale>
#include <ostream>
#include <sstream>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

namespace spt::akumuli
{
  /// A single `key=value` tag attached to a series name.
  using Tag = std::pair<std::string, std::string>;
  /// Ordered collection of tags; written out in insertion order.
  using Tags = std::vector<Tag>;

  /**
   * A single data point (series name, tags, timestamp and value) that can
   * be serialised into the three-line RESP message written to Akumuli.
   *
   * @tparam Value     Type of the stored value (streamed with `operator<<`).
   * @tparam ValueType RESP type prefix character emitted before the value.
   *
   * Instances are move-only.
   */
  template <typename Value, char ValueType>
  struct TimeSeries
  {
    /// Creates an empty series stamped with the current system time.
    TimeSeries() : timestamp{ now() }
    {
      tags.reserve( 16 );
    }

    /// Creates an untagged series stamped with the current system time.
    TimeSeries( std::string name, Value value ) :
        name{ std::move( name ) }, timestamp{ now() }, value{ value }
    {
      tags.reserve( 16 );
    }

    /// Creates a tagged series stamped with the current system time.
    TimeSeries( std::string name, Tags tags, Value value ) :
        name{ std::move( name ) }, tags{ std::move( tags ) },
        timestamp{ now() }, value{ value } {}

    /// Creates a tagged series with an explicit timestamp (since epoch).
    TimeSeries( std::string name, Tags tags, std::chrono::nanoseconds timestamp, Value value ) :
        name{ std::move( name ) }, tags{ std::move( tags ) },
        timestamp{ timestamp }, value{ value } {}

    /// Appends a tag; returns `*this` so calls can be chained.
    TimeSeries& add( Tag tag )
    {
      tags.emplace_back( std::move( tag ) );
      return *this;
    }

    /// Appends a tag built from `key` and `tag`; returns `*this` for chaining.
    TimeSeries& add( std::string key, std::string tag )
    {
      tags.emplace_back( std::move( key ), std::move( tag ) );
      return *this;
    }

    /**
     * Serialises the data point as three CRLF-terminated lines:
     * `+name key=value ...`, `:timestamp-in-nanoseconds` and
     * `<ValueType>value`.
     *
     * @return The serialised message.
     */
    std::string resp() const
    {
      std::ostringstream ss;

      // The wire format must not pick up digit grouping or a locale specific
      // decimal separator from the process-wide locale.
      ss.imbue( std::locale::classic() );

      // The stream default of 6 significant digits silently truncates
      // floating point values; max_digits10 guarantees a lossless round trip.
      if constexpr ( std::is_floating_point_v<Value> )
      {
        ss.precision( std::numeric_limits<Value>::max_digits10 );
      }

      ss << '+' << name;

      for ( const auto& tag : tags )
      {
        ss << ' ' << tag.first << '=' << tag.second;
      }

      ss << "\r\n";

      ss << ':' << timestamp.count() << "\r\n";
      ss << ValueType << value << "\r\n";

      return ss.str();
    }

    /// @return The timestamp of this data point as nanoseconds since epoch.
    std::chrono::nanoseconds getTimestamp() const { return timestamp; }

    TimeSeries(const TimeSeries&) = delete;
    TimeSeries& operator=(const TimeSeries&) = delete;

    TimeSeries(TimeSeries&&) = default;
    TimeSeries& operator=(TimeSeries&&) = default;
    ~TimeSeries() = default;

  private:
    /// Current system clock time expressed as nanoseconds since epoch.
    static std::chrono::nanoseconds now()
    {
      return std::chrono::duration_cast<std::chrono::nanoseconds>(
          std::chrono::system_clock::now().time_since_epoch() );
    }

    std::string name;
    Tags tags;
    std::chrono::nanoseconds timestamp;
    // Value-initialised so a default constructed series never serialises an
    // indeterminate value.
    Value value{};
  };

  /// Series carrying a 64-bit integer value (value line prefixed with ':').
  using IntegerSeries = TimeSeries<int64_t, ':'>;
  /// Series carrying a double value (value line prefixed with '+').
  using DoubleSeries = TimeSeries<double , '+'>;

  /// Streams the RESP serialisation of `ts` (see TimeSeries::resp) to `s`.
  template <typename V, char VT>
  std::ostream& operator<<( std::ostream& s, const TimeSeries<V, VT>& ts )
  {
    s << ts.resp();
    return s;
  }
}

Client


A simple blocking client implementation. Uses synchronous writes with asynchronous reads to handle any error messages from Akumuli. Akumuli closes the socket connection on error, hence the client reconnects to Akumuli when an error is encountered.
// akumuli.h

#pragma once

#include <boost/asio.hpp>

#include "timeseries.h"
#include "log/NanoLog.h"

namespace spt::akumuli
{
  /**
   * Simple client that writes TimeSeries data points to Akumuli over TCP.
   *
   * Writes are synchronous; a read is kept pending asynchronously so that any
   * text sent back by the server can be logged. A received non-empty line or
   * a failed write closes the socket and schedules a reconnect.
   *
   * The asynchronous handlers only run while the supplied `io_context` is
   * being run by the application.
   *
   * NOTE(review): `addSeries` blocks its caller until a connection has been
   * established, so it presumably must not be called from the thread that
   * runs the `io_context` - confirm against callers. The socket and timers
   * are also touched from both the writing thread and the handlers without
   * further synchronisation.
   */
  struct Akumuli
  {
    /**
     * Resolves `host:port` synchronously; no connection is attempted yet.
     *
     * @param context I/O context used for the socket, timers and resolver.
     * @param host    Host name or address of the Akumuli server.
     * @param port    TCP port of the Akumuli server.
     */
    Akumuli( boost::asio::io_context& context, const std::string& host,
        int port ) :
        socket{ context }, reconnectTimer{ context }, deadline{ context }
    {
      boost::asio::ip::tcp::resolver resolver{ context };
      endpoints = resolver.resolve( host, std::to_string( port ));
    }

    /// Starts an asynchronous connect to the first resolved endpoint and
    /// arms the deadline timer.
    void connect()
    {
      startConnection( endpoints.begin());
      deadline.async_wait( std::bind( &Akumuli::checkDeadline, this ));
    }

    /// Serialises `ts` and writes it to the server, connecting first if
    /// necessary. Does nothing once the client has been stopped.
    template<typename Value, char ValueType>
    void addSeries( const TimeSeries<Value, ValueType>& ts )
    {
      doWrite( ts.resp());
    }

    /// Permanently stops the client: closes the socket and cancels both
    /// timers so no further handlers are re-armed.
    void stop()
    {
      stopped.store( true );
      connected.store( false );
      boost::system::error_code ignored;
      socket.close( ignored );
      reconnectTimer.cancel();
      // Without this the pending deadline wait stays queued until it expires.
      deadline.cancel();
    }

    Akumuli( const Akumuli& ) = delete;

    Akumuli& operator=( const Akumuli& ) = delete;

    ~Akumuli() = default;

  private:
    /// Writes `data` synchronously, waiting for a connection if there is none.
    void doWrite( const std::string& data )
    {
      if ( stopped.load()) return;

      if ( !connected.load())
      {
        connect();

        // TODO: use synchronous connection to avoid this nonsense.
        while ( !stopped.load() && !connected.load())
        {
          std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
        }

        // Every endpoint failed (or stop() was called) while waiting; the
        // socket is closed so there is nothing to write to.
        if ( stopped.load()) return;
      }

      boost::system::error_code error;
      boost::asio::write( socket, boost::asio::buffer( data ), error );
      if ( error )
      {
        LOG_WARN << "Error sending data to akumuli " << error.message();
        doStop();
      }
    }

    /// Starts an asynchronous connect to `iter`; stops the client when the
    /// list of endpoints has been exhausted.
    void startConnection( boost::asio::ip::tcp::resolver::results_type::iterator iter )
    {
      if ( iter == endpoints.end()) return stop();

      // Note: moving the expiry cancels any wait pending on reconnectTimer.
      reconnectTimer.expires_after( std::chrono::seconds( 60 ));
      socket.async_connect( iter->endpoint(),
          std::bind( &Akumuli::handleConnect, this, std::placeholders::_1,
              iter ));
    }

    /// Reconnect timer handler: starts again from the first endpoint unless
    /// the wait failed/was cancelled or the client has been stopped.
    void restartConnection( const boost::system::error_code& error )
    {
      if ( stopped.load()) return;

      if ( error )
      {
        LOG_WARN << error.message();
        return;
      }

      startConnection( endpoints.begin());
    }

    /// Completion handler for async_connect; moves on to the next endpoint
    /// on failure, starts reading and marks the client connected on success.
    void handleConnect( const boost::system::error_code& error,
        boost::asio::ip::tcp::resolver::results_type::iterator iter )
    {
      if ( stopped.load()) return;

      if ( !socket.is_open())
      {
        LOG_WARN << "Connect timed out";

        // Try the next available endpoint.
        startConnection( ++iter );
      }
      else if ( error )
      {
        LOG_WARN << "Connect error: " << error.message();

        // Non-throwing close: an exception must not escape a handler.
        boost::system::error_code closeError;
        socket.close( closeError );
        if ( closeError ) LOG_WARN << closeError.message();

        startConnection( ++iter );
      }
      else
      {
        std::ostringstream oss;
        oss << iter->endpoint();
        if ( initial ) LOG_INFO << "Connected to " << oss.str();
        initial = false;
        startRead();
        connected.store( true );
      }
    }

    /// Closes the socket after a failure and schedules a reconnect attempt
    /// two seconds later.
    void doStop()
    {
      boost::system::error_code ignored;
      socket.close( ignored );
      connected.store( false );
      reconnectTimer.expires_after( std::chrono::seconds{ 2 } );
      reconnectTimer.async_wait(
          std::bind( &Akumuli::restartConnection, this,
              std::placeholders::_1 ));
    }

    /// Queues an asynchronous read of the next newline terminated line.
    void startRead()
    {
      deadline.expires_after( std::chrono::seconds( 30 ));

      boost::asio::async_read_until( socket,
          boost::asio::dynamic_buffer( inputBuffer ), '\n',
          std::bind( &Akumuli::handleRead, this, std::placeholders::_1,
              std::placeholders::_2 ));
    }

    /// Read handler: any non-empty line from the server is treated as a
    /// write error and triggers a reconnect. Read errors are ignored here.
    void handleRead( const boost::system::error_code& error, std::size_t n )
    {
      if ( stopped.load()) return;
      if ( !error )
      {
        // n counts the delimiter, which is dropped from the logged line.
        std::string line( inputBuffer.substr( 0, n - 1 ));
        inputBuffer.erase( 0, n );
        if ( !line.empty())
        {
          LOG_WARN << "Akumuli write error\n" << line;
          doStop();
        }
        else
        {
          // Keep listening, otherwise later server messages go unnoticed.
          startRead();
        }
      }
    }

    /// Deadline timer handler: pushes an expired deadline out by another
    /// 30 seconds and waits again until the client is stopped.
    void checkDeadline()
    {
      if ( stopped.load()) return;

      if ( deadline.expiry() <= boost::asio::steady_timer::clock_type::now() )
      {
        deadline.expires_after( std::chrono::seconds( 30 ));
      }

      deadline.async_wait( std::bind( &Akumuli::checkDeadline, this ) );
    }

  private:
    boost::asio::ip::tcp::resolver::results_type endpoints;
    boost::asio::ip::tcp::socket socket;
    boost::asio::steady_timer reconnectTimer;
    boost::asio::steady_timer deadline;
    std::string inputBuffer;
    std::atomic_bool stopped{ false };
    // Atomic because doWrite polls it while the connect handler sets it.
    std::atomic_bool connected{ false };
    bool initial = true;
  };
}