eXtremeDB Cluster Applications in Python

Python cluster support is a thin layer over eXtremeDB Cluster. To implement a cluster application in Python, initialize a list of cluster nodes, call exdb.open_database(), and create a listener thread for each node.

Before creating the cluster, the Python runtime must load the cluster-enabled eXtremeDB runtime. This is done by passing cluster = True to exdb.init_runtime(). (Note that Cluster supports only the MVCC transaction manager.)

 
    exdb.init_runtime(is_disk, 'mvcc', is_shm, is_debug, cluster = True)
     

Then each node is created by calling exdb.open_database(). For example:

 
    db = exdb.open_database(dbname='clusterdb_%s' % node_id, dictionary=dict, clusterParams=cluster_params,
                    is_disk=is_disk, db_segment_size=128*1024*1024)
                     

Here the database name dbname is made unique by appending the node_id, which is unique to each node. The parameter clusterParams is an instance of the Python class exdb.ClusterParams, which corresponds to the C API structure mco_cluster_params_t. For example, clusterParams can be initialized as follows to use different ports on the localhost:

 
    nodes = [exdb.ClusterNodeParams("127.0.0.1:2001"),
            exdb.ClusterNodeParams("127.0.0.1:2002"),
            exdb.ClusterNodeParams("127.0.0.1:2003")]
 
    cluster_params = exdb.ClusterParams(nwtype = 'TCP', nodeId = int(node_id), 
                    nodes = nodes, notifyCallback=onNotify)
     

The ClusterNodeParams class has the following constructor:

 
    def __init__(self, addr, qrank = 1):
        self.addr = addr
        self.qrank = qrank
         

where addr is the TCP address of the node and qrank is the quorum rank of this node.
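
As a sketch of how such a node list might be generated rather than typed out, the helper below builds one address per node on localhost and assigns quorum ranks. The ClusterNodeParams class here is a local stand-in that copies the constructor shown above, so the snippet runs without eXtremeDB installed; make_local_nodes, base_port and qranks are hypothetical names chosen for illustration and are not part of exdb.

```python
class ClusterNodeParams(object):
    """Local stand-in mirroring the documented exdb.ClusterNodeParams constructor."""
    def __init__(self, addr, qrank=1):
        self.addr = addr
        self.qrank = qrank


def make_local_nodes(n_nodes, base_port=2001, qranks=None):
    """Build n_nodes node descriptors on consecutive localhost ports.

    make_local_nodes, base_port and qranks are illustrative names, not exdb API.
    """
    if qranks is None:
        qranks = [1] * n_nodes  # same default rank the constructor uses
    if len(qranks) != n_nodes:
        raise ValueError("need exactly one qrank per node")
    return [ClusterNodeParams("127.0.0.1:%d" % (base_port + i), qranks[i])
            for i in range(n_nodes)]


nodes = make_local_nodes(3)
addresses = [n.addr for n in nodes]
print(addresses)
```

In a real application the stand-in class would be replaced by exdb.ClusterNodeParams and the resulting list passed as the nodes argument of exdb.ClusterParams.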

 

ClusterParams

The constructor for class exdb.ClusterParams is defined as:

 
    def __init__(self, nodeId, nwtype, nodes = [], nwparams = None, MPICluster = False,
            window = None, conn_pool_factor = 50, sync_msg_objects = 100, sync_msg_size = 0,
            notifyCallback = None, quorumCallback = None,
            clusterSendBuf = 0, clusterRecvBuf = 0, mode_mask = 0):
             

where the elements are as follows:

nodeId

The integer identifier of this cluster node. Translated to the node_id cluster parameter.

nwtype

The type of cluster. Can be either 'TCP' or 'MPI'.

nodes

The list of nodes in the cluster. Each node is represented as an instance of class ClusterNodeParams.

nwparams

Optional network parameters: a dictionary that sets network parameters for the cluster. The dictionary keys for a TCP cluster are:

  • "connectTimeout"
  • "connectInterval"
  • "socketSendBuf"
  • "socketRecvBuf"
  • "socketDomain"
  • "keepAliveTime"
  • "keepAliveProbes"
  • "ssl_params"

If ssl_params is specified, an SSL layer is used. Its value is a dictionary of parameters for the SSL-encrypted connection, consisting of:

  • "cipher_list" - string
  • "max_cert_list" - long
  • "verify_mode" - int
  • "verify_depth" - long
  • "cert_file_pem" - string
  • "pkey_file_pem" - string

An example of nwparams parameter usage:

 
    nwparams = {"connectTimeout" : 15000, "keepAliveTime" : 600}
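
The key lists above can also be used for a small sanity check before the dictionary is handed to exdb.ClusterParams. The sketch below is an illustration only: check_nwparams is a hypothetical helper (not an exdb function), the key names are the ones listed in this section, and every value, including the certificate file names, is a placeholder. Whether the wrapper itself rejects unknown keys is not stated here, so the check is done in plain Python.

```python
# Key names copied from the lists in this section.
TCP_KEYS = {"connectTimeout", "connectInterval", "socketSendBuf", "socketRecvBuf",
            "socketDomain", "keepAliveTime", "keepAliveProbes", "ssl_params"}
SSL_KEYS = {"cipher_list", "max_cert_list", "verify_mode", "verify_depth",
            "cert_file_pem", "pkey_file_pem"}


def check_nwparams(nwparams):
    """Return the dictionary unchanged, or raise KeyError on a misspelled key."""
    unknown = set(nwparams) - TCP_KEYS
    if unknown:
        raise KeyError("unknown nwparams keys: %s" % sorted(unknown))
    unknown_ssl = set(nwparams.get("ssl_params", {})) - SSL_KEYS
    if unknown_ssl:
        raise KeyError("unknown ssl_params keys: %s" % sorted(unknown_ssl))
    return nwparams


nwparams = check_nwparams({
    "connectTimeout": 15000,
    "keepAliveTime": 600,
    "ssl_params": {
        "cert_file_pem": "node_cert.pem",  # placeholder file name
        "pkey_file_pem": "node_key.pem",   # placeholder file name
        "verify_depth": 1,                 # placeholder value
    },
})
```

A misspelled key such as "conectTimeout" then fails immediately in Python instead of possibly being ignored further down.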
     

MPICluster

Set to True if the MPI channel is used.

window

An object of type ClusterWindow which is declared as follows:

 
    class ClusterWindow(object):
        def __init__(self, bsize = 0, length = 0, timeout = 1):
            self.bsize   = bsize
            self.length  = length
            self.timeout = timeout
                 

conn_pool_factor

An integer value specifying the size of the connection pool (as a percent of db_max_connections).

sync_msg_objects

An integer value specifying the maximum number of objects per message during synchronization.

sync_msg_size

An integer value specifying the maximum size of a message in bytes during synchronization.

notifyCallback

A Python callback function that receives cluster notifications. For example:

 
    notifications = ["connect", "disconnect"]
     
    def onNotify(notification_code, node_info):
        print 'Cluster notification: code %s info %s' % (notifications[notification_code], node_info)
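
The callback above indexes the notifications list directly, so an unexpected code would raise IndexError (or, for a negative value, pick the wrong name). A slightly more defensive variant is sketched below. It assumes, as the example does, that code 0 means connect and code 1 means disconnect, and that node_info can be formatted with %s. describe_notification is a hypothetical helper name, and print is written as a function call so the snippet also runs on Python 3.

```python
notifications = ["connect", "disconnect"]


def describe_notification(notification_code, node_info):
    """Format a notification, falling back to the raw code if it is not in the list."""
    if 0 <= notification_code < len(notifications):
        name = notifications[notification_code]
    else:
        name = "unknown(%s)" % notification_code
    return "Cluster notification: code %s info %s" % (name, node_info)


def onNotify(notification_code, node_info):
    # Same signature as the callback above; only the lookup is guarded.
    print(describe_notification(notification_code, node_info))


onNotify(0, "node 1")
onNotify(7, "node 2")
```

Keeping the formatting in a separate function makes the message easy to test without a running cluster.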
     

quorumCallback

A callback function to be called when a cluster quorum is not reached.

clusterSendBuf

An integer value specifying the internal send buffer size in bytes.

clusterRecvBuf

An integer value specifying the internal receive buffer size in bytes.

mode_mask

A cluster mode mask (mco_cluster_params_t::mode_mask); a combination of debug_output (MCO_CLUSTER_MODE_DEBUG_OUTPUT) and early_data_send (MCO_CLUSTER_MODE_EARLY_DATA_SEND).

 

Example

The following code snippet (from sample samples/python/cluster) demonstrates how a simple Python Cluster application performs these steps:

     
    class ListenThread(threading.Thread):
        def __init__(self, db):
            self.db = db
            super(ListenThread, self).__init__()
 
        def run(self):
            print "Listen thread started"
            con = self.db.connect()
            print "Listen thread connected:", con
            con.listen()
            con.close()
     
    ...
    notifications = ["connect", "disconnect"]
 
    def onNotify(notification_code, node_info):
        print 'Cluster notification: code %s info %s' % (notifications[notification_code], node_info)
     
    ...
     
    def start_node(n_nodes, node_id, q = None):
        ...

        exdb.init_runtime(is_disk, 'mvcc', is_shm, is_debug, cluster = True)
        db = None
        try:
            # use different ports on the localhost
            nodes = [exdb.ClusterNodeParams("127.0.0.1:200%s" % (10 + i * 10)) for i in xrange(n_nodes)]

            cluster_params = exdb.ClusterParams(nwtype = 'TCP', nodeId = int(node_id),
                                nodes = nodes, notifyCallback=onNotify)

            dict = exdb.load_dictionary(schema, persistent=is_disk, debug=is_debug)
            db = exdb.open_database(dbname='clusterdb_%s' % node_id, dictionary=dict,
                        clusterParams=cluster_params,
                        is_disk=is_disk, db_segment_size=128*1024*1024)
            print 'Node %s: Database opened' % node_id

            # the listener thread opens its own connection and calls listen()
            listenThread = ListenThread(db)
            listenThread.start()

            print 'Node %s: ListenThread started' % node_id
            with db.connect() as con:
                print 'Node %s: Database connected' % node_id
                startTime = time.time()
                doWork(con, n_nodes, node_id)
                stopTime = time.time()
                print 'Node %s: Work finished' % node_id
                con.stop() # stop cluster runtime. Returns after ALL nodes call stop()
                stopAllTime = time.time()
                print 'Node %s: Connection stopped' % node_id

                listenThread.join()
                ...
        finally:
            # close the database only if open_database() succeeded
            if db is not None:
                db.close()
        return
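
The ordering used in start_node (start the listener, do the work, stop the cluster, then join the listener) can be tried without a cluster by using stand-in objects. In the sketch below FakeDatabase and FakeConnection are hypothetical stubs, not exdb classes. They assume that listen() blocks until the cluster is stopped, which is how the sample appears to use it. The real con.stop() also waits for all other nodes, which the stub does not model.

```python
import threading


class FakeConnection(object):
    """Stub connection: listen() blocks until some connection calls stop()."""
    def __init__(self, stopped, log):
        self.stopped = stopped
        self.log = log

    def listen(self):
        self.stopped.wait()
        self.log.append("listen returned")

    def stop(self):
        self.log.append("stop called")
        self.stopped.set()

    def close(self):
        pass


class FakeDatabase(object):
    """Stub database handing out connections that share one stop event."""
    def __init__(self):
        self.stopped = threading.Event()
        self.log = []

    def connect(self):
        return FakeConnection(self.stopped, self.log)


class ListenThread(threading.Thread):
    def __init__(self, db):
        super(ListenThread, self).__init__()
        self.db = db

    def run(self):
        con = self.db.connect()
        con.listen()  # returns only after stop() has been called
        con.close()


db = FakeDatabase()
listener = ListenThread(db)
listener.start()

con = db.connect()
db.log.append("work done")  # stands in for doWork(...)
con.stop()                  # unblocks the listener
listener.join()             # joining before stop() would block forever
con.close()
print(db.log)
```

The point of the stub is the ordering: the listener is joined only after stop() has been called, because listen() is assumed not to return before that.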