eXtremeDB Cluster Applications in C/C++

Implementing a C/C++ Cluster application requires, at a minimum, calling the following functions:

 
    MCO_RET mco_cluster_init();
    MCO_RET mco_cluster_params_init();
    MCO_RET mco_cluster_db_open();
    MCO_RET mco_cluster_listen();
    MCO_RET mco_cluster_stop();
     

A Cluster application can register a “quorum check callback” function to verify that the necessary minimum number of cluster nodes is present before database processing begins. If no quorum callback function is registered, the cluster runtime uses the qrank mechanism to verify a quorum. Also, a “notification callback” function can be registered to detect when nodes enter or leave the cluster.

The following code snippet demonstrates a C/C++ Cluster application with quorum check and connection notification callback functions:

     
    const char dbName [] = "clusterdb";
     
    // Listen-thread
    THREAD_PROC_DEFINE (ClusterListen, p)
    {
        mco_db_h db;
        mco_db_connect (dbName, & db);
        mco_cluster_listen (db);
            
        mco_db_disconnect (db);
        return 0;
    }
     
    // Quorum check callback function. It must return MCO_YES if the node can start
    // (or continue) to work:
    // parameters:
    // uint2 *neighbor_ids - array of active neighbors
    // uint2 n_neighbors - number of active neighbors in the neighbor_ids array
    // void *param - user-defined context
    mco_bool my_check_quorum(uint2 *neighbor_ids, uint2 n_neighbors, void *param) 
    {
        mco_bool result = MCO_YES;
        //    <some logic>
        return result;
    }
     
    // Notification callback function.
    void cluster_notifying(mco_db_h db, mco_cluster_notification_t notification_code,
                        mco_cluster_node_info_t *node_info,
                        void *param, void *user_context) 
    {
                         
        mco_cluster_node_info_t nodes_info[MCO_MAX_CLUSTER_SIZE];
        uint2 nd, n_nodes_info = MCO_MAX_CLUSTER_SIZE;
        printf("***NOTIFICATION*** Node %d (%s, qrank %d) %s.\n", node_info->node_id,
        node_info->addr, node_info->qrank,
        notification_code == MCO_CLUSTER_NOTIFY_NODE_CONNECT ?
        "CONNECTED" : "DISCONNECTED");
         
        mco_cluster_get_active_nodes(db, nodes_info, &n_nodes_info);
        printf("List of active nodes: \n --------------------------\n");
         
        for (nd = 0; nd < n_nodes_info; ++nd) 
        {
            printf("ID %d, Addr %s, Qrank %d\n", nodes_info[nd].node_id,
            nodes_info[nd].addr, nodes_info[nd].qrank);
        }
    }
     
    int main ()
    {
        mco_cluster_params_t cl_params;
        mco_cluster_node_params_t nodes [2];
        mco_device_t dev [4];
        unsigned int n_dev;
        mco_db_params_t db_params;
         
        // Initialize cluster runtime
        mco_cluster_init();

        // Initialize cluster parameters
        mco_cluster_params_init(&cl_params);
     
        // Prepare dev, n_dev, db_params, as usual for mco_db_open_dev()
     
        // Fill the information about nodes
        strcpy(nodes[0].addr, "node1:10000");
        strcpy(nodes[1].addr, "node2:10000");
        nodes[0].qrank = nodes[1].qrank = 1;
        cl_params.nodes = nodes;
        cl_params.n_nodes = 2;
        cl_params.node_id = local_id; // 0 on node1, 1 on node2, and so on
     
        // set Quorum callback function
        cl_params.check_quorum_func  = my_check_quorum;
        cl_params.check_quorum_param = 0; // don't use the context
         
        // set Notification callback function
        cl_params.notifying_callback  = cluster_notifying;
        cl_params.notifying_context = 0; // don't use the context
         
        /* create cluster database */
        mco_cluster_db_open(dbName,
                            clusterdb_get_dictionary(),
                            dev,
                            n_dev,
                            &db_params,
                            &cl_params);

        /* Start the mco_cluster_listen() thread */
        createThread(ClusterListen, 0, &hListenThread);

        /* Work with the database as usual */

        /* Stop the cluster */
        mco_cluster_stop();

        /* Wait for the listen-thread to stop */
        THREAD_JOIN(hListenThread, res);

        /* Close the database */
        mco_db_close(dbName);
    }
     

Note that the mco_cluster_params_t struct (cl_params) passed to mco_cluster_db_open() contains the cluster specifications. Typically this defines the number of nodes (n_nodes), the integer identifier (node_id) of this node, and the node list: an array of mco_cluster_node_params_t structs (nodes) containing the node id (the array index, i.e. nodes[0] corresponds to node_id=0, nodes[7] to node_id=7, and so on), the address and the qrank of each node in the cluster. It can also specify callback functions for quorum check and node notification.

This sample uses a quorum check callback function my_check_quorum(). This allows the application to validate the collection of nodes online to determine if the cluster can begin processing.

Note that when working with a cluster database, the mco_trans_commit() function may return error code: MCO_E_CLUSTER_NOQUORUM. This happens if one or more nodes go down, and the remaining nodes do not constitute a quorum.

 

Also note the use of a notification callback function (notifying_callback): the application receives a connection event (notification_code = connect / disconnect) with the node_info and user_context (the application's context) parameters when nodes attach to or detach from the cluster.

Note that the notification function is called in the context of a listener task. It must therefore return quickly, and cannot perform any transactional activities or any active cluster operations. Only mco_cluster_info() and mco_cluster_get_active_nodes() are appropriate here.

 

Adding a node to an active Cluster

A new node can join an existing cluster by simply calling mco_cluster_db_open() with a unique node_id and the address:port and node_id of one of the running nodes. For example, suppose we have nodeA and nodeB running as a cluster, with nodeA having node_id = 0 and nodeB having node_id = 1. The new nodeN knows about nodeA but not about nodeB. It chooses node_id = 2 and fills the node_params structure as follows:

     
    strcpy(node_params[0].addr, "nodeA:10000");
    strcpy(node_params[1].addr, ""); // empty address: nodeN doesn't know about nodeB
    strcpy(node_params[2].addr, "<my address and port>");
     

To join the cluster nodeN then calls mco_cluster_db_open() as usual. Note that if nodeN were to accidentally choose node_id = 1 (which is occupied by nodeB), mco_cluster_db_open() will return MCO_E_CLUSTER_DUPLICATE_NODEID. To determine the members of a cluster and their node_id, the mco_cluster_discover() function can be helpful.

A node can also call mco_cluster_detach() / mco_cluster_attach() to leave or join the cluster. When a node joins the cluster it need only know the address of one of its neighbor nodes. It then obtains an updated node list from the cluster runtime.

For example, consider a cluster with 4 nodes, A, B, C and D. Their node lists, node_id and qrank settings are as follows:

Node A : { "A", "B" }, node_id = 0, qrank = 1 - Node A indicates its own address and also knows about node B. No indication of the presence of nodes C and D.

Node B : { "A", "B" }, node_id = 1, qrank = 1 - Node B indicates its own address and also knows about node A. No indication of the presence of nodes C and D.

Node C : { "A", "B", "C" }, node_id = 2, qrank = 0 - Node C "knows" about nodes A and B. The qrank = 0 means that disconnecting C does not affect the quorum.

Node D : { "A", "B", "", "D" }, node_id = 3, qrank = 0 - Node D "knows" about nodes A and B, but not whether C is connected. The qrank = 0 means that disconnecting D does not affect the quorum.

Nodes A and B are started first; they have each other’s addresses, so they are able to communicate. Then node C is "attached": C connects to A and B, and A and B add C to their cluster lists. Then D is attached: it first connects to A and B (whose addresses are known to D), receives the cluster lists from A and B (which now include C), and is then able to connect to C.

The mco_cluster_detach() API disconnects the database from the cluster. The database immediately becomes available for both read and write access. Note that mco_cluster_detach() decrements the overall sum of cluster qranks. For example, consider a cluster consisting of two working nodes A and B with qranks of 1. If node B fails, then the cluster (node A) is stopped because the quorum is no longer there (the qrank sum of active nodes must be greater than half of the qranks of all nodes - and 1 is not greater than 2 / 2 = 1). However, if node B calls mco_cluster_detach(), node A is able to continue functioning "as a cluster", since this was a “voluntary detachment” as opposed to a network / node failure. The qrank of node B becomes 0, and the quorum is still there (1 > 1/2).
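The quorum arithmetic above can be sketched with a small helper. This is purely illustrative (has_quorum is a hypothetical function, not part of the eXtremeDB API), but the same logic could back a my_check_quorum-style callback:

```c
#include <assert.h>

/* Hypothetical helper applying the rule described above: the cluster is
 * quorate when the qrank sum of the active nodes is strictly greater than
 * half of the qrank sum of all configured nodes. */
static int has_quorum(const unsigned *qranks, const int *active, unsigned n_nodes)
{
    unsigned total = 0, live = 0, i;
    for (i = 0; i < n_nodes; ++i) {
        total += qranks[i];
        if (active[i])
            live += qranks[i];
    }
    return 2 * live > total; /* live > total / 2, avoiding integer truncation */
}
```

For the two-node example: with qranks {1, 1} and only A active, 2*1 > 2 is false, so the cluster stops. After B's voluntary mco_cluster_detach() drops its qrank to 0, the same check against qranks {1, 0} gives 2*1 > 1, so A continues.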

To determine the presence of “dynamic” nodes (nodes that detach from or attach to the cluster), the mco_cluster_get_active_nodes() API can be called at any time. Note that before calling mco_cluster_get_active_nodes(), the application should call mco_cluster_info(db,&cl_info) which returns information including the number of active nodes (cl_info.n_active_nodes). This parameter is then passed to function mco_cluster_get_active_nodes() in order to allocate appropriate space for the nodes_info array.

 

Restricting replication

Sometimes it is beneficial to restrict automated replication to a limited set of classes. This is done by declaring some of the classes as distributed. Distributed classes are declared in the schema, and objects of these classes are not distributed to the cluster automatically. Instead, each node buffers (keeps an internal list of) all inserts, modifications and deletions made to its distributed classes. Transactions that update the local objects don’t trigger any network-related activities.

Periodically, the application will trigger replication of the distributed objects to the cluster by calling one of the following functions:

     
    MCO_RET mco_cluster_scatter(mco_db_h db, uint2 *class_codes, uint2 n_class_codes,
                    uint2 *node_ids, uint2 n_node_ids);
    MCO_RET mco_cluster_gather(mco_db_h db, uint2 *class_codes, uint2 n_class_codes,
                    uint2 *node_ids, uint2 n_node_ids);
     

The application can scatter (push) updates made to the local distributed objects to one or many cluster nodes, or gather (pull) updates made to distributed objects on one or many remote cluster nodes. Both operations are atomic: if conflicts are detected, the entire scatter or gather operation is rolled back. The mco_cluster_scatter() function pushes out updates made to the local distributed classes since the last successful replication of the data. Similarly, mco_cluster_gather() pulls updates made to the remote “distributed” objects of interest since the last successful replication.

For example, consider 3 nodes A, B and C and 2 distributed classes X and Y in the following scenario:

1. A creates x1

2. B creates x2

3. A scatter {X} to {B} [A sends x1 to B]

4. B gather {X} from {A,C} [no sends because x1 is already on B]

5. A creates y1

6. A scatter {X,Y} to {B,C} [A sends y1 to B and (x1,y1) to C]

7. C gather {X,Y} from {A,B} [C gets x2 from B (x1,y1 already on C)]
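The seven steps above can be modelled by a small simulation. This is illustrative only: nodes and objects are represented as bitmasks, and for simplicity each call re-copies all matching objects rather than only the changes since the last replication (the resulting state is the same here, since re-sending an object a node already holds changes nothing):

```c
#include <assert.h>

/* Objects as bits: x1 and x2 belong to class X; y1 belongs to class Y. */
enum { X1 = 1, X2 = 2, Y1 = 4 };
enum { CLASS_X = X1 | X2, CLASS_Y = Y1 };

/* Copy the source node's objects of the selected classes to the target
 * node -- the net effect of both scatter (push) and gather (pull). */
static void replicate(unsigned nodes[], int from, int to, unsigned classes)
{
    nodes[to] |= nodes[from] & classes;
}
```

Walking through the scenario with nodes[0..2] standing for A, B and C reproduces the bracketed outcomes: after step 6, C holds exactly {x1, y1}, and after step 7 it also receives x2 from B.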

Cluster in general, and distributed classes in particular, allow hardware (separate nodes) to be added to the network to increase the overall throughput. If all data is distributed automatically, scalability is limited by network-related overhead. The distributed feature allows segmenting data and processing those data segments locally (quickly), while the results of the local processing can still be shared on demand via the mco_cluster_scatter() and mco_cluster_gather() functions.

Another method for restricting the data being replicated is by using the local declaration in the schema. Objects of classes declared as local are never subject to replication – i.e. they are maintained in the local database for the individual node on which they are declared but this data is never distributed to other nodes.
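A schema combining the three replication behaviors might look like the following sketch. The class and field names are hypothetical, and the exact placement of the local and distributed qualifiers should be checked against the schema reference for your eXtremeDB version:

```
declare database clusterdb;

/* Default: replicated automatically to all cluster nodes */
class Config {
    uint4  key;
    string value;
};

/* Replicated only on explicit mco_cluster_scatter()/mco_cluster_gather() calls */
distributed class Measurement {
    uint4  sensor_id;
    double value;
};

/* Never replicated; exists only in the local node's database */
local class Scratch {
    uint4 id;
};
```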