Using an MPI channel with eXtremeDB Cluster

To use an MPI channel, a Cluster application must be linked with the mcoclmpi library (instead of mcocltcp). The MPI standard requires that the first MPI call in an MPI program be MPI_Init() or MPI_Init_thread() and that the last call be MPI_Finalize(). Another requirement is that MPI_Init() or MPI_Init_thread() and MPI_Finalize() be called only once per process.

The difference between MPI_Init() and MPI_Init_thread() is that MPI_Init_thread() allows the application to request the desired level of thread support. The Cluster runtime requires the MPI_THREAD_MULTIPLE level, which allows the runtime to call MPI functions from different threads without additional synchronization.
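
For example, a minimal sketch of requesting and verifying the thread support level at the start of main() might look like the following (assuming <mpi.h> and <stdio.h> are included; the check and the error handling are illustrative and not part of the eXtremeDB API):

     
    int provided;
     
    /* Request full multithreading support; the MPI library may grant a lower level */
    MPI_Init_thread (& argc, & argv, MPI_THREAD_MULTIPLE, & provided);
     
    if (provided < MPI_THREAD_MULTIPLE)
    {
        /* The Cluster runtime cannot operate without MPI_THREAD_MULTIPLE */
        fprintf (stderr, "MPI_THREAD_MULTIPLE is not supported\n");
        MPI_Finalize ();
        return 1;
    }
     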

In view of the above, there are two ways to initialize MPI in Cluster applications: explicitly or implicitly. In the first case, the application calls MPI_Init_thread() / MPI_Finalize() itself and passes to mco_cluster_db_open() an opaque MPI object called a communicator. The communicator defines the communication context; in other words, it defines which processes are involved in the communications.

The MPI runtime automatically creates the communicator MPI_COMM_WORLD, which includes all the processes. There is also an MPI API that allows derived communicators to be created. The communicator is passed to mco_cluster_db_open() via the communicator field of the mco_clnw_mpi_params_t structure. If this field is not set (or is NULL), MPI_COMM_WORLD is used.
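
For instance, an application can derive a communicator that contains only a subset of the processes and pass it to the Cluster parameters. The sketch below is illustrative: the split criterion (color) and the assumption that cl_params has already been initialized with mco_cluster_params_init() are examples, not requirements:

     
    int world_rank, color;
    MPI_Comm cluster_comm;
     
    MPI_Comm_rank (MPI_COMM_WORLD, & world_rank);
     
    /* Illustrative split: processes with even world rank form the cluster,
       the remaining processes get a separate communicator for other work */
    color = (world_rank % 2 == 0) ? 0 : 1;
    MPI_Comm_split (MPI_COMM_WORLD, color, world_rank, & cluster_comm);
     
    /* Only the processes that belong to the cluster communicator
       should pass it to mco_cluster_db_open() */
    cl_params.nw.mpi.communicator = (void *) cluster_comm;
     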

In the second case (implicit MPI initialization), MPI_Init_thread() and MPI_Finalize() are called automatically inside the Cluster runtime and the MPI_COMM_WORLD communicator is used. Implicit initialization is a little easier for developers, and the application can be linked either with mcoclmpi or, if the MPI channel is not used, with mcocltcp, without any source changes.
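
With implicit initialization the application simply omits the MPI calls. A minimal sketch, assuming dbName, devs, N and db_params are set up as in the explicit example below:

     
    mco_cluster_params_t cl_params;
    mco_db_h db;
     
    mco_cluster_init ();
    mco_cluster_params_init (& cl_params);
     
    /* No MPI_Init_thread() and no communicator field: the Cluster runtime
       calls MPI_Init_thread() itself and uses MPI_COMM_WORLD */
    <initialize db_params and devs>
    mco_cluster_db_open (dbName, cldb_get_dictionary (), devs, N, & db_params,
                         & cl_params);
     
    mco_db_connect (dbName, & db);
    ...
    mco_cluster_stop (db);
    mco_db_disconnect (db);
    mco_db_close (dbName);
    /* MPI_Finalize() is also called by the Cluster runtime */
     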

Explicit initialization allows the user to control the lifetime of MPI, for example when MPI is also used for custom communications unrelated to eXtremeDB Cluster. The following sample illustrates explicit MPI initialization:

     
    int main (int argc, char * argv [])
    {
        mco_cluster_params_t cl_params;
        mco_db_params_t db_params;
        mco_db_h db;
        mco_device_t devs [N];
        int provided;
     
        /* Request MPI_THREAD_MULTIPLE; the Cluster runtime requires this level */
        MPI_Init_thread (& argc, & argv, MPI_THREAD_MULTIPLE, & provided);
     
        mco_cluster_init ();
        mco_cluster_params_init (& cl_params);
        cl_params.nw.mpi.communicator = (void *)MPI_COMM_WORLD;
     
        <initialize db_params and devs>
     
        mco_cluster_db_open (dbName, cldb_get_dictionary (), devs, N, & db_params,
                             & cl_params);
     
        /* start listener threads (listener_task and ClusterListener are
           provided by the sample framework) */
        sample_start_connected_task (& listener_task, ClusterListener, dbName, 0);
     
        mco_db_connect (dbName, & db);
        ...
        mco_cluster_stop (db);
     
        sample_join_task (& listener_task);
     
        mco_db_disconnect (db);
        mco_db_close (dbName);
        MPI_Finalize ();
        return 0;
    }
     

 

This is very similar to a normal Cluster application using TCP, with the exception of the MPI calls and setting the communicator parameter:

 
    cl_params.nw.mpi.communicator = (void *)MPI_COMM_WORLD;
            
     

Some fields of the mco_cluster_params_t structure are not mandatory for the MPI channel:

uint2 n_nodes – the total number of nodes in the cluster.

MPI provides the function MPI_Comm_size(), which returns the number of processes in the communicator, so this Cluster parameter is optional. If n_nodes is specified and does not match the value returned by MPI_Comm_size(), the error code MCO_E_NW_INVALID_PARAMETER is returned.

 

uint2 node_id – the ID of the node.

MPI provides the function MPI_Comm_rank(), which returns the rank (process ID) of the calling process in the communicator, so this Cluster parameter is also optional. If it is specified and does not match the value returned by MPI_Comm_rank(), MCO_E_NW_INVALID_PARAMETER is returned.

 

mco_cluster_node_params_t * nodes – the node list.

This list is also optional for MPI. It can still be used to specify the names of the processes (the addr field in the mco_cluster_node_params_t structure).

 

Also, the MPI channel currently does not use the check_quorum_func and check_quorum_param fields, since MPI does not allow dynamic connection / disconnection of processes.
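
Since all of these values can be derived from the communicator, a typical MPI application simply leaves them unset. For reference, the sketch below shows the standard MPI calls that return the same information, which may be useful for logging or for verifying explicitly set values:

     
    int size, rank;
     
    /* Number of processes in the communicator (the value n_nodes would take) */
    MPI_Comm_size (MPI_COMM_WORLD, & size);
     
    /* Rank of this process in the communicator (the value node_id would take) */
    MPI_Comm_rank (MPI_COMM_WORLD, & rank);
     
    printf ("cluster of %d nodes, this is node %d\n", size, rank);
     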

To compile applications that explicitly call MPI functions (and link with the mcoclmpi library), the wrapper scripts mpicc (for C) and mpicxx (for C++) must be used. These scripts, which are part of the MPI package, invoke gcc/g++ or another standard compiler and know how to build MPI applications, supplying the proper include / library paths, macro definitions and other compiler options.

For UNIX-based systems, specifying PRJ_F_CLUSTER = MPI in the makefile causes header.mak to automatically set CC = mpicc and CXX = mpicxx and to link the application with the mcoclmpi library.
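
For example, a minimal makefile fragment might look like the following; the include path is a placeholder for wherever header.mak resides in the installation, and only the PRJ_F_CLUSTER line is significant here:

     
    # Select the MPI channel: header.mak then sets CC = mpicc and CXX = mpicxx
    # and links the application with mcoclmpi instead of mcocltcp
    PRJ_F_CLUSTER = MPI
     
    include $(MCO_ROOT)/header.mak
     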

To start MPI applications, the mpirun or mpiexec commands are used. These are also part of the MPI package. The command format is:

 
    mpirun <mpirun arguments> <program> <program arguments>
            
     

The most important mpirun arguments are:

-n <nprocs> - number of processes (nodes) in the cluster

-machinefile <filename> - the name of the text file that contains a list of nodes, one node per line

For example, to run the application mpi_test on nodes nodeA and nodeB, create the file nodes with the lines:

     
    nodeA
    nodeB
     

Then execute the command:

     
    mpirun -n 2 -machinefile ./nodes ./mpi_test
            
     

As mentioned above, n_nodes and node_id are not mandatory for the MPI channel, so mpi_test can be started without command line arguments.

If an MPI channel is used, the underlying transport (TCP, InfiniBand, shared memory, etc.) is determined by the MPI tools. Usually this is a command line option for mpirun or an environment variable, but it depends on the MPI implementation. Often MPI automatically chooses the "best" transport: if multiple processes run on a single physical host, MPI uses IPC (shared memory); if the application runs on different hosts that have InfiniBand, MPI uses the IB transport; otherwise (without InfiniBand) it uses TCP.
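
For illustration, with Open MPI the transport can be restricted explicitly with an MCA parameter on the mpirun command line; the option below is specific to Open MPI, and other MPI implementations use different options or environment variables:

     
    # Force the TCP transport (plus the self loopback component)
    # instead of the automatically selected one (Open MPI specific)
    mpirun --mca btl tcp,self -n 2 -machinefile ./nodes ./mpi_test
     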

A limitation of MPI is that it uses a static process model: there is no convenient, standard way to handle a node failure or to connect new nodes dynamically. MPI implementations may provide tools or APIs for network-level fault tolerance, migration, etc., but these features are not covered by the standard, so they depend heavily on the MPI library in use.