Implementing a C/C++ Cluster application requires, at a minimum, calls to the following functions:
MCO_RET mco_cluster_init();
MCO_RET mco_cluster_params_init();
MCO_RET mco_cluster_db_open();
MCO_RET mco_cluster_listen();
MCO_RET mco_cluster_stop();

A Cluster application can register a “quorum check callback” function to verify that the required minimum number of cluster nodes is present before database processing begins. If no quorum check callback is registered, the cluster runtime uses the qrank mechanism to verify a quorum. An application can also register a “notification callback” function to detect when nodes enter or leave the cluster.

The following code snippet demonstrates a C/C++ Cluster application with quorum check and connection notification callback functions:
#include <stdio.h>
#include <string.h>
// plus the eXtremeDB headers and the schema-generated header (not shown)

const char dbName[] = "clusterdb";

// Listen-thread: serves cluster traffic until the cluster is stopped
THREAD_PROC_DEFINE (ClusterListen, p)
{
    mco_db_h db;
    mco_db_connect (dbName, &db);
    mco_cluster_listen (db);
    mco_db_disconnect (db);
    return 0;
}

// Quorum check callback function. It must return MCO_YES if the node can start
// (or continue) to work.
// Parameters:
//   uint2 *neighbor_ids - array of active neighbors
//   uint2 n_neighbors   - number of active neighbors in the neighbor_ids array
//   void *param         - user-defined context
mco_bool my_check_quorum(uint2 *neighbor_ids, uint2 n_neighbors, void *param)
{
    mco_bool result = MCO_YES;
    // <some logic>
    return result;
}

// Notification callback function
void cluster_notifying(mco_db_h db, mco_cluster_notification_t notification_code,
                       mco_cluster_node_info_t *node_info, void *param, void *user_context)
{
    mco_cluster_node_info_t nodes_info[MCO_MAX_CLUSTER_SIZE];
    uint2 nd, n_nodes_info = MCO_MAX_CLUSTER_SIZE;

    printf("***NOTIFICATION*** Node %d (%s, qrank %d) %s.\n",
           node_info->node_id, node_info->addr, node_info->qrank,
           notification_code == MCO_CLUSTER_NOTIFY_NODE_CONNECT ? "CONNECTED" : "DISCONNECTED");

    mco_cluster_get_active_nodes(db, nodes_info, &n_nodes_info);
    printf("List of active nodes: \n --------------------------\n");
    for (nd = 0; nd < n_nodes_info; ++nd) {
        printf("ID %d, Addr %s, Qrank %d\n",
               nodes_info[nd].node_id, nodes_info[nd].addr, nodes_info[nd].qrank);
    }
}

int main ()
{
    mco_cluster_params_t cl_params;
    mco_cluster_node_params_t nodes[2];
    mco_device_t dev[4];
    unsigned int n_dev;
    mco_db_params_t db_params;
    // hListenThread and res are platform-specific thread variables (declarations not shown)

    // Initialize cluster runtime
    mco_cluster_init ();

    // Initialize cluster parameters
    mco_cluster_params_init (&cl_params);

    // Prepare dev, n_dev, db_params, as usual for mco_db_open_dev()

    // Fill in the information about the nodes
    strcpy (nodes[0].addr, "node1:10000");
    strcpy (nodes[1].addr, "node2:10000");
    nodes[0].qrank = nodes[1].qrank = 1;
    cl_params.nodes = nodes;
    cl_params.n_nodes = 2;
    cl_params.node_id = local_id;  // this node's ID: 0 on node1, 1 on node2, and so on

    // Set quorum check callback function
    cl_params.check_quorum_func = my_check_quorum;
    cl_params.check_quorum_param = 0;  // don't use the context

    // Set notification callback function
    cl_params.notifying_callback = cluster_notifying;
    cl_params.notifying_context = 0;   // don't use the context

    /* Create cluster database */
    mco_cluster_db_open (dbName, clusterdb_get_dictionary(), dev, n_dev, &db_params, &cl_params);

    /* Start mco_cluster_listen() thread */
    createThread (ClusterListen, 0, &hListenThread);

    /* Work with the database as usual */

    /* Stop the cluster */
    mco_cluster_stop ();

    /* Wait for the listen-thread to stop */
    THREAD_JOIN (hListenThread, res);

    /* Close the database */
    mco_db_close (dbName);
    return 0;
}

Note that the struct (mco_cluster_params_t cl_params) passed to mco_cluster_db_open() contains the cluster specifications. Typically this defines the number of nodes (n_nodes), the integer identifier of this node (node_id), and the node list (nodes), an array of mco_cluster_node_params_t structs holding the address and qrank of each node in the cluster. A node's ID is its index in this array: nodes[0] corresponds to node_id = 0, nodes[7] to node_id = 7, and so on. The struct can also specify callback functions for quorum check and node notification.

This sample uses the quorum check callback function my_check_quorum(), which lets the application examine the set of nodes currently online and decide whether the cluster can begin processing.
Note that when working with a cluster database, the mco_trans_commit() function may return the error code MCO_E_CLUSTER_NOQUORUM. This happens if one or more nodes go down and the remaining nodes no longer constitute a quorum.
Also note the use of a notification callback function (notifying_callback): whenever nodes attach to or detach from the cluster, the application receives a connection event, with notification_code indicating connect or disconnect, together with the node_info and user_context (the application's context) parameters.
Note that the notification function is called in the context of the listener task. It must therefore do whatever it does quickly, and it cannot perform any transactional activities or any active cluster operations. Only mco_cluster_info() and mco_cluster_get_active_nodes() are appropriate to call here.
Adding a node to an active Cluster
A new node can join an existing cluster simply by calling mco_cluster_db_open() with a unique node_id and the address:port and node_id of one of the running nodes. For example, suppose nodeA and nodeB are running as a cluster, with nodeA having node_id = 0 and nodeB having node_id = 1. The new nodeN knows about nodeA but not about nodeB. It chooses node_id = 2 and fills the node_params structure as follows:

strcpy(node_params[0].addr, "nodeA:10000");
strcpy(node_params[1].addr, "");  // empty address: nodeN doesn't know about nodeB
strcpy(node_params[2].addr, "<my address and port>");

To join the cluster, nodeN then calls mco_cluster_db_open() as usual. Note that if nodeN were to accidentally choose node_id = 1 (which is occupied by nodeB), mco_cluster_db_open() would return MCO_E_CLUSTER_DUPLICATE_NODEID. The mco_cluster_discover() function can help determine the members of a cluster and their node_id values.

A node can also call mco_cluster_detach() / mco_cluster_attach() to leave or join the cluster. When a node joins the cluster it need only know the address of one of its neighbor nodes; it then obtains an updated node list from the cluster runtime.

For example, consider a cluster with 4 nodes, A, B, C and D. Their node lists, node_id and qrank settings are as follows:

Node A: { "A", "B" }, node_id = 0, qrank = 1 - Node A specifies its own address and also knows about node B. It has no indication of the presence of nodes C and D.
Node B: { "A", "B" }, node_id = 1, qrank = 1 - Node B specifies its own address and also knows about node A. It has no indication of the presence of nodes C and D.
Node C: { "A", "B", "C" }, node_id = 2, qrank = 0 - Node C "knows" about nodes A and B. qrank = 0 means that disconnecting C does not affect the quorum.
Node D: { "A", "B", "", "D" }, node_id = 3, qrank = 0 - Node D "knows" about nodes A and B, but not about C (its entry at index 2 is empty). qrank = 0 means that disconnecting D does not affect the quorum.

Nodes A and B are started first; they have each other's addresses, so they are able to communicate. Then node C is "attached": C connects to A and B, and A and B add C to their cluster lists. Then D is attached. It first connects to A and B (whose addresses D knows) and receives the cluster lists from A and B, which now include C. It is then able to connect to C.

The mco_cluster_detach() API disconnects the database from the cluster. The database immediately becomes available for both read and write access. Note that mco_cluster_detach() decrements the overall sum of cluster qranks. For example, consider a cluster of two working nodes A and B, each with a qrank of 1. If node B fails, the cluster (node A) is stopped because the quorum is lost: the qrank sum of the active nodes must be greater than half of the qrank sum of all nodes, and 1 is not greater than 2 / 2 = 1. However, if node B calls mco_cluster_detach(), node A can continue functioning "as a cluster", since this is a “voluntary detachment” as opposed to a network or node failure. The qrank of node B becomes 0, and the quorum still holds (1 > 1 / 2).

To determine the presence of “dynamic” nodes (nodes that detach from or attach to the cluster), the application can call mco_cluster_get_active_nodes() at any time. Before calling it, the application should call mco_cluster_info(db, &cl_info), which returns information including the number of active nodes (cl_info.n_active_nodes). This value is used to allocate a nodes_info array of the appropriate size, which is then passed to mco_cluster_get_active_nodes().
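The failure-versus-detach example above can be restated as a worked inequality, where q_i is the qrank of node i. This only rewrites the arithmetic already given in the text: a failed node keeps its qrank in the total, while a detached node's qrank drops to 0.

```latex
\begin{aligned}
&\text{quorum holds} \iff \sum_{i \in \text{active}} q_i \;>\; \tfrac{1}{2} \sum_{i \in \text{all}} q_i \\[4pt]
&\text{B fails:} && q_A = 1 \;\not>\; \tfrac{1}{2}(q_A + q_B) = \tfrac{1}{2}(1 + 1) = 1 && \Rightarrow \text{no quorum, A stops} \\
&\text{B detaches } (q_B = 0): && q_A = 1 \;>\; \tfrac{1}{2}(q_A + q_B) = \tfrac{1}{2}(1 + 0) = \tfrac{1}{2} && \Rightarrow \text{quorum holds, A continues}
\end{aligned}
```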
Restricting replication
Sometimes it is beneficial to restrict automated replication to a limited set of classes. This is done by declaring some of the classes as distributed. Distributed classes are declared in the schema, and objects of these classes are not distributed to the cluster automatically. Instead, each node buffers (keeps an internal list of) all inserts, modifications and deletions made to its distributed classes. Transactions that update these local objects don't trigger any network-related activity.

Periodically, the application triggers replication of the distributed objects to the cluster by calling one of the following functions:

MCO_RET mco_cluster_scatter(mco_db_h db, uint2 *class_codes, uint2 n_class_codes, uint2 *node_ids, uint2 n_node_ids);
MCO_RET mco_cluster_gather(mco_db_h db, uint2 *class_codes, uint2 n_class_codes, uint2 *node_ids, uint2 n_node_ids);

The application can scatter (push) updates made to its distributed objects to one or more cluster nodes, or gather (pull) updates made to the distributed objects on one or more remote cluster nodes. Both operations are atomic: if conflicts are detected, the entire scatter or gather operation is rolled back. mco_cluster_scatter() pushes the updates made to the local distributed classes since the last successful replication of the data. Similarly, mco_cluster_gather() pulls the updates made to the remote “distributed” objects of interest since the last successful replication.

For example, consider 3 nodes A, B and C and 2 distributed classes X and Y in the following scenario:

1. A creates x1
2. B creates x2
3. A scatter {X} to {B} [A sends x1 to B]
4. B gather {X} from {A, C} [nothing is sent because x1 is already on B]
5. A creates y1
6. A scatter {X, Y} to {B, C} [A sends y1 to B and (x1, y1) to C]
7. C gather {X, Y} from {A, B} [C gets x2 from B (x1, y1 are already on C)]

Cluster in general, and distributed classes in particular, allow hardware (separate nodes) to be added to the network to increase overall throughput. However, if all data is distributed automatically, scalability is limited by network-related overhead. The distributed feature allows data to be segmented and those segments to be processed locally (quickly), while the results of the local processing can still be shared via the mco_cluster_scatter() and mco_cluster_gather() functions.

Another way to restrict the data being replicated is the local declaration in the schema. Objects of classes declared as local are never subject to replication: they are maintained in the local database of the individual node and are never distributed to other nodes.