I got my first taste of writing distributed computer programs in 2010, during my PhD at EPF Lausanne. In my group we were developing a set of C++ libraries, called LifeV, for doing large-scale finite element computational fluid dynamics simulations. In the case of my project, the emphasis was on large scale, so my work was to find ways to improve the scalability of our simulations. A bit later, I will elaborate on different ways of defining scalability; for now, let’s say we wanted to use large clusters or supercomputers more efficiently, which in turn would allow us to perform larger and more realistic simulations.

We were using the Message Passing Interface (MPI), which was and still is the standard way of writing parallel numerical applications that run on distributed memory systems. For many people working in the field of scientific computing, MPI is their only, well, interface to distributed computing. It represents a sufficient abstraction of the mechanisms for distributing data and computations across multiple computers. A computational scientist rarely needs to look elsewhere, thanks to the native support for Fortran, C and C++ and all the excellent numerical libraries that have been written in these languages.

At the same time, MPI is very rarely used outside scientific computing. Other fields involving a large amount of data and computation, such as machine learning or data mining, have their own preferred tools, like Apache Hadoop or Apache Spark. Nonetheless, it’s interesting to look at MPI due to its simplicity. It describes quite clearly the basic concepts of message passing as a means of communication between independent processes.

So, what is MPI?

MPI is a specification for software libraries that handle the communication aspects of parallel numerical software, by way of explicit information exchange between multiple processes. There exist multiple implementations of this specification, both open source and proprietary, and all supercomputing hardware vendors provide their own tuned and optimized versions of MPI.

The most common programming model with MPI is “single-program-multiple-data” (SPMD), where multiple copies of the same program operate on different datasets, possibly exchanging data when needed. This maps very well onto the various domain-decomposition methods used for solving partial differential equations: the problem is split into smaller independent subproblems, and the solutions to these subproblems are used, in an iterative process, to solve the original problem. This all sounds quite abstract, but MPI makes it very easy to implement such algorithms by providing the user with a basic set of very general operations.

Core concepts

A fundamental concept of MPI is that of communicators. These objects link multiple (software) processes in a group. Each process in a communicator is assigned a unique process id (usually called its rank) and can exchange data (messages) with the other processes in the same group. Using this unique rank, it’s possible to define which parts of the problem (or the dataset) each process will be responsible for.
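
To make this concrete, here is a minimal sketch (the variable names are my own, and numProc and myRank are obtained with the MPI calls shown in the code section below) of how a process can use its rank to pick the block of a global index range it is responsible for:

// A hedged sketch: split a global index range [0, globalSize) into
// contiguous blocks, one per process; the first `remainder` ranks
// each get one extra element
const int globalSize = 1000;

const int baseCount = globalSize / numProc;
const int remainder = globalSize % numProc;

const int myCount = baseCount + (myRank < remainder ? 1 : 0);
const int myStart = myRank * baseCount + (myRank < remainder ? myRank : remainder);

// This process works on the indices [myStart, myStart + myCount)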

The strength of MPI comes into play when the problem to solve is not embarrassingly parallel, i.e. some sort of synchronization is required between the processes. There exist different ways of exchanging information between processes, ranging from simple send and receive operations between pairs of processes, to collective reduction operations across an entire process group. Most operations come in synchronous and asynchronous variants, and the use of asynchronous operations is recommended for maximum parallel scalability. In addition, there is support for marshalling and unmarshalling arbitrary data structures so that they can be used in messages.
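
The code examples later in this post only exchange arrays of built-in MPI_INT values. As an illustration of the marshalling support, here is a hedged sketch (the struct and variable names are my own) of how a user-defined struct can be described to MPI as a derived datatype:

// A hedged sketch; requires <cstddef> for offsetof
struct Particle {
  double position[3];
  int id;
};

MPI_Datatype particleType;
int          blockLengths[]  = {3, 1};
MPI_Aint     displacements[] = {offsetof(Particle, position), offsetof(Particle, id)};
MPI_Datatype fieldTypes[]    = {MPI_DOUBLE, MPI_INT};

// Describe the struct layout to MPI and register the new datatype
MPI_Type_create_struct(2, blockLengths, displacements, fieldTypes, &particleType);
MPI_Type_commit(&particleType);

// particleType can now be used in place of MPI_INT in send/receive calls,
// with a Particle (or an array of them) as the message buffer

// Release the datatype once it is no longer needed
MPI_Type_free(&particleType);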

More advanced concepts, like sub-communicators and even parallel I/O, are built on top of these primitives, making MPI a very powerful framework on which most numerical algorithms can be implemented in a parallel manner.
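
As an example of the former, here is a minimal sketch (not part of the example program below) of how MPI_Comm_split can carve the global process group into smaller sub-communicators:

// A hedged sketch: group processes by whether their rank is even or odd
MPI_Comm subComm;
const int color = myRank % 2; // processes with the same color end up in the same group
const int key = myRank;       // determines the rank ordering inside the new group

MPI_Comm_split(MPI_COMM_WORLD, color, key, &subComm);

int mySubRank;
MPI_Comm_rank(subComm, &mySubRank);

// Collective operations on subComm now involve only half of the processes

MPI_Comm_free(&subComm);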

The application developer is left to focus on the implementation of their algorithm, while the MPI library takes care of putting the parallel machinery in place. Client code does not deal with establishing and maintaining connections between processes, message delivery, mailbox high-water marks or choosing the message transport. All of this is hidden inside the MPI library, and it works well in the very controlled environment of a supercomputer. MPI applications have traditionally been static, in the sense that there is a predefined number of processes taking part, and the hardware can be considered reliable enough that this number doesn’t change during the runtime of an application. This is also reflected in the error handling of MPI: any failed communication operation leaves the system in an undefined state. The client software is made aware of this and is expected to perform the necessary cleanup and then shut down. Again, this works due to the very specific circumstances in which MPI programs run: the hardware is usually known at compile time, all nodes of a cluster have the same configuration, and the network can be seen as reliable for the expected duration of MPI jobs.

Some code

It’s always good to have some code when explaining things. Here you can find a short program that uses the basic functionality of MPI. The program can be compiled with the mpicxx wrapper and run with mpirun -np <number_of_processes> <executable_name>. Both these commands should be available if you have MPI installed on your system.

An MPI program always starts with the initialization of the MPI library. Once this is done, it’s possible to know how many processes are taking part in our parallel program and what each process’s rank is:

// Initialize MPI; should come as early in the program as possible
MPI_Init(&argc, &argv);

// Get the number of processes in the global process group (communicator)
int numProc;
MPI_Comm_size(MPI_COMM_WORLD, &numProc);

// Get the unique identifier of the current process inside the process group
int myRank;
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

std::cout << "Hello! I am process " << myRank + 1
          << " of " << numProc
          << std::endl;

We can then communicate between processes. For instance, doing a synchronous (blocking) send from process 0 to process 1:

using message_t = std::array<int, 3>;

if (myRank == 0) {
  const int messageTag = 111;
  const int destinationRank = 1;
  const auto message = message_t{{1, 2, 3}};

  MPI_Send(message.data(), message.size(), MPI_INT, destinationRank,
           messageTag, MPI_COMM_WORLD);

  std::cout << "Process " << myRank
            << " sent message " << messageTag << std::endl;
} else if (myRank == 1) {
  const int messageTag = 111;
  const int sourceRank = 0;
  message_t message;
  MPI_Status status;

  MPI_Recv(message.data(), message.size(), MPI_INT, sourceRank,
           messageTag, MPI_COMM_WORLD, &status);

  std::cout << "Process " << myRank
            << " received message " << messageTag << std::endl;
  printMessage(myRank, message);
}

An asynchronous (non-blocking) transfer is done in two steps. The operation is initiated on the sender and the receiver with MPI_Isend and MPI_Irecv respectively. Both processes can continue with other work at this point, but they must call MPI_Wait on the corresponding request before the transfer can be considered complete:

if (myRank == 0) {
  const int messageTag = 112;
  const int destinationRank = 1;
  const auto message = message_t{{4, 5, 6}};
  MPI_Request request;

  // Begin send
  MPI_Isend(message.data(), message.size(), MPI_INT, destinationRank,
            messageTag, MPI_COMM_WORLD, &request);

  MPI_Status status;
  // This call ensures that the message has been received
  MPI_Wait(&request, &status);

  std::cout << "Process " << myRank
            << " sent message " << messageTag << std::endl;
} else if (myRank == 1) {
  const int messageTag = 112;
  const int sourceRank = 0;
  message_t message;
  MPI_Request request;

  // Begin receive
  MPI_Irecv(message.data(), message.size(), MPI_INT, sourceRank,
            messageTag, MPI_COMM_WORLD, &request);

  MPI_Status status;
  // This call ensures that the message has been sent and received
  MPI_Wait(&request, &status);

  std::cout << "Process " << myRank
            << " received message " << messageTag << std::endl;
  printMessage(myRank, message);
}

It’s also possible to perform reduction operations across processes. Here is a sum over all the process ranks, where the result is stored on the process with rank 0:

int result;
MPI_Reduce(&myRank, &result, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);

if (myRank == 0)
{
  // Result should be (numProc - 1) * numProc / 2
  std::cout << "Process " << myRank
            << " obtained result: " << result << std::endl;
}

The last thing before program exit should be a call to MPI_Finalize(), which performs the necessary cleanup and shuts down the MPI library.
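
Putting the pieces together, the overall structure of such a program looks roughly like this (a minimal skeleton; the snippets above go in the middle):

#include <mpi.h>

int main(int argc, char** argv) {
  // Set up the MPI library
  MPI_Init(&argc, &argv);

  int numProc = 0;
  int myRank = 0;
  MPI_Comm_size(MPI_COMM_WORLD, &numProc);
  MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

  // ... communication code from the snippets above ...

  // Clean up and shut down the MPI library
  MPI_Finalize();
  return 0;
}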

Of course, MPI offers much more functionality which is not covered in this example. The full API is documented and can be consulted online.

Scalability

I have already mentioned scalability twice in this text. Two main measures of scalability are used in parallel computing. The first is strong scalability, described by Amdahl’s law. It tells us that, for a fixed problem size, there is an upper bound to the speedup that can be achieved by using more and more processors in parallel, since a fraction of the program is inherently serial and cannot be parallelized. The second measure is weak scalability, described by Gustafson’s law. It states that it should be possible to solve larger problems in a constant amount of time, if the problem size and the amount of parallel hardware resources are increased by the same factor.
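
For reference, in their usual textbook form (not specific to MPI), writing p for the parallelizable fraction of the work and N for the number of processes, the two laws express the achievable speedup as:

% Amdahl's law (strong scaling): fixed problem size, serial fraction (1 - p)
S_{\mathrm{strong}}(N) = \frac{1}{(1 - p) + p / N} \le \frac{1}{1 - p}

% Gustafson's law (weak scaling): problem size grows together with N
S_{\mathrm{weak}}(N) = (1 - p) + p \, N

The first expression saturates at 1 / (1 - p) no matter how many processes are used, while the second keeps growing as long as the problem is scaled up together with the hardware.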

Closing remarks

This is the end of this very brief introduction to MPI. The topic is a very large one, but I hope to revisit some of the concepts mentioned here in future articles. MPI is an interesting system to study because it synthesizes the communication patterns underlying a very large number of parallel algorithms. At the same time, the user is freed from the need to implement the non-functional aspects of their code, which are handled by the library and runtime environment.