Sans Pareil Technologies, Inc.

Key To Your Business

Lesson 18 - Threads


In computer science, a thread of execution is the smallest sequence of programmed instructions that can be managed independently by a scheduler, which is typically a part of the operating system. The implementation of threads and processes differs between operating systems, but in most cases a thread is a component of a process. Multiple threads can exist within one process, executing concurrently and sharing resources such as memory, while different processes do not share these resources. In particular, the threads of a process share its executable code and the values of its variables at any given time.

Systems with a single processor generally implement multithreading by time slicing: the central processing unit (CPU) switches between different software threads. This context switching generally happens very often and rapidly enough that users perceive the threads or tasks as running in parallel. On a multiprocessor or multi-core system, multiple threads can execute in parallel, with every processor or core executing a separate thread simultaneously; on a processor or core with hardware threads, separate software threads can also be executed concurrently by separate hardware threads.

Threads vs. Processes


Threads differ from traditional multitasking operating system processes in that:
  • processes are typically independent, while threads exist as subsets of a process
  • processes carry considerably more state information than threads, whereas multiple threads within a process share process state as well as memory and other resources
  • processes have separate address spaces, whereas threads share their address space
  • processes interact only through system-provided inter-process communication mechanisms
  • context switching between threads in the same process is typically faster than context switching between processes.

Multithreading


Multithreading is mainly found in multitasking operating systems. Multithreading is a widespread programming and execution model that allows multiple threads to exist within the context of one process. These threads share the process's resources, but are able to execute independently. The threaded programming model provides developers with a useful abstraction of concurrent execution. Multithreading can also be applied to one process to enable parallel execution on a multiprocessing system.

Multithreaded applications have the following advantages:
  • Responsiveness: multithreading can allow an application to remain responsive to input. In a one-thread program, if the main execution thread blocks on a long-running task, the entire application can appear to freeze. By moving such long-running tasks to a worker thread that runs concurrently with the main execution thread, it is possible for the application to remain responsive to user input while executing tasks in the background. On the other hand, in most cases multithreading is not the only way to keep a program responsive, with non-blocking I/O and/or Unix signals being available for gaining similar results.
  • Faster execution: this advantage of a multithreaded program allows it to operate faster on computer systems that have multiple central processing units (CPUs) or one or more multi-core processors, or across a cluster of machines, because the threads of the program naturally lend themselves to parallel execution, assuming sufficient independence (that they do not need to wait for each other).
  • Lower resource consumption: using threads, an application can serve multiple clients concurrently using fewer resources than it would need when using multiple process copies of itself. For example, the Apache HTTP server uses thread pools: a pool of listener threads for listening to incoming requests, and a pool of server threads for processing those requests.
  • Better system utilisation: as an example, a file system using multiple threads can achieve higher throughput and lower latency since data in a faster medium (such as cache memory) can be retrieved by one thread while another thread retrieves data from a slower medium (such as external storage) with neither thread waiting for the other to finish.
  • Simplified sharing and communication: unlike processes, which require a message passing or shared memory mechanism to perform inter-process communication (IPC), threads can communicate through data, code and files they already share.
  • Parallelisation: applications looking to use multicore or multi-CPU systems can use multithreading to split data and tasks into parallel subtasks and let the underlying architecture manage how the threads run, either concurrently on one core or in parallel on multiple cores. GPU computing environments like CUDA and OpenCL use the multithreading model where dozens to hundreds of threads run in parallel across data on a large number of cores.

Multithreading has the following drawbacks:
  • Synchronisation: since threads share the same address space, the programmer must be careful to avoid race conditions and other non-intuitive behaviours. In order for data to be correctly manipulated, threads will often need to rendezvous in time in order to process the data in the correct order. Threads may also require mutually exclusive operations (often implemented using semaphores) in order to prevent common data from being simultaneously modified or read while in the process of being modified. Careless use of such primitives can lead to deadlocks.
  • Thread crashes a process: an illegal operation performed by a thread crashes the entire process; therefore, one misbehaving thread can disrupt the processing of all the other threads in the application.

