This is a short introduction to the Message Passing Interface (MPI) designed to convey the fundamental operation and use of the interface. It is intended for readers with some background programming in Fortran, and should deliver enough information to allow readers to write and run their own (very simple) parallel Fortran programs using MPI.
There is also a version of this tutorial for C programmers, called Introduction to the Message Passing Interface (MPI) using C.
The MPI standard was developed to ameliorate these problems. It is a library that runs with standard Fortran or C programs, using commonly-available operating system services to create parallel processes and exchange information among these processes.
MPI is designed to allow users to create programs that can run efficiently on most parallel architectures. The design process included vendors (such as IBM, Intel, TMC, Cray, Convex, etc.), parallel library authors (involved in the development of PVM, Linda, etc.), and applications specialists. The final version of the draft standard became available in May 1994.
MPI can also support distributed program execution on heterogeneous hardware. That is, you may run a program that starts processes on multiple computer systems to work on the same problem. This is useful with a workstation farm.
      program hello_world
      include '/usr/include/mpif.h'
      integer ierr

      call MPI_INIT ( ierr )
      print *, "Hello world"
      call MPI_FINALIZE ( ierr )
      stop
      end
If you compile hello.f with a command like
f77 -o hello hello.f -lmpi
you will create an executable file called hello, which you can execute by using the mpirun command as in the following session segment:
$ mpirun -np 4 hello
Hello world
Hello world
Hello world
Hello world
$
When the program starts, it consists of only one process, sometimes called the "parent" or "root" process. When the routine MPI_INIT executes within the root process, it causes the creation of 3 additional processes (to reach the number of processes (np) specified on the mpirun command line), sometimes called "child" processes.
Each of the processes then continues executing its own copy of the hello world program. The next statement in every copy is the print statement, and each process prints "Hello world" as directed. Since terminal output from every process is directed to the same terminal, we see four lines saying "Hello world".
MPI also provides routines that let the process determine its process ID, as well as the number of processes that have been created.
Here is an enhanced version of the Hello world program that identifies the process that writes each line of output:
      program hello_world
      include '/usr/include/mpif.h'
      integer ierr, num_procs, my_id

      call MPI_INIT ( ierr )

c     find out MY process ID, and how many processes were started.
      call MPI_COMM_RANK (MPI_COMM_WORLD, my_id, ierr)
      call MPI_COMM_SIZE (MPI_COMM_WORLD, num_procs, ierr)

      print *, "Hello world! I'm process ", my_id, " out of ",
     &     num_procs, " processes."

      call MPI_FINALIZE ( ierr )
      stop
      end
When we run this program, each process identifies itself:
$ f77 -o hello2 hello2.f -lmpi
$ mpirun -np 4 hello2
Hello world! I'm process number 0 out of 4 processes.
Hello world! I'm process number 2 out of 4 processes.
Hello world! I'm process number 1 out of 4 processes.
Hello world! I'm process number 3 out of 4 processes.
$
Note that the process numbers are not printed in ascending order. That is because the processes execute independently and execution order was not controlled in any way. The programs may print their results in different orders each time they are run.
(To find out which Origin processors and memories are used to run a program you can turn on the MPI_DSM_VERBOSE environment variable with "export MPI_DSM_VERBOSE=ON", or equivalent.)
To let each process perform a different task, you can use a program structure like:
      program example_mpi
      include '/usr/include/mpif.h'
      integer my_id, root_process, ierr, status(MPI_STATUS_SIZE)
      integer num_procs

c     Create child processes, each of which has its own variables.
c     From this point on, every process executes a separate copy
c     of this program.  Each process has a different process ID,
c     ranging from 0 to num_procs minus 1, and COPIES of all
c     variables defined in the program.  No variables are shared.

      call MPI_INIT (ierr)

c     find out MY process ID, and how many processes were started.
      call MPI_COMM_RANK (MPI_COMM_WORLD, my_id, ierr)
      call MPI_COMM_SIZE (MPI_COMM_WORLD, num_procs, ierr)

      if ( my_id .eq. 0 ) then
c        do some work as process 0.
      elseif (my_id .eq. 1 ) then
c        do some work as process 1.
      elseif (my_id .eq. 2 ) then
c        do some work as process 2.
      else
c        do this work in any remaining processes.
      endif

c     Stop this process.
      call MPI_FINALIZE(ierr)
      stop
      end
As a result, these processes cannot communicate with each other by exchanging information in memory variables. Instead they may use any of a large number of MPI communication routines. The two basic routines are MPI_SEND, to send a message to another process, and MPI_RECV, to receive a message from another process.
The syntax of MPI_SEND is:
MPI_SEND (data_to_send, send_count, send_type, destination_ID, tag, comm, ierr)
The syntax of MPI_RECV is:
MPI_RECV (received_data, receive_count, receive_type, sender_ID, tag, comm, status, ierr)
The amount of information actually received can then be retrieved from the status variable, as with:
call MPI_GET_COUNT (status, MPI_REAL, true_received_count, ierr)

or

received_source = status(MPI_SOURCE)

or

received_tag = status(MPI_TAG)
MPI_RECV blocks until the data transfer is complete and the received_data variable is available for use.
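For example, here is a minimal sketch (the variable names are invented for this illustration, and at least two processes are assumed) in which process 0 sends one REAL value to process 1, and process 1 then examines the status variable with MPI_GET_COUNT and status(MPI_TAG):

      program send_recv_demo
      include '/usr/include/mpif.h'
      integer ierr, my_id, num_procs, true_received_count
      integer status(MPI_STATUS_SIZE)
      real value

      call MPI_INIT ( ierr )
      call MPI_COMM_RANK (MPI_COMM_WORLD, my_id, ierr)
      call MPI_COMM_SIZE (MPI_COMM_WORLD, num_procs, ierr)

      if (my_id .eq. 0) then
c        process 0 sends one REAL value, tagged 100, to process 1.
         value = 3.14
         call MPI_SEND ( value, 1, MPI_REAL, 1, 100,
     &        MPI_COMM_WORLD, ierr)
      elseif (my_id .eq. 1) then
c        process 1 receives the value and inspects the status.
         call MPI_RECV ( value, 1, MPI_REAL, 0, MPI_ANY_TAG,
     &        MPI_COMM_WORLD, status, ierr)
         call MPI_GET_COUNT (status, MPI_REAL, true_received_count,
     &        ierr)
         print *, "received ", true_received_count, " value(s): ",
     &        value, " with tag ", status(MPI_TAG)
      endif

      call MPI_FINALIZE ( ierr )
      stop
      end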
The basic datatypes recognized by MPI are:
MPI datatype handle | Fortran datatype |
---|---|
MPI_INTEGER | INTEGER |
MPI_REAL | REAL |
MPI_DOUBLE_PRECISION | DOUBLE PRECISION |
MPI_COMPLEX | COMPLEX |
MPI_LOGICAL | LOGICAL |
MPI_CHARACTER | CHARACTER |
MPI_BYTE | |
MPI_PACKED | |
There also exist derived MPI datatypes, such as Hvector, Hindexed, and Struct, which describe data that is not contiguous in memory or not all of one basic type.
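As a hedged illustration of a derived datatype (this is not one of the programs from this tutorial; the variable names are invented), the related constructor MPI_TYPE_VECTOR can describe a strided pattern such as one row of a two-dimensional Fortran array, which can then be sent or received as a single unit:

      program vector_type_demo
      include '/usr/include/mpif.h'
      integer ierr, my_id, row_type, j
      integer status(MPI_STATUS_SIZE)
      real a(4,4)

      call MPI_INIT (ierr)
      call MPI_COMM_RANK (MPI_COMM_WORLD, my_id, ierr)

c     describe one row of a 4x4 REAL array: 4 blocks of 1 element
c     each, separated by a stride of 4 (Fortran stores by column).
      call MPI_TYPE_VECTOR (4, 1, 4, MPI_REAL, row_type, ierr)
      call MPI_TYPE_COMMIT (row_type, ierr)

      if (my_id .eq. 0) then
         do j = 1, 4
            a(2,j) = float(j)
         end do
c        send the second row of a to process 1 as one unit.
         call MPI_SEND (a(2,1), 1, row_type, 1, 1,
     &        MPI_COMM_WORLD, ierr)
      elseif (my_id .eq. 1) then
         call MPI_RECV (a(2,1), 1, row_type, 0, 1,
     &        MPI_COMM_WORLD, status, ierr)
         print *, "row received: ", (a(2,j), j = 1, 4)
      endif

      call MPI_TYPE_FREE (row_type, ierr)
      call MPI_FINALIZE (ierr)
      stop
      end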
A common pattern of interaction among parallel processes is for one, the "master", to allocate work to a set of "slave" processes and collect results from the slaves to synthesize a final result.
The master process will execute program statements like:
c     distribute portions of vector1 to slaves.
      do an_id = 1, num_procs - 1
         start_row = ( an_id * num_rows_per_process) + 1

         call MPI_SEND( num_rows_to_send, 1, MPI_INTEGER,
     &        an_id, send_data_tag, MPI_COMM_WORLD, ierr)

         call MPI_SEND( vector1(start_row), num_rows_per_process,
     &        MPI_REAL, an_id, send_data_tag, MPI_COMM_WORLD, ierr)
      end do

c     and, then collect the results from the slave processes,
c     here in a variable called vector4, and do something with them.
      do an_id = 1, num_procs - 1
         call MPI_RECV( vector4, num_rows_returned, MPI_REAL,
     &        MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
     &        status, ierr)

c        DO SOMETHING WITH VECTOR4 HERE.
      end do

c     and then print out some final result using the
c     information collected from the slaves.
In this fragment, the master program sends a contiguous portion of vector1 to each slave using MPI_SEND and then receives a response from each slave via MPI_RECV. In practice, the master does not have to send a vector; it could send a scalar or some other MPI data type, and it could construct vector1 from any components to which it has access.
Here the returned information is put in vector4, which will be overwritten each time a different message is received. Therefore, it will probably be copied to some other variable within the receiving loop.
Note the use of the MPI constant MPI_ANY_SOURCE to allow this MPI_RECV call to receive messages from any process. In some cases, a program would need to determine exactly which process sent a message received using MPI_ANY_SOURCE. status(MPI_SOURCE) will hold that information, immediately following the call to MPI_RECV.
The slave program to work with this master would resemble:
c     Receive a vector segment, here called vector2.
      call MPI_RECV ( num_rows_to_receive, 1 , MPI_INTEGER,
     &     root_process, MPI_ANY_TAG, MPI_COMM_WORLD, status, ierr)

      call MPI_RECV( vector2, num_rows_to_receive, MPI_REAL,
     &     root_process, MPI_ANY_TAG, MPI_COMM_WORLD, status, ierr)

c     Do something with vector2 here, placing the result in vector3,
c     and send vector3 to the root process.

      call MPI_SEND( vector3, num_rows_to_return, MPI_REAL,
     &     root_process, return_data_tag, MPI_COMM_WORLD, ierr)

There could be many slave programs running at the same time. Each one would receive data in vector2 from the master via MPI_RECV and work on its own copy of that data. Each slave would construct its own copy of vector3, which it would then send to the master using MPI_SEND.
      program sumvector
      parameter (max_rows = 10000000)
      real vector(max_rows)

      print *, "please enter the number of numbers to sum:"
      read *, num_rows

      if ( num_rows .gt. max_rows) stop "Too many numbers."

c     initialize a vector.
      do i = 1, num_rows
         vector(i) = float(i)
      end do

c     compute sum.
      sum = 0.0
      do i = 1, num_rows
         sum = sum + vector(i)
      end do

      print *, "The grand total is: ", sum
      stop
      end
      program sumvector_mpi

c     This program sums all rows in a vector using MPI parallelism.
c     The root process acts as a master and sends a portion of the
c     vector to each child process.  Master and child processes then
c     all calculate a partial sum of the portion of the vector
c     assigned to them, and the child processes send their partial
c     sums to the master, who calculates a grand total.

      include '/usr/include/mpif.h'
      integer my_id, root_process, ierr, status(MPI_STATUS_SIZE)
      integer num_procs, an_id

      root_process = 0

c     Now replicate this process to create parallel processes.
      call MPI_INIT (ierr)

c     find out MY process ID, and how many processes were started.
      call MPI_COMM_RANK (MPI_COMM_WORLD, my_id, ierr)
      call MPI_COMM_SIZE (MPI_COMM_WORLD, num_procs, ierr)

      if (my_id .eq. root_process) then

c        I must be the root process, so I will query the user
c        to determine how many numbers to sum.

c        initialize a vector,

c        distribute a portion of the vector to each child process,

c        and calculate the sum of the values in the segment assigned
c        to the root process,

c        and, finally, I collect the partial sums from slave
c        processes, print them, and add them to the grand sum,
c        and print it.

      else

c        I must be a slave process, so I must receive my vector
c        segment,

c        calculate the sum of my portion of the vector,

c        and, finally, send my portion of the sum to the root process.

      endif

c     Stop this process.
      call MPI_FINALIZE(ierr)
      stop
      end
      program sumvector_mpi

c     This program sums all rows in a vector using MPI parallelism.
c     The root process acts as a master and sends a portion of the
c     vector to each child process.  Master and child processes then
c     all calculate a partial sum of the portion of the vector
c     assigned to them, and the child processes send their partial
c     sums to the master, who calculates a grand total.

      include '/usr/include/mpif.h'

      parameter (max_rows = 10000000)
      integer send_data_tag, return_data_tag
      parameter ( send_data_tag = 2001, return_data_tag = 2002)

      integer my_id, root_process, ierr, status(MPI_STATUS_SIZE)
      integer num_procs, an_id, num_rows_to_receive
      integer avg_rows_per_process, num_rows, num_rows_to_send
      integer num_rows_received, start_row, end_row, sender, i
      real vector(max_rows), vector2(max_rows), partial_sum, sum

c     Let process 0 be the root process.
      root_process = 0

c     Now replicate this process to create parallel processes.
c     From this point on, every process executes a separate copy
c     of this program.
      call MPI_INIT (ierr)

c     find out MY process ID, and how many processes were started.
      call MPI_COMM_RANK (MPI_COMM_WORLD, my_id, ierr)
      call MPI_COMM_SIZE (MPI_COMM_WORLD, num_procs, ierr)

      if (my_id .eq. root_process) then

c        I must be the root process, so I will query the user
c        to determine how many numbers to sum.
         print *, "please enter the number of numbers to sum:"
         read *, num_rows

         if ( num_rows .gt. max_rows) stop "Too many numbers."

         avg_rows_per_process = num_rows / num_procs

c        initialize a vector,
         do i = 1, num_rows
            vector(i) = float(i)
         end do

c        distribute a portion of the vector to each child process,
         do an_id = 1, num_procs - 1
            start_row = ( an_id * avg_rows_per_process) + 1
            end_row = start_row + avg_rows_per_process - 1
            if (an_id .eq. (num_procs - 1)) end_row = num_rows
            num_rows_to_send = end_row - start_row + 1

            call MPI_SEND( num_rows_to_send, 1, MPI_INTEGER,
     &           an_id, send_data_tag, MPI_COMM_WORLD, ierr)

            call MPI_SEND( vector(start_row), num_rows_to_send,
     &           MPI_REAL, an_id, send_data_tag, MPI_COMM_WORLD,
     &           ierr)
         end do

c        and calculate the sum of the values in the segment assigned
c        to the root process,
         sum = 0.0
         do i = 1, avg_rows_per_process
            sum = sum + vector(i)
         end do

         print *, "sum ", sum, " calculated by root process."

c        and, finally, I collect the partial sums from slave
c        processes, print them, and add them to the grand sum,
c        and print it.
         do an_id = 1, num_procs - 1
            call MPI_RECV( partial_sum, 1, MPI_REAL, MPI_ANY_SOURCE,
     &           MPI_ANY_TAG, MPI_COMM_WORLD, status, ierr)

            sender = status(MPI_SOURCE)
            print *, "partial sum ", partial_sum,
     &           " returned from process ", sender

            sum = sum + partial_sum
         end do

         print *, "The grand total is: ", sum

      else

c        I must be a slave process, so I must receive my vector
c        segment, storing it in a "local" vector, vector2.
         call MPI_RECV ( num_rows_to_receive, 1 , MPI_INTEGER,
     &        root_process, MPI_ANY_TAG, MPI_COMM_WORLD, status,
     &        ierr)

         call MPI_RECV ( vector2, num_rows_to_receive, MPI_REAL,
     &        root_process, MPI_ANY_TAG, MPI_COMM_WORLD, status,
     &        ierr)

         num_rows_received = num_rows_to_receive

c        Calculate the sum of my portion of the vector,
         partial_sum = 0.0
         do i = 1, num_rows_received
            partial_sum = partial_sum + vector2(i)
         end do

c        and, finally, send my partial sum to the root process.
         call MPI_SEND( partial_sum, 1, MPI_REAL, root_process,
     &        return_data_tag, MPI_COMM_WORLD, ierr)

      endif

c     Stop this process.
      call MPI_FINALIZE(ierr)
      stop
      end
The following table shows the values of several variables during the execution of sumvector_mpi. The information comes from a two-processor parallel run, and the values of program variables are shown in both processor memory spaces. Note that there is only one process active prior to the call to MPI_INIT.
Program location | Before MPI_INIT | After MPI_INIT | | Before MPI_SEND to slave | | After MPI_RECV by slave | | After MPI_RECV by master | |
---|---|---|---|---|---|---|---|---|---|
Variable Name | Proc 0 | Proc 0 | Proc 1 | Proc 0 | Proc 1 | Proc 0 | Proc 1 | Proc 0 | Proc 1 |
root_process | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
my_id | . | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 |
num_procs | . | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 |
num_rows | . | . | . | 6 | . | 6 | . | 6 | . |
avg_rows_per_process | . | . | . | 3 | . | 3 | . | 3 | . |
num_rows_received | . | . | . | . | . | . | 3 | . | 3 |
vector(1) | . | . | . | 1.0 | . | 1.0 | . | 1.0 | . |
vector(2) | . | . | . | 2.0 | . | 2.0 | . | 2.0 | . |
vector(3) | . | . | . | 3.0 | . | 3.0 | . | 3.0 | . |
vector(4) | . | . | . | 4.0 | . | 4.0 | . | 4.0 | . |
vector(5) | . | . | . | 5.0 | . | 5.0 | . | 5.0 | . |
vector(6) | . | . | . | 6.0 | . | 6.0 | . | 6.0 | . |
vector2(1) | . | . | . | . | . | . | 4.0 | . | 4.0 |
vector2(2) | . | . | . | . | . | . | 5.0 | . | 5.0 |
vector2(3) | . | . | . | . | . | . | 6.0 | . | 6.0 |
vector2(4) | . | . | . | . | . | . | . | . | . |
vector2(5) | . | . | . | . | . | . | . | . | . |
vector2(6) | . | . | . | . | . | . | . | . | . |
partial_sum | . | . | . | . | . | . | . | 15.0 | 15.0 |
sum | . | . | . | . | . | . | . | 21.0 | . |
MPI supports three classes of collective operations: synchronization (for example, MPI_BARRIER), data movement (for example, MPI_BCAST, MPI_SCATTER, and MPI_GATHER), and collective computation (for example, MPI_REDUCE).
The routines with "V" suffixes move variable-sized blocks of data.
The subroutine MPI_BCAST sends a message from one process to all processes in a communicator.
call MPI_BCAST (data_to_be_sent, send_count, send_type, broadcasting_process_ID, comm, ierr)
When processes are ready to share information with other processes as part of a broadcast, ALL of them must execute a call to MPI_BCAST. There is no separate MPI call to receive a broadcast.
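For example, here is a minimal sketch (the variable names are invented for this illustration) in which only the root assigns the value, but every process executes the identical MPI_BCAST call:

      program bcast_demo
      include '/usr/include/mpif.h'
      integer ierr, my_id, num_intervals, root_process

      root_process = 0
      call MPI_INIT (ierr)
      call MPI_COMM_RANK (MPI_COMM_WORLD, my_id, ierr)

c     only the root sets the value before the broadcast...
      if (my_id .eq. root_process) num_intervals = 100

c     ...but EVERY process makes the identical MPI_BCAST call.
      call MPI_BCAST (num_intervals, 1, MPI_INTEGER, root_process,
     &     MPI_COMM_WORLD, ierr)

      print *, "process ", my_id, " has num_intervals = ",
     &     num_intervals

      call MPI_FINALIZE (ierr)
      stop
      end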
MPI_BCAST could have been used in the program sumvector_mpi presented earlier, in place of the MPI_SEND loop that distributed data to each process. Doing so would have resulted in excessive data movement, of course. A better solution would be MPI_SCATTER or MPI_SCATTERV.
The subroutines MPI_SCATTER and MPI_SCATTERV take an input vector or array, break the input data into separate portions and send a portion to each one of the processes in a communicating group.
call MPI_SCATTER( send_data, send_count, send_type, receive_data, receive_count, receive_type, sending_process_ID, comm, ierr)

or

call MPI_SCATTERV( send_data, send_count_vector, send_start_vector, send_type, receive_data, receive_count, receive_type, sending_process_ID, comm, ierr)
The routine MPI_SCATTERV could have been used in the program sumvector_mpi presented earlier, in place of the MPI_SEND loop that distributed data to each process.
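As a sketch of that idea (assuming, for simplicity, that the number of rows divides evenly among the processes; MPI_SCATTERV would handle uneven portions), a single MPI_SCATTER call can replace the whole send loop and delivers a chunk to every process, including the root. The program and variable names here are invented for the illustration:

      program scatter_demo
      include '/usr/include/mpif.h'
      parameter (max_rows = 1000)
      integer ierr, my_id, num_procs, root_process, chunk, i
      real vector(max_rows), segment(max_rows)

      root_process = 0
      call MPI_INIT (ierr)
      call MPI_COMM_RANK (MPI_COMM_WORLD, my_id, ierr)
      call MPI_COMM_SIZE (MPI_COMM_WORLD, num_procs, ierr)

c     assume the 100 rows divide evenly among the processes.
      chunk = 100 / num_procs

      if (my_id .eq. root_process) then
         do i = 1, 100
            vector(i) = float(i)
         end do
      endif

c     every process, including the root, receives chunk values
c     of vector into segment.
      call MPI_SCATTER (vector, chunk, MPI_REAL, segment, chunk,
     &     MPI_REAL, root_process, MPI_COMM_WORLD, ierr)

      print *, "process ", my_id, " got ", chunk, " values"

      call MPI_FINALIZE (ierr)
      stop
      end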
MPI_BCAST, MPI_SCATTER, and other collective routines build a communication tree among the participating processes to minimize message traffic. If there are N processes involved, a naive broadcast would send N-1 messages, one after another, from the broadcasting process. If instead a tree is built so that the broadcasting process sends to 2 processes, and they each forward the data to 2 more, the transfers proceed in parallel and the broadcast completes in O(log N) communication steps.
call MPI_REDUCE( data_to_be_sent, result_to_be_received_by_target, send_count, send_type, operation, target_process_ID, comm, ierr)
When processes are ready to share information with other processes as part of a data reduction, all of the participating processes execute a call to MPI_REDUCE, which uses local data to calculate each process's portion of the reduction operation and communicates the local result to other processes as necessary. Only the target_process_ID receives the final result.
MPI_REDUCE could have been used in the program sumvector_mpi presented earlier, in place of the MPI_RECV loop that collected partial sums from each process.
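As a sketch using the variable names from sumvector_mpi (and assuming the root also places its own local result in partial_sum), a single call executed by every process would replace both the slaves' explicit MPI_SEND calls and the master's receiving loop:

c     every process contributes its partial_sum; the combined total
c     appears in the variable sum on the root process only.
      call MPI_REDUCE ( partial_sum, sum, 1, MPI_REAL, MPI_SUM,
     &     root_process, MPI_COMM_WORLD, ierr)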
Operation handle | Operation |
---|---|
MPI_MAX | Maximum |
MPI_MIN | Minimum |
MPI_PROD | Product |
MPI_SUM | Sum |
MPI_LAND | Logical AND |
MPI_LOR | Logical OR |
MPI_LXOR | Logical Exclusive OR |
MPI_BAND | Bitwise AND |
MPI_BOR | Bitwise OR |
MPI_BXOR | Bitwise Exclusive OR |
MPI_MAXLOC | Maximum value and location |
MPI_MINLOC | Minimum value and location |
      program integrate

c     program to integrate sin(x) between 0 and pi by computing
c     the area of a number of rectangles chosen so as to approximate
c     the shape under the curve of the function.
c
c     1) ask the user to choose the number of intervals,
c     2) compute the interval width (rect_width),
c     3) for each interval:
c
c        a) find the middle of the interval (x_middle),
c        b) compute the height of the rectangle, sin(x_middle),
c        c) find the area of the rectangle as the product of
c           the interval width and its height sin(x_middle), and
c        d) increment a running total.

      parameter (pi=3.141592654)
      double precision rect_width, area, sum, x_middle

      print *, "please enter the number of intervals to interpolate:"
      read *, num_intervals

      rect_width = pi / num_intervals

      sum = 0.0
      do i = 1, num_intervals

c        find the middle of the interval on the X-axis.
         x_middle = (i - 0.5) * rect_width
         area = dsin(x_middle) * rect_width
         sum = sum + area

      end do

      print *, "The total area is: ", sum
      stop
      end
The next program is an MPI version of the program above. It uses MPI_BCAST to send information to each participating process and MPI_REDUCE to get a grand total of the areas computed by each participating process.
      program integrate_mpi

c     This program integrates sin(x) between 0 and pi by computing
c     the area of a number of rectangles chosen so as to approximate
c     the shape under the curve of the function using MPI.
c
c     The root process acts as a master to a group of child processes
c     that act as slaves.  The master prompts for the number of
c     interpolations and broadcasts that value to each slave.
c
c     There are num_procs processes all together, and a process
c     computes the area defined by every num_procs-th interval,
c     collects a partial sum of those areas, and sends its partial
c     sum to the root process.

      include '/usr/include/mpif.h'

      parameter (pi=3.141592654)

      integer my_id, root_process, num_procs, ierr
      double precision rect_width, area, sum, x_middle, partial_sum
      integer status(MPI_STATUS_SIZE)

c     Let process 0 be the root process.
      root_process = 0

c     Now replicate this process to create parallel processes.
      call MPI_INIT (ierr)

c     Find out MY process ID, and how many processes were started.
      call MPI_COMM_RANK (MPI_COMM_WORLD, my_id, ierr)
      call MPI_COMM_SIZE (MPI_COMM_WORLD, num_procs, ierr)

      if (my_id .eq. root_process) then

c        I must be the root process, so I will query the user
c        to determine how many interpolation intervals to use.
         print *,
     &    "please enter the number of intervals to interpolate:"
         read *, num_intervals

      end if

c     Then...no matter which process I am:
c
c     I engage in a broadcast so that the number of intervals is
c     sent from the root process to the other processes, and ...
      call MPI_BCAST (num_intervals, 1, MPI_INTEGER, root_process,
     &     MPI_COMM_WORLD, ierr)

c     calculate the width of a rectangle, and
      rect_width = pi / num_intervals

c     then calculate the sum of the areas of the rectangles for
c     which I am responsible.  Start with the (my_id + 1)th
c     interval and process every num_procs-th interval thereafter.
      partial_sum = 0.0
      do i = (my_id + 1), num_intervals, num_procs

c        Find the middle of the interval on the X-axis.
         x_middle = (i - 0.5) * rect_width
         area = dsin(x_middle) * rect_width
         partial_sum = partial_sum + area

      end do

      print *, "proc", my_id, "computes:", partial_sum

c     and finally, engage in a reduction in which all partial sums
c     are combined, and the grand sum appears in variable "sum" in
c     the root process,
      call MPI_REDUCE (partial_sum, sum, 1, MPI_DOUBLE_PRECISION,
     &     MPI_SUM, root_process, MPI_COMM_WORLD, ierr)

c     and, if I am the root process, print the result.
      if (my_id .eq. root_process) then
         print *, 'The integral is ', sum

c        (yes, we could have summed just the heights, and
c        postponed the multiplication by rect_width til now.)
      end if

c     Close down this process.
      call MPI_FINALIZE (ierr)
      stop
      end
The subroutine MPI_SENDRECV combines a send and a receive in a single call. A message sent by a send-receive operation can be received by an ordinary MPI_RECV, and a send-receive operation can receive a message sent by an ordinary MPI_SEND.
MPI_SENDRECV( data_to_send, send_count, send_type, destination_ID, send_tag, received_data, receive_count, receive_type, sender_ID, receive_tag, comm, status, ierr)
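For example, here is a minimal sketch (the variable names are invented for this illustration) in which every process passes its rank to its right-hand neighbor and receives from its left-hand neighbor around a ring, all in one call:

      program sendrecv_demo
      include '/usr/include/mpif.h'
      integer ierr, my_id, num_procs, right, left
      integer data_to_send, received_data
      integer status(MPI_STATUS_SIZE)

      call MPI_INIT (ierr)
      call MPI_COMM_RANK (MPI_COMM_WORLD, my_id, ierr)
      call MPI_COMM_SIZE (MPI_COMM_WORLD, num_procs, ierr)

c     neighbors in a ring of processes.
      right = mod (my_id + 1, num_procs)
      left  = mod (my_id + num_procs - 1, num_procs)
      data_to_send = my_id

c     send to the right and receive from the left in one call,
c     which avoids the deadlock risk of paired blocking calls.
      call MPI_SENDRECV (data_to_send, 1, MPI_INTEGER, right, 1,
     &     received_data, 1, MPI_INTEGER, left, 1,
     &     MPI_COMM_WORLD, status, ierr)

      print *, "process ", my_id, " received ", received_data

      call MPI_FINALIZE (ierr)
      stop
      end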
Collective operations involve only those processes identified by the communicator specified in the call.
The communicator MPI_COMM_WORLD is defined by default for all MPI runs, and includes all processes defined by MPI_INIT during that run. Additional communicators can be defined that include all or part of those processes. For example, suppose a group of processes needs to engage in two different reductions involving disjoint sets of processes. A communicator can be defined for each subset of MPI_COMM_WORLD and specified in the two reduction calls to manage message transmission.
MPI_COMM_SPLIT can be used to create a new communicator composed of a subset of another communicator. MPI_COMM_DUP can be used to create a new communicator composed of all of the members of another communicator. This may be useful for managing interactions within a set of processes in place of message tags.
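For example, here is a hedged sketch of the two-reduction idea above (the program and variable names are invented): MPI_COMM_SPLIT divides MPI_COMM_WORLD into two disjoint communicators by "color", and each group then reduces only among its own members:

      program comm_split_demo
      include '/usr/include/mpif.h'
      integer ierr, my_id, color, sub_comm, sub_id
      real my_value, group_sum

      call MPI_INIT (ierr)
      call MPI_COMM_RANK (MPI_COMM_WORLD, my_id, ierr)

c     even-numbered ranks form one group, odd-numbered the other.
      color = mod (my_id, 2)
      call MPI_COMM_SPLIT (MPI_COMM_WORLD, color, my_id, sub_comm,
     &     ierr)
      call MPI_COMM_RANK (sub_comm, sub_id, ierr)

c     each group performs its own independent reduction; rank 0
c     OF EACH GROUP receives that group's total.
      my_value = float (my_id)
      call MPI_REDUCE (my_value, group_sum, 1, MPI_REAL, MPI_SUM,
     &     0, sub_comm, ierr)

      if (sub_id .eq. 0) print *, "group ", color, " sum ", group_sum

      call MPI_COMM_FREE (sub_comm, ierr)
      call MPI_FINALIZE (ierr)
      stop
      end

Processes that pass the same color value end up in the same new communicator, and the key argument (here my_id) determines how ranks are ordered within it.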
For additional information concerning these and other topics please consult:
Document prepared by:
Academic Computing Services
The University of Kansas
with assistance and overheads provided by
The National Computational Science Alliance (NCSA) at
The University of Illinois