Implementing a C/C++ Cluster application requires, at a minimum, calling the following functions:

```c
MCO_RET mco_cluster_init();
MCO_RET mco_cluster_params_init();
MCO_RET mco_cluster_db_open();
MCO_RET mco_cluster_listen();
MCO_RET mco_cluster_stop();
```

A Cluster application can register a “quorum check callback” function to verify that the necessary minimum number of cluster nodes is present before database processing begins. If no quorum callback function is registered, the cluster runtime uses the qrank mechanism to verify a quorum. An application can also register a “notification callback” function to detect when nodes enter or leave the cluster.

The following code snippet demonstrates a C/C++ Cluster application with quorum check and connection notification callback functions:
```c
#include <stdio.h>
#include <string.h>
// plus the eXtremeDB and Cluster headers, and the schema-generated header
// that declares clusterdb_get_dictionary()

// THREAD_PROC_DEFINE, createThread() and THREAD_JOIN() stand for the
// threading helpers of the application's platform layer.

const char dbName[] = "clusterdb";

// Listen thread: services cluster traffic for this node
THREAD_PROC_DEFINE(ClusterListen, p)
{
    mco_db_h db;
    mco_db_connect(dbName, &db);
    mco_cluster_listen(db);      // returns once the cluster is stopped
    mco_db_disconnect(db);
    return 0;
}

// Quorum check callback function. It must return MCO_YES if the node can start
// (or continue) to work.
// Parameters:
//   uint2 *neighbor_ids - array of active neighbors
//   uint2 n_neighbors   - number of active neighbors in the neighbor_ids array
//   void *param         - user-defined context
mco_bool my_check_quorum(uint2 *neighbor_ids, uint2 n_neighbors, void *param)
{
    mco_bool result = MCO_YES;
    // <some logic>
    return result;
}

// Notification callback function: reports the event and lists the active nodes.
void cluster_notifying(mco_db_h db, mco_cluster_notification_t notification_code,
                       mco_cluster_node_info_t *node_info, void *param, void *user_context)
{
    mco_cluster_node_info_t nodes_info[MCO_MAX_CLUSTER_SIZE];
    uint2 nd, n_nodes_info = MCO_MAX_CLUSTER_SIZE;

    printf("***NOTIFICATION*** Node %d (%s, qrank %d) %s.\n",
           node_info->node_id, node_info->addr, node_info->qrank,
           notification_code == MCO_CLUSTER_NOTIFY_NODE_CONNECT ? "CONNECTED" : "DISCONNECTED");

    mco_cluster_get_active_nodes(db, nodes_info, &n_nodes_info);
    printf("List of active nodes:\n--------------------------\n");
    for (nd = 0; nd < n_nodes_info; ++nd) {
        printf("ID %d, Addr %s, Qrank %d\n",
               nodes_info[nd].node_id, nodes_info[nd].addr, nodes_info[nd].qrank);
    }
}

int main()
{
    mco_cluster_params_t cl_params;
    mco_cluster_node_params_t nodes[2];
    mco_device_t dev[4];
    unsigned int n_dev;
    mco_db_params_t db_params;
    MCO_RET rc;
    unsigned int local_id = 0;   // on node1 this value should be 0,
                                 // on node2 it should be 1, and so on
    // hListenThread and res: the thread handle and join result, declared
    // with the types expected by createThread() and THREAD_JOIN()

    // Initialize the cluster runtime
    mco_cluster_init();

    // Initialize the cluster parameters
    mco_cluster_params_init(&cl_params);

    // Prepare dev, n_dev and db_params as usual for mco_db_open_dev()

    // Fill in the information about the nodes
    strcpy(nodes[0].addr, "node1:10000");
    strcpy(nodes[1].addr, "node2:10000");
    nodes[0].qrank = nodes[1].qrank = 1;
    cl_params.nodes = nodes;
    cl_params.n_nodes = 2;
    cl_params.node_id = local_id;

    // Set the quorum callback function
    cl_params.check_quorum_func = my_check_quorum;
    cl_params.check_quorum_param = 0;    // don't use the context

    // Set the notification callback function
    cl_params.notifying_callback = cluster_notifying;
    cl_params.notifying_context = 0;     // don't use the context

    // Create the cluster database
    rc = mco_cluster_db_open(dbName, clusterdb_get_dictionary(), dev, n_dev,
                             &db_params, &cl_params);
    if (rc != MCO_S_OK) {
        printf("mco_cluster_db_open() failed: %d\n", rc);
        return 1;
    }

    // Start the mco_cluster_listen() thread
    createThread(ClusterListen, 0, &hListenThread);

    // Work with the database as usual

    // Stop the cluster
    mco_cluster_stop();

    // Wait for the listen thread to stop
    THREAD_JOIN(hListenThread, res);

    // Close the database
    mco_db_close(dbName);
    return 0;
}
```

Note that the struct cl_params (of type mco_cluster_params_t) passed to mco_cluster_db_open() contains the cluster specification. Typically it defines the number of nodes (n_nodes), the integer identifier of this node (node_id), and the node list (nodes): an array of mco_cluster_node_params_t structs holding the address and qrank of each node in the cluster, including this one. A node's id is its index in this array; i.e. nodes[0] corresponds to node_id=0, nodes[7] to node_id=7, and so on. The struct can also specify the quorum check and node notification callback functions.

This sample registers the quorum check callback my_check_quorum(), which lets the application examine the set of nodes currently online and decide whether the cluster can begin processing.
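The sample leaves the decision logic of my_check_quorum() as a placeholder. A minimal sketch of one possible body follows. It reproduces the qrank majority rule that this chapter describes for mco_cluster_detach() (the qrank sum of the active nodes must be greater than half of the qrank sum of all nodes), so it is mainly a starting point for custom rules. The quorum_ctx_t context struct, the qrank_check_quorum() name, and the assumption that neighbor_ids does not include the local node are hypothetical; the typedefs at the top are stand-ins so the sketch compiles without the eXtremeDB headers.

```c
#include <stddef.h>

// Stand-ins so this sketch compiles on its own; in a real application these
// types and constants come from the eXtremeDB headers.
typedef unsigned short uint2;
typedef int mco_bool;
#define MCO_NO  0
#define MCO_YES 1

// Hypothetical application context passed via check_quorum_param:
// the qrank of every node, indexed by node_id, plus the local node's id.
typedef struct {
    const uint2 *qranks;   // qranks[i] is the qrank of node i
    uint2 n_nodes;         // number of entries in qranks
    uint2 self_id;         // node_id of the local node
} quorum_ctx_t;

// Returns MCO_YES when the qrank sum of the local node plus its active
// neighbors is greater than half of the qrank sum of all nodes.
mco_bool qrank_check_quorum(uint2 *neighbor_ids, uint2 n_neighbors, void *param)
{
    const quorum_ctx_t *ctx = (const quorum_ctx_t *)param;
    unsigned long total = 0, active;
    uint2 i;

    if (ctx->self_id >= ctx->n_nodes)
        return MCO_NO;                      // misconfigured context

    for (i = 0; i < ctx->n_nodes; ++i)
        total += ctx->qranks[i];

    active = ctx->qranks[ctx->self_id];     // assumes neighbor_ids excludes self
    for (i = 0; i < n_neighbors; ++i)
        if (neighbor_ids[i] < ctx->n_nodes && neighbor_ids[i] != ctx->self_id)
            active += ctx->qranks[neighbor_ids[i]];

    // active > total / 2, written without division to avoid rounding
    return (2 * active > total) ? MCO_YES : MCO_NO;
}
```

To use it, fill a quorum_ctx_t with the same qrank values placed in nodes[] and register it with cl_params.check_quorum_func = qrank_check_quorum; and cl_params.check_quorum_param = &ctx;. Since the runtime already applies the qrank mechanism when no callback is registered, a custom callback is mostly worthwhile when the rule must differ, for example when one particular node must always be present.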
Note that when working with a cluster database, the mco_trans_commit() function may return the error code MCO_E_CLUSTER_NOQUORUM. This happens if one or more nodes go down and the remaining nodes no longer constitute a quorum.
Also note the use of a notification callback function (notifying_callback): when nodes attach to or detach from the cluster, the application receives a connection event (notification_code = connect/disconnect) together with the node_info and user_context (the application's context) parameters.
Note that the notification function is called in the context of a listener task. It must therefore return quickly, and it cannot perform any transactional activities or any active cluster operations. Only mco_cluster_info() and mco_cluster_get_active_nodes() are appropriate here.
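Because the notification callback runs on the listener task, any slow work is best deferred. A minimal sketch of one way to do that, assuming a C11 compiler with <stdatomic.h>: the callback only updates atomic counters and sets a flag, and an application worker thread later checks the flag and does the heavy lifting outside the listener. The names record_cluster_event() and take_membership_change() are hypothetical and not part of the Cluster API.

```c
#include <stdatomic.h>
#include <stdbool.h>

// Hypothetical event state shared between the notification callback
// (running on the listener task) and an application worker thread.
static atomic_uint n_connects = 0;
static atomic_uint n_disconnects = 0;
static atomic_bool membership_changed = false;

// Call this from the notification callback: it only touches atomics,
// so it returns immediately and never accesses the database.
void record_cluster_event(bool connected)
{
    if (connected)
        atomic_fetch_add(&n_connects, 1);
    else
        atomic_fetch_add(&n_disconnects, 1);
    atomic_store(&membership_changed, true);
}

// Called later by a worker thread: returns true (and clears the flag) if
// membership changed since the previous call, so the worker can do the
// slow work (transactions, logging) in its own context.
bool take_membership_change(void)
{
    return atomic_exchange(&membership_changed, false);
}
```

Inside cluster_notifying() this reduces to a single call such as record_cluster_event(notification_code == MCO_CLUSTER_NOTIFY_NODE_CONNECT);. A worker that sees take_membership_change() return true can then run transactions, or call mco_cluster_get_active_nodes(), outside the listener task.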
Adding a node to an active Cluster
A new node can join an existing cluster simply by calling mco_cluster_db_open() with a unique node_id and the address:port and node_id of one of the running nodes. For example, suppose nodeA and nodeB are running as a cluster, with nodeA having node_id = 0 and nodeB having node_id = 1. The new nodeN knows about nodeA but not about nodeB. It chooses node_id = 2 and fills the node_params structure as follows:

```c
strcpy(node_params[0].addr, "nodeA:10000");
strcpy(node_params[1].addr, "");   // empty address: nodeN doesn't know about nodeB
strcpy(node_params[2].addr, "<my address and port>");
```

To join the cluster, nodeN then calls mco_cluster_db_open() as usual. Note that if nodeN were to accidentally choose node_id = 1 (which is occupied by nodeB), mco_cluster_db_open() would return MCO_E_CLUSTER_DUPLICATE_NODEID. The mco_cluster_discover() function can help determine the members of a cluster and their node_id values.

A node can also call mco_cluster_detach()/mco_cluster_attach() to leave or join the cluster. When a node joins the cluster it need only know the address of one of its neighbor nodes; it then obtains an updated node list from the cluster runtime. For example, consider a cluster with 4 nodes, A, B, C and D. Their node lists, node_id and qrank settings are as follows:

Node A: { "A", "B" }, node_id = 0, qrank = 1 - Node A indicates its own address and also knows about node B. There is no indication of the presence of nodes C and D.
Node B: { "A", "B" }, node_id = 1, qrank = 1 - Node B indicates its own address and also knows about node A. There is no indication of the presence of nodes C and D.
Node C: { "A", "B", "C" }, node_id = 2, qrank = 0 - Node C "knows" about nodes A and B. The qrank = 0 means that disconnecting C does not affect the quorum.
Node D: { "A", "B", "", "D" }, node_id = 3, qrank = 0 - Node D "knows" about nodes A and B, but not whether C is connected. The qrank = 0 means that disconnecting D does not affect the quorum.

Nodes A and B are started first; they have each other's addresses, so they are able to communicate. Then node C is "attached": C connects to A and B, and A and B add C to their cluster lists. Then D is attached. It first connects to A and B (whose addresses D knows) and receives their cluster lists, which now include C. It is then able to connect to C.

The mco_cluster_detach() API disconnects the database from the cluster. The database immediately becomes available for both read and write access. Note that mco_cluster_detach() reduces the overall sum of cluster qranks by the detaching node's qrank. For example, consider a cluster consisting of two working nodes A and B, each with a qrank of 1. If node B fails, the cluster (node A) is stopped because the quorum is lost: the qrank sum of the active nodes must be greater than half of the qrank sum of all nodes, and 1 is not greater than 2 / 2 = 1. However, if node B calls mco_cluster_detach(), node A is able to continue functioning "as a cluster", since this was a “voluntary detachment” as opposed to a network or node failure. The qrank of node B becomes 0, and the quorum is still there (1 > 1/2).

To determine the presence of “dynamic” nodes (nodes that detach from or attach to the cluster), the mco_cluster_get_active_nodes() API can be called at any time. Note that before calling mco_cluster_get_active_nodes(), the application should call mco_cluster_info(db, &cl_info), which returns information including the number of active nodes (cl_info.n_active_nodes). This value is used to allocate appropriate space for the nodes_info array and is then passed to mco_cluster_get_active_nodes().
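The node-list exchange in the A/B/C/D example can be pictured with a small model. The sketch below is a simplification under stated assumptions, not the runtime's actual algorithm: node lists are fixed-size arrays of address strings indexed by node_id, an empty string marks an unknown node, and a joining node fills its empty slots from a list received from a neighbor. MAX_NODES, ADDR_LEN and merge_node_list() are hypothetical.

```c
#include <string.h>

#define MAX_NODES 8      // hypothetical list size for this sketch
#define ADDR_LEN  64     // hypothetical maximum address length

// Simplified model of a node-list update: every slot that is empty in the
// local list is filled from the list received from a neighbor.
// The array index is the node_id; an empty string means "unknown node".
void merge_node_list(char local[][ADDR_LEN], const char remote[][ADDR_LEN], int n)
{
    int i;
    for (i = 0; i < n; ++i) {
        if (local[i][0] == '\0' && remote[i][0] != '\0') {
            strncpy(local[i], remote[i], ADDR_LEN - 1);
            local[i][ADDR_LEN - 1] = '\0';   // always keep the slot terminated
        }
    }
}
```

Merging node A's updated list { "A", "B", "C" } into node D's { "A", "B", "", "D" } fills slot 2 with "C" and leaves D's own entry untouched, which corresponds to the point at which D is able to connect to C.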
Restricting replication
Sometimes it is beneficial to restrict automated replication to a limited set of classes. This is done by declaring some of the classes as distributed. The distributed classes are declared in the schema, and objects of these classes are not distributed to the cluster automatically. Instead, each node buffers (keeps an internal list of) all inserts, modifications and deletions made to its distributed classes. Transactions that update these local objects don't trigger any network-related activity.

Periodically, the application triggers replication of the distributed objects to the cluster by calling one of the following functions:

```c
MCO_RET mco_cluster_scatter(mco_db_h db, uint2 *class_codes, uint2 n_class_codes,
                            uint2 *node_ids, uint2 n_node_ids);
MCO_RET mco_cluster_gather(mco_db_h db, uint2 *class_codes, uint2 n_class_codes,
                           uint2 *node_ids, uint2 n_node_ids);
```

The application can scatter (push) updates made to its distributed objects to one or many cluster nodes, or gather (pull) updates made to distributed objects on one or many remote cluster nodes. Both operations are atomic: if conflicts are detected, the entire scatter or gather operation is rolled back. The mco_cluster_scatter() function pushes out the updates made to the node's distributed classes since the last successful replication. Similarly, mco_cluster_gather() pulls the updates made to the remote distributed objects of interest since the last successful replication.

For example, consider 3 nodes A, B and C and 2 distributed classes X and Y in the following scenario:

1. A creates x1
2. B creates x2
3. A scatter {X} to {B} [A sends x1 to B]
4. B gather {X} from {A,C} [nothing is sent, because x1 is already on B]
5. A creates y1
6. A scatter {X,Y} to {B,C} [A sends y1 to B and (x1, y1) to C]
7. C gather {X,Y} from {A,B} [C gets x2 from B (x1, y1 are already on C)]

Cluster in general, and distributed classes in particular, allow hardware (separate nodes) to be added to the network to increase overall throughput. If all data is distributed automatically, scalability is limited by network-related overhead. The distributed feature allows segmenting data and processing those data segments locally (quickly), while the results of the local processing can still be shared via the mco_cluster_scatter() and mco_cluster_gather() functions.

Another method for restricting the data being replicated is the local declaration in the schema. Objects of classes declared as local are never subject to replication: they are maintained in the local database of the individual node on which they are declared, and this data is never distributed to other nodes.
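The seven-step scatter/gather scenario for distributed classes can be checked with a toy model. In the sketch below, which is a simplification and not how the runtime is implemented, each object is one bit, a node's contents are a bitmask, and a transfer sends only those objects of the requested classes that the receiver does not already hold. The real functions instead track inserts, updates and deletes since the last successful replication and roll back the whole operation on conflict. All names (OBJ_X1, CLASS_X, model_scatter(), model_gather(), and so on) are hypothetical.

```c
// Toy model: one bit per object, one bitmask per node.
enum { OBJ_X1 = 1u << 0, OBJ_X2 = 1u << 1, OBJ_Y1 = 1u << 2 };
enum { CLASS_X = OBJ_X1 | OBJ_X2, CLASS_Y = OBJ_Y1 };   // objects per class

typedef unsigned int objset_t;

// Push the objects of the given classes from src to *dst.
// Returns the set of objects actually sent (those *dst did not have yet).
objset_t model_scatter(objset_t src, objset_t classes, objset_t *dst)
{
    objset_t sent = src & classes & ~*dst;
    *dst |= sent;
    return sent;
}

// Pull the objects of the given classes from each source into *dst.
// Returns the union of everything received.
objset_t model_gather(objset_t *dst, objset_t classes, const objset_t *srcs, int n_srcs)
{
    objset_t received = 0;
    int i;
    for (i = 0; i < n_srcs; ++i)
        received |= model_scatter(srcs[i], classes, dst);
    return received;
}
```

Replaying steps 3, 4, 6 and 7 with these functions returns exactly the transfers listed in brackets: x1; nothing; y1 to B and (x1, y1) to C; and x2.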