C++ Standard Library Support


The std::thread class represents a single thread of execution. Threads allow multiple functions to execute concurrently.

Threads begin execution immediately upon construction of the associated thread object (pending any OS scheduling delays), starting at the top-level function provided as a constructor argument. The return value of the top-level function is ignored and if it terminates by throwing an exception, std::terminate is called. The top-level function may communicate its return value or an exception to the caller via std::promise or by modifying shared variables (which may require synchronisation, see std::mutex and std::atomic)

std::thread objects may also be in the state that does not represent any thread (after default construction, move from, detach, or join), and a thread of execution may be not associated with any thread objects (after detach).

No two std::thread objects may represent the same thread of execution; std::thread is not CopyConstructible or CopyAssignable, although it is MoveConstructible and MoveAssignable.

Low Level API


Direct usage of std::thread, coupled with mutexes, condition variables etc. form the basis for writing multi-threaded code. This model is familiar for any programmer who has written low level multi-threaded code (in any language) since this maps almost directly to how operating systems support threads.

Creating threads


std::thread presents three forms of constructors:
  • default - Just creates a thread object. Does not do any work or start any execution, ie. it does not (yet) represent a thread of execution.
  • move - Moves the specified thread object to the newly constructed instance. The passed in thread no longer represents a thread of execution.
  • designated initialiser - The third form of the constructor takes as input a function and any additional parameters that needed to be passed to the function. This form creates a new thread object and associates it with a thread of execution. The execution unit will be the function passed in invoked with the specified parameters.

The arguments to the thread function are moved or copied by value. If a reference argument needs to be passed to the thread function, it has to be wrapped (e.g. with std::ref or std::cref).

#include <iostream>
#include <utility>
#include <thread>
#include <chrono>
#include <functional>
#include <atomic>

