A2. Parallel Programming in C

Why Parallel Programming?

Simply put, because it may speed up your code. Unlike 10 years ago, today your computer (and probably even your smartphone) has one or more CPUs with multiple processing cores (a multi-core processor). This helps with desktop computing tasks like multitasking (running multiple programs, plus the operating system, simultaneously). For scientific computing, it means you have the ability, in principle, to split up your computations into groups and run each group on its own processor.

Most operating systems have a utility that allows you to visualize processor usage in real-time. Mac OSX has "Activity Monitor", Gnome/GNU Linux has "gnome-system-monitor" and Windows has … well actually I have no idea, you're on your own with that one. Fire it up, and run a computationally intensive program you have written, and what you will probably see is that you have a lot of computational power that is sleeping. Parallel programming allows you in principle to take advantage of all that dormant power.

Kinds of Parallel Programming

There are many flavours of parallel programming, some that are general and can be run on any hardware, and others that are specific to particular hardware architectures.

Two main paradigms we can talk about here are shared memory versus distributed memory models. In shared memory models, multiple processing units all have access to the same, shared memory space. This is the case on your desktop or laptop with multiple CPU cores. In a distributed memory model, multiple processing units each have their own memory store, and information is passed between them. This is the model that a networked cluster of computers operates with. A computer cluster is a collection of standalone computers that are connected to each other over a network, and are used together as a single system. We won't be talking about clusters here, but some of the tools we'll talk about (e.g. MPI) are easily used with clusters.

Types of parallel tasks

Broadly speaking, we can sort computations into two camps depending on how they can be parallelized. A so-called embarrassingly parallel problem is one that is dead easy to separate into some number of independent tasks that can then be run in parallel.

Embarrassingly parallel problems

Embarrassingly parallel computational problems are the easiest to parallelize and you can achieve impressive speedups if you have a computer with many cores. Even if you have just two cores, you can get close to a two-times speedup. An example of an embarrassingly parallel problem is when you need to run a preprocessing pipeline on datasets collected for 15 subjects. Each subject's data can be processed independently of the others. In other words, the computations involved in processing one subject's data do not in any way depend on the results of the computations for processing some other subject's data.

As an example, a grad student in my lab (Heather) figured out how to distribute her FSL preprocessing pipeline for 24 fMRI subjects across multiple cores on her Mac Pro desktop (it has 8) and as a result what used to take about 48 hours to run, now takes "just" over 6 hours.

Serial problems

In contrast to embarrassingly parallel problems, there is a class of problems that cannot be split into independent sub-problems; we can call these inherently sequential, or serial, problems. For these types of problems, the computation at one stage does depend on the results of a computation at an earlier stage, and so it is not so easy to parallelize across independent processing units. In these kinds of problems, there is a need for some communication or coordination between sub-tasks.

An example of a serial problem is a simulation of an arm movement. We run simulations of arm movements like reaching, that use detailed mathematical models of muscle mechanics, activation dynamics, musculoskeletal dynamics and spinal reflexes. Differential equations govern the relationship between muscle stimulation (the input) and the resulting arm movement (the output). These equations are "unwrapped" in time by a differential equation integrator, that takes small steps (like 1 millisecond at a time) to generate a simulation of a whole movement (e.g. 1 second of simulated time). On each step the current state of the system depends on both the current input (muscle command) and on the previous state of the system. With a 1 ms step, it takes (at least) 1000 computations to simulate a 1 sec arm movement … but we cannot simply split up those 1000 computations and distribute them to a set of independent processing units. This is an inherently serial problem where the current computation cannot be carried out without the results of the previous computation.

As an aside, the way to take advantage of parallelism even with a serial problem, is to parallelize meta-computations. So for example, we typically want to run "sensitivity analyses" where we vary some parameter(s) of the arm model and re-run the simulation. If we have 100 such simulations to run, even though each one of them is a serial problem on its own, they are independent of each other, and so we can parallelize the sensitivity analysis by distributing those 100 simulations to multiple processing units.

Mixtures

A good example of a problem that has both embarrassingly parallel properties as well as serial dependency properties, is the computations involved in training and running an artificial neural network (ANN). An ANN is made up of several layers of neuron-like processing units, each layer having many (even hundreds or thousands) of these units. If the ANN is a pure feedforward architecture, then computations within each layer are embarrassingly parallel, while computations between layers are serial.

Tools for Parallel Programming

The threads model of parallel programming is one in which a single process (a single program) can spawn multiple, concurrent "threads" (sub-programs). Each thread runs independently of the others, although they can all access the same shared memory space (and hence they can communicate with each other if necessary). Threads can be spawned and killed by the main program as required.

A challenge of using threads is the issue of collisions and race conditions, which can be addressed using synchronization. If multiple threads write to (and depend upon) a shared memory variable, then care must be taken to make sure that multiple threads don't try to write to the same location simultaneously. The Wikipedia page for race condition has a nice description (and an example) of how this can be a problem. There are mechanisms when using threads to implement synchronization, and to implement mutual exclusivity (mutex variables), so that shared variables can be locked by one thread and then released, preventing collisions by other threads. These mechanisms ensure that threads must "take turns" when accessing protected data.

POSIX Threads (Pthreads)

POSIX Threads (Pthreads for short) is a standard for programming with threads, and defines a set of C types, functions and constants.

More generally, threads are a way that a program can spawn concurrent units of processing that can then be delegated by the operating system to multiple processing cores. Clearly the advantage of a multithreaded program (one that uses multiple threads that are assigned to multiple processing cores) is that you can achieve big speedups, as all cores of your CPU (and all CPUs if you have more than one) are used at the same time.

Here is a simple example program that spawns 5 threads, where each one runs the myFun() function:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NTHREADS 5

void *myFun(void *x)
{
  int tid;
  tid = *((int *) x);
  printf("Hi from thread %d!\n", tid);
  return NULL;
}

int main(int argc, char *argv[])
{
  pthread_t threads[NTHREADS];
  int thread_args[NTHREADS];
  int rc, i;

  /* spawn the threads */
  for (i=0; i<NTHREADS; ++i)
    {
      thread_args[i] = i;
      printf("spawning thread %d\n", i);
      rc = pthread_create(&threads[i], NULL, myFun, (void *) &thread_args[i]);
    }

  /* wait for threads to finish */
  for (i=0; i<NTHREADS; ++i) {
    rc = pthread_join(threads[i], NULL);
  }

  return 0;
}

plg@wildebeest:~/Desktop$ gcc -o go go.c -lpthread
plg@wildebeest:~/Desktop$ ./go
spawning thread 0
spawning thread 1
Hi from thread 0!
spawning thread 2
Hi from thread 1!
spawning thread 3
Hi from thread 2!
spawning thread 4
Hi from thread 3!
Hi from thread 4!
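
The threads in this example don't write to any shared data, so there is no danger of a collision. To give a flavour of how a mutex prevents collisions when threads do share data, here is a minimal sketch of my own (not part of the original example) in which 5 threads all increment the same shared counter:

#include <stdio.h>
#include <pthread.h>

#define NTHREADS 5

int counter = 0;                                   /* shared by all threads */
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;  /* protects counter */

void *addOne(void *x)
{
  /* lock the mutex so only one thread at a time can touch counter */
  pthread_mutex_lock(&lock);
  counter = counter + 1;
  pthread_mutex_unlock(&lock);
  return NULL;
}

int main(int argc, char *argv[])
{
  pthread_t threads[NTHREADS];
  int i;

  for (i=0; i<NTHREADS; ++i) {
    pthread_create(&threads[i], NULL, addOne, NULL);
  }
  for (i=0; i<NTHREADS; ++i) {
    pthread_join(threads[i], NULL);
  }

  /* with the mutex in place, counter always ends up equal to NTHREADS */
  printf("counter = %d\n", counter);
  return 0;
}

Compile it the same way as the example above (with -lpthread). Without the pthread_mutex_lock() / pthread_mutex_unlock() calls around the increment, the threads could (at least in principle) collide and the final count could come out wrong.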

For more information about collisions, synchronization, mutexes, etc., check out one of the many sources of documentation about Pthreads, e.g. here: Mutex Variables.

OpenMP

OpenMP is an API that implements a multi-threaded, shared memory form of parallelism. It uses a set of compiler directives (statements that you add to your C code) that are incorporated at compile-time to generate a multi-threaded version of your code. You can think of Pthreads (above) as doing multi-threaded programming "by hand", and OpenMP as a slightly more automated, higher-level API to make your program multithreaded. OpenMP takes care of many of the low-level details that you would normally have to implement yourself, if you were using Pthreads from the ground up.

Here is the general code structure of an OpenMP program:

#include <omp.h>

int main()
{
  int var1, var2, var3;

  /* serial code */
  /* ... */

  /* beginning of parallel section: fork a team of threads
     and specify variable scoping */
#pragma omp parallel private(var1, var2) shared(var3)
  {
    /* parallel section, executed by all threads */
    /* ... */

    /* all threads join the master thread and disband */
  }

  /* resume serial code */
  /* ... */

  return 0;
}

Private vs Shared variables

By using the private() and shared() clauses, you can specify variables within the parallel region as being either shared, i.e. visible and accessible by all threads simultaneously, or private, i.e. private to each thread, meaning each thread has its own local copy. In the code example below for parallelizing a for loop, you can see that we specify the thread_id and nloops variables as private.

Parallelizing for loops with OpenMP

Parallelizing for loops is really simple (see code below). By default, the loop iteration counter of an OpenMP loop construct (the i variable in this case) is made a private variable.

// gcc -fopenmp -o go go.c
// ./go

#include <stdio.h>
#include <omp.h>

int main(int argc, char **argv)
{
  int i, thread_id, nloops;

#pragma omp parallel private(thread_id, nloops)
  {
    nloops = 0;

#pragma omp for
    for (i=0; i<1000; ++i)
      {
        ++nloops;
      }

    thread_id = omp_get_thread_num();

    printf("Thread %d performed %d iterations of the loop.\n",
           thread_id, nloops );
  }

  return 0;
}
plg@wildebeest:~/Desktop$ gcc -fopenmp -o go go.c
plg@wildebeest:~/Desktop$ ./go
Thread 4 performed 125 iterations of the loop.
Thread 7 performed 125 iterations of the loop.
Thread 2 performed 125 iterations of the loop.
Thread 6 performed 125 iterations of the loop.
Thread 5 performed 125 iterations of the loop.
Thread 0 performed 125 iterations of the loop.
Thread 3 performed 125 iterations of the loop.
Thread 1 performed 125 iterations of the loop.

Critical Code

Using OpenMP you can specify something called a "critical" section of code. This is code that is executed by all threads, but by only one thread at a time (i.e. in serial). This provides a convenient way of doing things like updating a global variable with the local results from each thread, without having to worry about other threads writing to that global variable at the same time (a collision).

#include <stdio.h>
#include <omp.h>

int main(int argc, char *argv[])
{
    int i, thread_id;
    int glob_nloops, priv_nloops;
    glob_nloops = 0;

    // parallelize this chunk of code
    #pragma omp parallel private(priv_nloops, thread_id)
    {
        priv_nloops = 0;
        thread_id = omp_get_thread_num();

        // parallelize this for loop
        #pragma omp for
        for (i=0; i<100000; ++i)
        {
            ++priv_nloops;
        }

        // make this a "critical" code section
        #pragma omp critical
        {
            printf("Thread %d is adding its iterations (%d) to sum (%d), ",
                   thread_id, priv_nloops, glob_nloops);
            glob_nloops += priv_nloops;
            printf(" total nloops is now %d.\n", glob_nloops);
        }
    }
    printf("Total # loop iterations is %d\n",
           glob_nloops);
    return 0;
}
plg@wildebeest:~/Desktop$ gcc -fopenmp -o go go.c
plg@wildebeest:~/Desktop$ ./go
Thread 1 is adding its iterations (12500) to sum (0),  total nloops is now 12500.
Thread 4 is adding its iterations (12500) to sum (12500),  total nloops is now 25000.
Thread 0 is adding its iterations (12500) to sum (25000),  total nloops is now 37500.
Thread 5 is adding its iterations (12500) to sum (37500),  total nloops is now 50000.
Thread 3 is adding its iterations (12500) to sum (50000),  total nloops is now 62500.
Thread 6 is adding its iterations (12500) to sum (62500),  total nloops is now 75000.
Thread 2 is adding its iterations (12500) to sum (75000),  total nloops is now 87500.
Thread 7 is adding its iterations (12500) to sum (87500),  total nloops is now 100000.
Total # loop iterations is 100000

Reduction

Reduction refers to the process of combining the results of several sub-calculations into a final result. This is a very common paradigm (indeed the so-called "map-reduce" framework used by Google and others is built around it). We used this paradigm in the code example above, where the "critical" directive did the combining. The pattern is so common that OpenMP has a specific reduction clause that lets you implement it more easily.

#include <stdio.h>
#include <omp.h>

int main(int argc, char *argv[])
{
    int i, thread_id;
    int glob_nloops, priv_nloops;
    glob_nloops = 0;

    // parallelize this chunk of code
    #pragma omp parallel private(priv_nloops, thread_id) reduction(+:glob_nloops)
    {
        priv_nloops = 0;
        thread_id = omp_get_thread_num();

        // parallelize this for loop
        #pragma omp for
        for (i=0; i<100000; ++i)
        {
            ++priv_nloops;
        }
        glob_nloops += priv_nloops;
    }
    printf("Total # loop iterations is %d\n",
           glob_nloops);
    return 0;
}
plg@wildebeest:~/Desktop$ gcc -fopenmp -o go go.c
plg@wildebeest:~/Desktop$ ./go
Total # loop iterations is 100000

Other OpenMP directives

There are a host of other directives and clauses you can use with OpenMP; see the Wikipedia page on OpenMP for a list. Some other clauses of interest are:

  • barrier: each thread will wait until all threads have reached this point in the code, before proceeding
  • nowait: threads will not wait until everybody is finished
  • schedule(type, chunk): allows you to specify how loop iterations are handed out to threads in a for loop; the three main scheduling types you can specify are static, dynamic and guided (see the sketch after this list)
  • if: allows you to parallelize only if a certain condition is met
  • … and a host of others
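
As a rough illustration of the schedule() clause, here is a minimal sketch of my own (not from the original notes); the chunk size of 10 and the work array of doubled indices are arbitrary choices:

// gcc -fopenmp -o go go.c
// ./go

#include <stdio.h>
#include <omp.h>

#define N 1000

int main(int argc, char *argv[])
{
  int i;
  double work[N];

#pragma omp parallel
  {
    // hand out iterations to threads in chunks of 10, assigning a new
    // chunk to each thread as it becomes free
#pragma omp for schedule(dynamic, 10)
    for (i=0; i<N; ++i)
      {
        work[i] = 2.0 * i;
      }

    // there is an implicit barrier at the end of the for construct,
    // so by this point every iteration has finished
    if (omp_get_thread_num() == 0)
      printf("all %d iterations done; work[%d] = %f\n", N, N-1, work[N-1]);
  }

  return 0;
}

Changing dynamic to static or guided (or changing the chunk size) changes how iterations are distributed across threads, which can matter when different iterations take different amounts of time.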

MPI

The Message Passing Interface (MPI) is a standard defining core syntax and semantics of library routines that can be used to implement parallel programming in C (and in other languages as well). There are several implementations of MPI such as Open MPI, MPICH2 and LAM/MPI.

In the context of this tutorial, you can think of MPI, in terms of its complexity, scope and control, as sitting in between programming with Pthreads, and using a high-level API such as OpenMP.

The MPI interface allows you to manage allocation, communication, and synchronization of a set of processes that are mapped onto multiple nodes, where each node can be a core within a single CPU, a CPU within a single machine, or even a separate machine entirely (as long as the machines are networked together).

One context where MPI particularly shines is its ability to take advantage not just of multiple cores on a single machine, but of clusters of several machines. Even if you don't have a dedicated cluster, you could still use MPI to run your program in parallel across any collection of computers, as long as they are networked together. Just make sure to ask permission before you load up your lab-mate's computer's CPU(s) with your computational tasks!

Here is a basic MPI program in which each node simply writes a message to the screen reporting its rank.

// mpicc go_mpi.c -o go_mpi
// mpirun -n 4 go_mpi

#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[]) 
{
  int myrank, nprocs;

  MPI_Init(&argc, &argv);
  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
  MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

  printf("I am node %d of %d\n", myrank, nprocs);

  MPI_Finalize();
  return 0;
}
plg@wildebeest:~/Desktop$ mpicc go_mpi.c -o go_mpi
plg@wildebeest:~/Desktop$ mpirun -n 4 go_mpi
I am node 0 of 4
I am node 2 of 4
I am node 1 of 4
I am node 3 of 4

The basic design pattern of an MPI program is that the same code is sent to all nodes for execution. It's by using the MPI_Comm_rank() function that you can determine which node the code is currently running on, and (if needed) act differently. The MPI_Comm_size() function will tell you how many nodes there are in total.

MPI programs need to be compiled using mpicc, and need to be run using mpirun with a flag indicating the number of processes to spawn (4, in the above example).
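
To make the rank-based design pattern concrete, here is a minimal sketch of my own (following the same pattern as the program above; the file name go_rank.c is just for illustration) in which the node with rank 0 acts as the "master" and behaves differently from the others:

// mpicc go_rank.c -o go_rank
// mpirun -n 4 go_rank

#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
  int myrank, nprocs;

  MPI_Init(&argc, &argv);
  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
  MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

  if (myrank == 0) {
    // only the master node (rank 0) runs this branch
    printf("I am the master, coordinating %d nodes\n", nprocs);
  }
  else {
    // every other node runs this branch
    printf("I am worker node %d\n", myrank);
  }

  MPI_Finalize();
  return 0;
}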

MPI_Reduce

We saw with OpenMP that we can use a reduction clause to sum values across all threads. A similar facility exists in MPI, in the form of the MPI_Reduce() function.

An Example: Estimating pi using the dartboard algorithm

// Estimating pi using the dartboard algorithm
// All processes contribute to the calculation, with the
// master process averaging the values for pi.
// We then use MPI_Reduce to collect the results
//
// mpicc -o go mpi_pi_reduce.c
// mpirun -n 8 go

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

#define MASTER 0         // task ID of master task
#define NDARTS 1000      // # dart throws per round
#define NROUNDS 10     // # of rounds of dart throwing

// our function for throwing darts and estimating pi
double dartboard(int ndarts)
{
  double x, y, r, pi; 
  int n, hits;
  hits = 0;

  // throw darts
  for (n = 1; n <= ndarts; n++)  {
    // (x,y) are random between -1 and 1
    r = (double)random()/RAND_MAX;
    x = (2.0 * r) - 1.0;
    r = (double)random()/RAND_MAX;
    y = (2.0 * r) - 1.0;
    // if our random dart landed inside the unit circle, increment the score
    if (((x*x) + (y*y)) <= 1.0) {
      hits++;
    }
  }

  // estimate pi
  pi = 4.0 * (double)hits / (double)ndarts;
  return(pi);
} 

// the main program
int main (int argc, char *argv[])
{
  double my_pi, pi_sum, pi_est, mean_pi, err;
  int task_id, n_tasks, rc, i;
  MPI_Status status;

  // Obtain number of tasks and task ID
  MPI_Init(&argc,&argv);
  MPI_Comm_size(MPI_COMM_WORLD,&n_tasks);
  MPI_Comm_rank(MPI_COMM_WORLD,&task_id);
  //  printf ("task %d of %d reporting for duty...\n", task_id, n_tasks);

  // different seed for random number generator for each task
  srandom (task_id);

  mean_pi = 0.0;
  for (i=0; i<NROUNDS; i++) {
    // all tasks will execute dartboard() to calculate their own estimate of pi
    my_pi = dartboard(NDARTS);

    // now we use MPI_Reduce() to sum values of my_pi across all tasks
    // the master process (id=MASTER) will store the accumulated value
    // in pi_sum. We tell MPI_Reduce() to sum by passing it
    // the MPI_SUM value (defined in mpi.h)
    rc = MPI_Reduce(&my_pi, &pi_sum, 1, MPI_DOUBLE, MPI_SUM,
                    MASTER, MPI_COMM_WORLD);

    // now, IF WE ARE THE MASTER process, we will compute the mean
    if (task_id == MASTER) {
      pi_est = pi_sum / n_tasks;
      mean_pi = ( (mean_pi * i) + pi_est ) / (i + 1); // running average
      err = mean_pi - 3.14159265358979323846;
      printf("%d throws: mean_pi %.12f: error %.12f\n",
             (NDARTS * (i + 1)), mean_pi, err);
    }
  }
  if (task_id == MASTER) 
    printf ("PS, the real value of pi is about 3.14159265358979323846\n");

  MPI_Finalize();
  return 0;
}

Here we run it with just one parallel process:

plg@wildebeest:~/Desktop/mpi$ time mpirun -n 1 go
1000 throws: mean_pi 3.088000000000: error -0.053592653590
2000 throws: mean_pi 3.104000000000: error -0.037592653590
3000 throws: mean_pi 3.101333333333: error -0.040259320256
4000 throws: mean_pi 3.120000000000: error -0.021592653590
5000 throws: mean_pi 3.124800000000: error -0.016792653590
6000 throws: mean_pi 3.127333333333: error -0.014259320256
7000 throws: mean_pi 3.134285714286: error -0.007306939304
8000 throws: mean_pi 3.128500000000: error -0.013092653590
9000 throws: mean_pi 3.132444444444: error -0.009148209145
10000 throws: mean_pi 3.119600000000: error -0.021992653590
PS, the real value of pi is about 3.14159265358979323846

real	0m0.032s
user	0m0.020s
sys	0m0.012s

Now let's run it with 4:

plg@wildebeest:~/Desktop/mpi$ time mpirun -n 4 go
1000 throws: mean_pi 3.105000000000: error -0.036592653590
2000 throws: mean_pi 3.122500000000: error -0.019092653590
3000 throws: mean_pi 3.122000000000: error -0.019592653590
4000 throws: mean_pi 3.137750000000: error -0.003842653590
5000 throws: mean_pi 3.143600000000: error 0.002007346410
6000 throws: mean_pi 3.140166666667: error -0.001425986923
7000 throws: mean_pi 3.142000000000: error 0.000407346410
8000 throws: mean_pi 3.140250000000: error -0.001342653590
9000 throws: mean_pi 3.136666666667: error -0.004925986923
10000 throws: mean_pi 3.135000000000: error -0.006592653590
PS, the real value of pi is about 3.14159265358979323846

real	0m0.034s
user	0m0.044s
sys	0m0.024s

We see the final error is much reduced. Each of the 4 processes (which are parallelized across the cores of my CPU) contributes its own estimate of pi, and these estimates are then averaged by the master process to come up with the final estimate of pi.

GPU Computing


Links

Exercises

Here is a data file containing two columns of comma-separated data.

100,111
93,103
115,119
97,117
106,116
111,116
111,119
100,103
126,118
93,119

  1. Write a program to read the data file into one or more data structures, and print the values out to the screen. You can assume in your program that you know the number of rows of data (10).
  2. Rewrite your program above assuming you don't know in advance how many rows of data you have.
  3. Add to your program a function that computes the value of a t statistic for the difference between the means of the two columns of data. Assume it's an unpaired t-test and that you can compute t using the following equation:

     \begin{equation} t = \frac{\bar{X}_{2} - \bar{X}_{1}}{\sqrt{\frac{s_{1}^{2}}{n_{1}}+\frac{s_{2}^{2}}{n_{2}}}} \end{equation}

  4. Implement a bootstrapping test of the t statistic you computed above. Iterate nboot times, each time taking a random sample (with replacement) from the set of 20 observations, assigning them to each group, and re-doing the t-test. Count up how many times out of nboot you get a t value as large as or larger than the one you computed above (so this is a one-tailed test). Set nboot to 1 million and report the execution time. If you have a fast machine, set nboot to 10 million so you have some dynamic range. If you have a slow machine, set nboot to 1e5 (or 1e4 if it's really slow).
  5. Parallelize the bootstrap loop to make use of multiple CPU cores. Report the execution time.

Solutions


Paul Gribble | Summer 2012
This work is licensed under a Creative Commons Attribution 4.0 International License