Python cluster support is a thin layer over the top of eXtremeDB Cluster. To implement a cluster application with Python, an array of Cluster nodes is initialized then the
exdb.open_database()
method is called and a Listener thread is created for each node.Before creating the cluster, the Python runtime must load the cluster-enabled eXtremeDB runtime. This is done by passing parameter
cluster = True
to methodexdb.init_runtime()
. (Note that only the MVCC transaction manager is supported for Cluster.)exdb.init_runtime(is_disk, 'mvcc', is_shm, is_debug, cluster = True)Then each node is created by calling
exdb.open_database()
. For example:db = exdb.open_database(dbname='clusterdb_%s' % node_id, dictionary=dict, clusterParams=cluster_params, is_disk=is_disk, db_segment_size=128*1024*1024);Here the database name
dbname
is made unique by concatenating a uniquenode_id
. The parameterclusterParams
is an instance of the Python class exdb.ClusterParams which corresponds with the C API structure mco_cluster_params_t . TheclusterParams
could be initialized as follows to use different ports on the localhost:nodes = [exdb.ClusterNodeParams("127.0.0.1:2001"), exdb.ClusterNodeParams("127.0.0.1:2002"), exdb.ClusterNodeParams("127.0.0.1:2003")] cluster_params = exdb.ClusterParams(nwtype = 'TCP', nodeId = int(node_id), nodes = nodes, notifyCallback=onNotify)The ClusterNodeParams class has the following constructor:
def __init__(self, addr, qrank = 1): self.addr = addr self.qrank = qrankwhere
addr
is the TCP address of node andqrank
is quorum rank for this node.
ClusterParams
The constructor for class exdb.ClusterParams is defined as:
def __init__(self, nodeId, nwtype, nodes = [], nwparams = None, MPICluster = False, window = None, conn_pool_factor = 50, sync_msg_objects = 100, sync_msg_size = 0, notifyCallback = None, quorumCallback = None, clusterSendBuf = 0, clusterRecvBuf = 0, mode_mask = 0):where the elements are as follows:
nodeId
The integer identifier of this cluster node. Translated to the
node_id
cluster parameter.nwtype
The type of cluster. Can be either 'TCP' or 'MPI'.
nodes
The list of nodes in the cluster. Each node is represented as an instance of class ClusterNodeParams.
nwparams
Optional network parameters. A dictionary, setting network parameters for the cluster. keys for the dictionary for a TCP cluster:
- "connectTimeout"
- "connectInterval"
- "socketSendBuf"
- "socketRecvBuf"
- "socketDomain"
- "keepAliveTime"
- "keepAliveProbes"
- "ssl_params"
If
ssl_params
is specified an SSL layer is used; the parameters for setting an SSL encrypted connection is a dictionary consisting of:
- "cipher_list" - string
- "max_cert_list" - long
- "verify_mode" - int
- "verify_depth" - long
- "cert_file_pem" - string
- "pkey_file_pem" - string
An example of
nwparams
parameter usage:nwparams = {"connectTimeout" : 15000, "keepAliveTime" : 600}MPICluster
Set to
True
if MPI channel is used.window
An object of type ClusterWindow which is declared as follows:
class ClusterWindow(object): def __init__(self, bsize = 0, length = 0, timeout = 1): self.bsize = bsize self.length = length self.timeout = timeoutconn_pool_factor
An integer value specifying the size of the connection pool (as a percent of
).
db_max_connections
sync_msg_objects
An integer value specifying the maximum number of objects per message during synchronization.
sync_msg_size
An integer value specifying the maximum size of a message in bytes during synchronization.
notifyCallback
Python callback functions to receive cluster notifications. For example:
notifications = ["connect", "disconnect"] def onNotify(notification_code, node_info): print 'Cluster notification: code %s info %s' % (notifications[notification_code], node_info)quorumCallback
A callback function to be called when a Cluster Quorum is not reached
clusterSendBuf
An integer value specifying the internal send buffer size in bytes.
clusterRecvBuf
An integer value specifying the internal receive buffer size in bytes.
mode_mask
A cluster mode mask (
); combination of
mco_cluster_params_t::mode_mask
debug_output
(MCO_CLUSTER_MODE_DEBUG_OUTPUT
) and
early_data_send
(MCO_CLUSTER_MODE_EARLY_DATA_SEND
).
Example
The following code snippet (from sample
samples/python/cluster
) demonstrates how a simple Python Cluster application performs these steps:class ListenThread(threading.Thread): def __init__(self, db): self.db = db super(ListenThread, self).__init__() def run(self): print "Listen thread started" con = self.db.connect() print "Listen thread connected:", con con.listen() con.close() } ... notifications = ["connect", "disconnect"] def onNotify(notification_code, node_info): print 'Cluster notification: code %s info %s' % (notifications[notification_code], node_info) ... def start_node(n_nodes, node_id, q = None): ... exdb.init_runtime(is_disk, 'mvcc', is_shm, is_debug, cluster = True) db = None try: # use different ports on the localhost nodes = [exdb.ClusterNodeParams("127.0.0.1:200%s" % (10 + i * 10)) for i in xrange(n_nodes)] cluster_params = exdb.ClusterParams(nwtype = 'TCP', nodeId = int(node_id), nodes = nodes, notifyCallback=onNotify) dict = exdb.load_dictionary(schema, persistent=is_disk, debug=is_debug) db = exdb.open_database(dbname='clusterdb_%s' % node_id, dictionary=dict, clusterParams=cluster_params, is_disk=is_disk, db_segment_size=128*1024*1024); print 'Node %s: Database opened' % node_id listenThread = ListenThread(db) listenThread.start() print 'Node %s: ListenThread started' % node_id with db.connect() as con: print 'Node %s: Database connected' % node_id startTime = time.time() doWork(con, n_nodes, node_id) stopTime = time.time() print 'Node %s: Work finished' % node_id con.stop() # stop cluster runtime. Returns after ALL nodes call stop() stopAllTime = time.time() print 'Node %s: Connection stopped' % node_id listenThread.join() ... finally: if not db is None: db.close() return