void f1(int n)
{
  for (int i = 0; i < 10; ++i)
  {
    std::cout << "Thread 1 executing\n";
    ++n;
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
}

void f2(int& n)
{
  for (int i = 0; i < 5; ++i)
  {
    std::cout << "Thread 2 executing\n";
    ++n;
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
  }
}

int main()
{
  int n = 0;
  std::thread t1; // t1 is not a thread
  std::thread t2(f1, n + 1); // pass by value
  std::thread t3(f2, std::ref(n)); // pass by reference
  std::thread t4(std::move(t3)); // t4 is now running f2(). t3 is no longer a thread
  t2.join();
  t4.join();
  std::cout << "Final value of n is " << n << '\n';
}

#include <vector>
#include <thread>
#include <iostream>

const std::size_t nloop = 11;

void serial()
{
  std::cout << "serial:" << std::endl;
  for ( int i = 0; i < nloop; ++i )
  {
    const int j = i*i;
    std::cout << j << std::endl;
  }
  std::cout << std::endl;
}

void parallel()
{
  const size_t nthreads = std::thread::hardware_concurrency();
  std::cout << "parallel (" << nthreads << " threads):" << std::endl;

  std::vector<std::thread> threads( nthreads );
  std::mutex critical;
  for ( int i = 0; i < nthreads; ++i )
  {
    threads[i] = std::thread
      (
        std::bind
        (
          [&](const int bi, const int ei, const int t)
          {
            for ( int i = bi; i < ei; ++i )
            {
              const int j = i*i;
              // (optional) make output critical
              std::lock_guard<std::mutex> lock(critical);
              std::cout << j << std::endl;
            }
          },
          i * nloop/nthreads,
          (i+1) == nthreads ? nloop : (i+1)*nloop/nthreads,
          i
        )
    );
  }

  std::for_each( threads.begin(),threads.end(),
      [](std::thread& x){ x.join(); } );
  std::cout << std::endl;
}

int main()
{
  serial();
  parallel();
}


Mutual Exclusion


In computer science, mutual exclusion is a property of concurrency control, which is instituted for the purpose of preventing race conditions; it is the requirement that one thread of execution never enter its critical section at the same time that another concurrent thread of execution enters its own critical section.

The std::mutex class is a synchronisation primitive that can be used to protect shared data from being simultaneously accessed by multiple threads.
std::mutex offers exclusive, non-recursive ownership semantics:
  • A calling thread owns a mutex from the time that it successfully calls either lock or try_lock until it calls unlock.
  • When a thread owns a mutex, all other threads will block (for calls to lock) or receive a false return value (for try_lock) if they attempt to claim ownership of the mutex.
  • A calling thread must not own the mutex prior to calling lock or try_lock.
The behaviour of a program is undefined if a mutex is destroyed while still owned by any threads, or a thread terminates while owning a mutex.

#include <iostream>
#include <map>
#include <string>
#include <chrono>
#include <thread>
#include <mutex>

std::map<std::string, std::string> g_pages;
std::mutex g_pages_mutex;

void save_page(const std::string& url)
{
  // simulate a long page fetch
  std::this_thread::sleep_for(std::chrono::seconds(2));
  std::string result = "fake content";

  std::lock_guard<std::mutex> guard(g_pages_mutex);
  g_pages[url] = result;
}

int main()
{
  std::thread t1(save_page, "http://foo");
  std::thread t2(save_page, "http://bar");
  t1.join();
  t2.join();

  // safe to access g_pages without lock now, as the threads are joined
  for (const auto& pair : g_pages)
  {
    std::cout << pair.first << " => " << pair.second << std::endl;
  }
}



Condition Variable


The std::condition_variable class is a synchronisation primitive that can be used to block a thread, or multiple threads at the same time, until another thread both modifies a shared variable (the condition), and notifies the condition_variable.

The thread that intends to modify the variable has to
  1. acquire a std::mutex (typically via std::lock_guard)
  2. perform the modification while the lock is held
  3. execute notify_one or notify_all on the std::condition_variable (the lock does not need to be held for notification)

Even if the shared variable is atomic, it must be modified under the mutex in order to correctly publish the modification to the waiting thread.
Any thread that intends to wait on std::condition_variable has to
  1. acquire a std::unique_lock<std::mutex>, on the same mutex as used to protect the shared variable
  2. execute wait, wait_for, or wait_until. The wait operations atomically release the mutex and suspend the execution of the thread.
  3. When the condition variable is notified, a timeout expires, or a spurious wakeup occurs, the thread is awakened, and the mutex is atomically reacquired. The thread should then check the condition and resume waiting if the wake up was spurious.

std::condition_variable works only with std::unique_lock<std::mutex>; this restriction allows for maximal efficiency on some platforms. std::condition_variable_any provides a condition variable that works with any BasicLockable object, such as std::shared_lock.
Condition variables permit concurrent invocation of the wait, wait_for, wait_until, notify_one and notify_all member functions.

#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;

void worker_thread()
{
  // Wait until main() sends data
  std::unique_lock<std::mutex> lk(m);
  cv.wait(lk, []{return ready;});

  // after the wait, we own the lock.
  std::cout << "Worker thread is processing data\n";
  data += " after processing";

  // Send data back to main()
  processed = true;
  std::cout << "Worker thread signals data processing completed\n";

  // Manual unlocking is done before notifying, to avoid waking up
  // the waiting thread only to block again (see notify_one for details)
  lk.unlock();
  cv.notify_one();
}

int main()
{
  std::thread worker(worker_thread);

  data = "Example data";
  // send data to the worker thread
  {
    std::lock_guard<std::mutex> lk(m);
    ready = true;
    std::cout << "main() signals data ready for processing\n";
  }
  cv.notify_one();

  // wait for the worker
  {
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return processed;});
  }
  std::cout << "Back in main(), data = " << data << '\n';

  worker.join();
}



Higher Level API


Working at the level of threads and locks can be quite tricky. Modern frameworks offer a higher level of abstraction, in the form of task-based concurrency. A task represents an asynchronous operation that can run in parallel with other operations, and the system hides the details of how this parallelism is implemented.

Futures & Promises


The C++11 library, in its new <future> header, also provides support for task-based parallelism, in the form of promises and futures. The classes std::promise<T> and std::future<T> are roughly the C++ equivalent of a .NET Task<T>, or of a Future<T> of Java. They work in pairs to separate the act of calling a function from the act of waiting for the call results.

At the caller-side when we call the asynchronous function we do not receive a result of type T. What is returned instead is a std::future<T>, a placeholder for the result, which will be delivered at some point in time, later.

Once we get our future we can move on doing other work, while the task executes on a separate thread.

A std::promise<T> object represents a result in the callee-side of the asynchronous call, and it is the channel for passing the result asynchronously to the caller. When the task completes, it puts its result into a promise object calling promise::set_value.

When the caller finally needs to access the result it will call the blocking future::get() to retrieve it. If the task has already completed the result will be immediately available, otherwise, the caller thread will suspend until the result value becomes available.

#include <fstream>
#include <iostream>
#include <future>
#include <string>
#include <vector>

std::vector<char> readFile(const std::string& inPath)
{
  std::ifstream file(inPath, std::ios::binary | std::ios::ate);
  auto length = file.tellg();
  std::vector<char> buffer(length);
  file.seekg(0, std::ios::beg);
  file.read(&buffer[0], length);
  return buffer;
}

std::size_t writeFile(const std::vector<char>& buffer, const std::string& outPath)
{
  std::ofstream file(outPath, std::ios::binary);
  file.write(&buffer[0], buffer.size());
  return file.tellp();
}

std::size_t futureCopyFile(const std::string& inFile, const std::string& outFile)
{
  std::promise<std::vector<char>> prom1;
  std::future<std::vector<char>> fut1 = prom1.get_future();
  std::thread th1([&prom1, inFile](){
      prom1.set_value(readFile(inFile));
  });

  std::promise<int> prom2;
  std::future<int> fut2 = prom2.get_future();
  std::thread th2([&fut1, &prom2, outFile](){
      prom2.set_value(writeFile(fut1.get(), outFile));
  });

  std::size_t result = fut2.get();
  th1.join();
  th2.join();
  return result;
}

int main()
{
  std::cout << "Using threads " << futureCopyFile( "test.cpp", "test1.cpp" ) << std::endl;
  return 0;
}


Note that here we have moved the execution of the readFile and writeFile into separate tasks but we also have to configure and start threads to run them. Also, we capture references to the promise and future objects to make them available to the task functions. The first thread implements the read, and moves its result into a promise when it completes, in the form of a big vector. The second thread waits (blocking) on a corresponding future and when the read completes, get the read vector and pass it to the write function. Finally, when the write completes, the number of chars written is put in the second future.

In the main function we could take advantage of this parallelism and do some lengthy operation before the call to future::get(). But when we call get() the main thread will still block if the read and write tasks have not completed yet.

std::async


Even with futures and promises, we were still dealing with lower level threads. Things become much simpler if we use the std::async() function, also provided by the library. It takes as input a lambda or functor and it returns a future that will contain the return value.

It decides where to run the task, if a new thread needs to be created or if an old (or even the current) thread can be reused.

It is also possible to specify a launch policy, which can be either “async” (which requires to execute the task asynchronously, possibly in a different thread) or “deferred” (which asks to execute the task only at the moment when get() is called).

The nice thing is that std::async hides all the implementation, platform specific details for us.

std::size_t asyncCopyFile(const std::string& inFile, const std::string& outFile)
{
  auto fut1 = std::async(readFile, inFile);
  auto lambda = [&fut1](const std::string& path){ return writeFile(fut1.get(), path); };
  auto fut2 = std::async( lambda, outFile);

  return fut2.get();
}



References


https://en.wikipedia.org/wiki/Thread_(computing)
https://paoloseverini.wordpress.com/2014/04/07/concurrency-in-c11/