Synchronous vs. Asynchronous Replication
eXtremeDB High Availability supports both synchronous and asynchronous replication between master and replica applications. Because synchronous replication requires a successful transaction commit and an ACK response from the replica application, the overall transaction rate is mostly determined by the network round-trip time (RTT). For this reason some applications may prefer asynchronous replication.
Asynchronous replication uses a buffer to pass transactions: the master puts transaction data into this buffer, and a separate application thread sends the buffer to the replicas (e.g. with the C API mco_HA_async_send_data_to_replicas()). In asynchronous mode a replica does not confirm transactions at all. Asynchronous replication is very fast because it depends only on network bandwidth, not on network latency (RTT). However, the replica will lag behind the master, because some transactions are committed on the master but not yet sent to the replica. The maximum lag is therefore determined by the size of the asynchronous buffer.
Transaction Window
A transaction window can be specified for synchronous replication (e.g. using the C APIs mco_HA_set_trans_window_size() and mco_HA_commit_window()). The transaction window is a compromise between synchronous and asynchronous replication. If the size of the window is N, the replica sends an ACK for every N-th transaction. The RTT then becomes less restrictive, but the replica can lag behind the master by up to N transactions. (Note that the transaction window has no effect when using asynchronous replication; the asynchronous buffer is a separate memory device allocated by the application, and its size is specified to optimize transaction throughput.)
Mixing Synchronous and Asynchronous Replication
A single master application can implement synchronous replication with one or more replicas, and asynchronous replication with other replicas.
When the flag MCO_HAMODE_FORCE_SYNC is set, it turns on synchronous replication for a given replica. Typically the flag is set as follows:
    mco_HA_replica_params_t replica_p;
    ...
    mco_HA_replica_params_init(&replica_p);
    ...
    replica_p.mode_flags = MCO_HAMODE_REPLICA_NOTIFICATION | MCO_HAMODE_FORCE_SYNC;
Note that if the master is running in synchronous mode, the setting has no effect. (See sample haasync, component rplsync, for implementation details.)
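When deciding which replicas should be forced into synchronous mode, it can help to put rough numbers on the trade-offs described above. The following is a back-of-the-envelope sketch, not part of the eXtremeDB API: all function names are hypothetical, the synchronous estimate ignores commit and transmission time, and the asynchronous lag estimate assumes a fixed transaction size.

```c
#include <stddef.h>

/* Rough upper bound on commits per second when the master waits for one
 * ACK per `window` transactions (window == 1 models plain synchronous
 * replication). Only the round-trip time is taken into account. */
double est_sync_tps(double rtt_seconds, unsigned window)
{
    if (rtt_seconds <= 0.0 || window == 0)
        return 0.0;
    return (double)window / rtt_seconds;
}

/* Rough upper bound for asynchronous replication, which is limited by
 * bandwidth rather than latency. */
double est_async_tps(double bandwidth_bytes_per_s, double txn_bytes)
{
    if (txn_bytes <= 0.0)
        return 0.0;
    return bandwidth_bytes_per_s / txn_bytes;
}

/* Worst-case replica lag, in transactions, with a transaction window. */
unsigned max_lag_window(unsigned window)
{
    return window;
}

/* Worst-case replica lag, in transactions, when the asynchronous buffer
 * is full of same-sized transactions (an assumption of this sketch). */
size_t max_lag_async(size_t async_buf_bytes, size_t txn_bytes)
{
    return txn_bytes ? async_buf_bytes / txn_bytes : 0;
}
```

For instance, under these assumptions a 0.5 second RTT limits a synchronous replica to about 2 commits per second, while a window of 10 raises the bound to about 20 at the cost of a lag of up to 10 transactions.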
Replication with a Persistent Database
It may be desirable for a replica to store data in a persistent database. In this case there are performance issues to be considered as the write processes on the replica will block the master if synchronous replication is used. To tune performance the transaction commit policy and timeout parameters on the replica must be adjusted according to application requirements. (For details on the transaction commit policies see the Persistent Database I/O page.)
If the MCO_COMMIT_DELAYED policy is used, transactions are not written to disk upon the mco_trans_commit() call. The data is written to disk only when one of the thresholds is reached. The thresholds are:
log_params.delayed_commit_threshold - the size of uncommitted data (not yet written to disk)
log_params.max_delayed_transactions - the number of unwritten transactions
log_params.max_commit_delay - the maximum delay between commit() and writing to disk (in milliseconds)
For example, with the following parameter values:
    db_params.log_params.delayed_commit_threshold = 64*1024;
    db_params.log_params.max_delayed_transactions = 10;
    db_params.log_params.max_commit_delay = 1000;
the data will be written to disk when any of the thresholds is reached: every 1 second, every 10 transactions, or every 64K of changed data, whichever comes first.
Note that these thresholds are checked only in the mco_trans_commit() call. For example, if you set max_commit_delay to 1000 (1 second) and perform a single small transaction, the data will not be written to disk until the next mco_trans_commit(), even if that occurs much later (say, after 10 seconds). Likewise, if you insert 1000 objects and the total size of the data (changed pages) does not exceed delayed_commit_threshold, the changes are not written to disk.
The only way to cause data to be written to disk on each transaction commit is to use the MCO_COMMIT_SYNC_FLUSH policy. This is the most “durable” transaction commit policy. However, note that when using MCO_COMMIT_SYNC_FLUSH (with synchronous replication) the mco_trans_commit() on the master returns only after the data has been written to the replica's persistent media. This can be very slow, so in this case it is advisable to make transactions as large as possible (e.g. insert 1000 objects in one transaction rather than in 1000 small transactions). (Note that if the master database is on persistent media, the application can call mco_disk_flush() to flush all changes made by committed transactions. But there is currently no way to force the replica database to be flushed to disk.)
It is possible to use asynchronous replication with the MCO_COMMIT_SYNC_FLUSH policy on the replica to avoid blocking the master. But note that in asynchronous mode the transaction data is not sent to the replica immediately. Instead the data is placed in the async buffer and is sent to the replica after the mco_trans_commit() finishes. If the replica spends a lot of time committing, the amount of data waiting in the master's async buffer can be significant, and if the master dies for some reason, this data will be lost.
Multiple Communication Channels
High Availability applications can communicate on more than one channel. The functions mco_HA_attach_master() and mco_HA_attach_replica() choose the channel implementation based on the content of the connection string. The connection string is passed to the first registered channel. If the channel implementation recognizes the string (i.e. the string has the right format for that implementation), it is used. Otherwise the string is passed to the next implementation, and so on. Each channel has its own unique prefix that identifies the strings intended for it (the standard channels have the tcp, udp and pipe prefixes). When the prefix is found in the connection string, parsing stops and the corresponding channel is used.
For example:
1. :tcp:20000 - master side, TCP channel, listen port - 20000
2. :udp:127.0.0.1:30000 - replica or master, UDP channel, host - 127.0.0.1, port - 30000
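The first-match dispatch described above can be sketched as a small self-contained model. This is not the eXtremeDB implementation: channel_t, channel_recognizes() and select_channel() are hypothetical names, and the recognized format (an optional leading ':' followed by the prefix and another ':') is an assumption inferred from the two example strings.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a registered channel implementation. */
typedef struct {
    const char *prefix;   /* e.g. "tcp", "udp" or "pipe" */
} channel_t;

/* Returns 1 if the connection string is addressed to this channel.
 * Assumed format: optional leading ':', then "<prefix>:". */
static int channel_recognizes(const channel_t *ch, const char *conn)
{
    size_t n = strlen(ch->prefix);

    if (*conn == ':')
        ++conn;
    return strncmp(conn, ch->prefix, n) == 0 && conn[n] == ':';
}

/* Offers the string to each channel in registration order and returns
 * the first one that recognizes it, or NULL if none does. */
const channel_t *select_channel(const channel_t *chans, size_t count,
                                const char *conn)
{
    size_t i;

    for (i = 0; i < count; ++i) {
        if (channel_recognizes(&chans[i], conn))
            return &chans[i];
    }
    return NULL;
}
```

With channels registered in the order tcp, udp, pipe, the string ":udp:127.0.0.1:30000" is rejected by the first channel and accepted by the second.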
Each communication channel must be registered in the master and replica by calling mco_HA_channel_implementation_add() after mco_HA_start() and prior to any other HA function call. For example:
    mco_HA_start();
    …
    mco_HA_channel_implementation_add( mco_nw_tcpip_vt() );
    mco_HA_channel_implementation_add( mco_nw_udpip_vt() );
    mco_HA_channel_implementation_add( mco_nw_pipe_vt() );
The master must also create a listener thread (a thread that calls mco_HA_attach_replica()) for each registered channel implementation. These listener threads differ only in the connection string that is passed to mco_HA_attach_replica().
(See sample hamultichan for implementation details.)
Replica taking over as Master
In most mission critical HA applications a replica should be able to take over for a failed master. In this case the master and replica will be copies of the same application, aware that they are operating as master or replica, with the replica able to switch roles if necessary.
The essential feature of this type of application is the implementation of code to detect if the master is running, and if so connect to it and run in replica mode. Otherwise, the master node has failed for some reason and the application must take over as master.
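The decision described above (try to reach a master, otherwise take over) can be isolated in a few lines. The sketch below is a simplified model, not eXtremeDB code: ha_role_t, probe_fn, choose_role() and probe_countdown() are hypothetical names, and the probe stands in for whatever call the application uses to attach to a master.

```c
typedef enum { ROLE_REPLICA, ROLE_MASTER } ha_role_t;

/* Hypothetical probe: returns nonzero if a running master answered. */
typedef int (*probe_fn)(void *ctx);

/* Tries to reach a master up to max_retries times. Returns ROLE_REPLICA
 * as soon as a probe succeeds, otherwise ROLE_MASTER. The number of
 * attempts made is stored in *attempts_used when it is not NULL. */
ha_role_t choose_role(probe_fn probe, void *ctx, int max_retries,
                      int *attempts_used)
{
    ha_role_t role = ROLE_MASTER;
    int used = 0;

    while (used < max_retries) {
        ++used;
        if (probe(ctx)) {
            role = ROLE_REPLICA;
            break;
        }
        /* a real application would sleep here before retrying */
    }
    if (attempts_used)
        *attempts_used = used;
    return role;
}

/* Test double for the probe: fails until the counter that ctx points to
 * reaches zero, then succeeds. */
int probe_countdown(void *ctx)
{
    int *remaining = (int *)ctx;

    if (*remaining == 0)
        return 1;
    --*remaining;
    return 0;
}
```

Keeping the retry count bounded matters here: a node that retries forever never takes over, while one that gives up too quickly may promote itself while the master is merely slow to respond.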
The following example code illustrates how a C/C++ application might implement a switch-over from replica to master:
    for (i = 0; i < max_retries; ++i)
    {
        printf("Try to attach master...");
        master_mode = 0;
        ReplicaParams.mode_flags = MCO_HAMODE_REPLICA_NOTIFICATION;
        rc = mco_HA_attach_master(&attach_p.db, &ReplicaParams);
        if (rc != MCO_S_OK)
        {
            printf("attempt #%d failed (rc=%d)\n", i + 1, rc);
            sample_sleep(500);
        }
        else
        {
            break;
        }
    }
    /* switch to MASTER mode */
    /******* setup HA instance *********/
    master_mode = 1;
    stop_flag = exit_flag;
    init_db = (attach_p.db == 0);
    if (init_db) /* attach_master did not detect an "old" master */
    {
        /* create the master database */
        printf("Create database for the first time\n");
        rc = sample_open_database( db_name, switchdb_get_dictionary(), DATABASE_SIZE, CACHE_SIZE,
                                   MEMORY_PAGE_SIZE, PSTORAGE_PAGE_SIZE, 5, &attach_p.memory);
        if (rc != MCO_S_OK)
        {
            printf("Can't open database, error %d\n", rc);
            return 1;
        }
        /* connect to the database, obtain a database handle */
        rc = mco_db_connect(db_name, &attach_p.db);
        if (rc != MCO_S_OK)
        {
            printf("Can't connect to database, error %d\n", rc);
            return 1;
        }
    }
    MasterParams.mode_flags = MCO_MASTER_MODE;
    mco_HA_set_master_params(attach_p.db, &MasterParams); // set MASTER mode
    /* start the ListenToReplicas thread */
    sample_start_connected_task(&listen_task, ListenToReplicas, db_name, &ha );
    /* start the Master thread */
    sample_start_connected_task(&master_task, Master, db_name, (void*) init_db);
    sample_join_task (&listen_task);
    sample_join_task (&master_task);
    /****************** master clean up *****************/
    mco_HA_stop(attach_p.db); /* detach all replicas */
    }
Similarly, the following example code illustrates how a Java application might implement a switch-over from replica to master:
    HASwitch() throws Exception
    {
        …
        // prepare replica parameters
        ReplicaConnection rpl_con = new ReplicaConnection(db);
        ReplicaConnection.Parameters replParams = new ReplicaConnection.Parameters();
        // attach to master. Analog of mco_nw_attach_master
        try {
            System.out.println("Try to connect master...");
            if (!rpl_con.attachMaster("localhost:" + PORT, replParams, CONNECT_TIMEOUT)) {
                System.err.println("Failed to connect to master : timeout");
            }
        } catch (DatabaseError de) {
            System.err.println("Failed to connect to master : error " + de.errorCode);
        }
        // stop & wait working thread
        running = false;
        inspectThread.join();
        System.out.println("Replica is terminated, switch to MASTER mode");
        rpl_con.disconnect();
        /************* Master mode **************/
        MasterConnection mst_con = new MasterConnection(db);
        MasterConnection.Parameters mst_params = new MasterConnection.Parameters(
            MasterConnection.MCO_HAMODE_ASYNCH );
        mst_params.maxReplicas = MAX_REPLICAS;
        mst_params.asyncBuf = new Database.PrivateMemoryDevice(
            Database.Device.Kind.AsyncBuffer, ASUNC_BUF_SIZE );
        mst_con.setReplicationMode(mst_params);
        listening = true;
        // start listen and async. commit threads
        Thread listenThread = new Thread( new Runnable() {
            public void run() { listen(); }
        } );
        Thread replicateThread = new Thread( new Runnable() {
            public void run() { replicate(); }
        });
        listenThread.start();
        replicateThread.start();
        //////////////////////////////////////////////////////
        // Insert some data in the database if needed
        //////////////////////////////////////////////////////
        mst_con.startTransaction(Database.TransactionType.ReadWrite);
        …
        mst_con.commitTransaction();
        // stop & wait threads
        listening = false;
        listenThread.join();
        mst_con.stopReplication();
        replicateThread.join();
        mst_con.disconnect();
        db.close();
        System.out.println("Master completes it work");
    }
High Availability for Distributed Databases
Replication forms the foundation of high availability in a sharded eXtremeDB database. Sharding is supported through eXtremeSQL by the DistributedSqlEngine. Each shard consists of an HA master and a certain number of HA replicas. (Please refer to the eXtremeSQL User Guide for further details.)
Partial Replication
It may be desirable for an HA application to replicate some, but not all, of the objects in a database. For this purpose, the keyword local in a C/C++ application schema indicates which classes are subject to partial replication. Objects of local classes in the master database are not replicated to replica nodes, and on a replica, the content of local class objects received from the master is not written into the replica database. Local classes on the master and replicas can be different, and it is not necessary to turn on binary evolution to enforce partial replication.
C/C++ APIs
The function mco_HA_enable_filter(mco_db_h db, mco_bool enabled) turns filtering on and off at runtime. It should be called after mco_db_connect() but before calling mco_HA_attach_replica() or mco_HA_attach_master(). If local tables are defined, filtering is on by default. Consequently mco_HA_enable_filter(db, MCO_NO); turns filtering off, effectively ignoring any local class definitions in the schema. If the schema does not define local tables, the function has no effect.
(See sample samples/native/ha/hafilter for a C/C++ example of partial replication.)
Java and C#
With the C# and Java APIs, local classes are specified using the attribute local, and at runtime partial replication can be turned on or off by calling the enableFilter() method of either the MasterConnection or ReplicaConnection:
    MasterConnection::enableFilter(boolean enabled);
    ReplicaConnection::enableFilter(boolean enabled);
Python
The Python wrapper supports partial replication by declaring a class as local in the schema definition, as for C/C++ applications, then calling the load_dictionary() method. For example:
    schema = '''
    #define uint4 unsigned<4>
    declare database filtermstdb;
    declare auto_oid [2000];
    class T1 {
        uint4 key;
        unique tree<key> tkey;
    };
    class T2 {
        uint4 key;
        unique tree<key> tkey;
    };
    class T3 {
        uint4 key;
        unique tree<key> tkey;
    };
    local class T4 {
        uint4 key;
        unique tree<key> tkey;
    };
    '''
    dict = exdb.load_dictionary(schema)
Also, as with the Java and C# APIs, at runtime partial replication can be turned on or off by calling the enableFilter() method of either the MasterConnection or ReplicaConnection:
    MasterConnection::enableFilter(boolean enabled);
    ReplicaConnection::enableFilter(boolean enabled);
Setting the Quorum
Under some circumstances it is necessary for the master to perform updates in the database only if some minimal number of replicas (the quorum) is connected. If the number of active replicas is less than the quorum, mco_trans_start() or mco_trans_commit() will return the error code MCO_E_HA_NO_QUORUM. This minimal number of replicas can be set via the mco_HA_master_params::quorum parameter (the default value is 0).
In some cases (e.g. if the network goes down during the commit) it is not possible to determine whether the last transaction was received by the replica. In this case the commit() applies the changes to the master's database and returns the status code MCO_S_HA_REPLICA_DETACH. This code means that if the application switches to the replica, this transaction may be missing. (Note that MCO_S_HA_REPLICA_DETACH is only possible if mco_HA_master_params::quorum is greater than 0.)
The master parameter mco_HA_master_params::quorum sets the initial value of the HA quorum, and the mco_HA_set_quorum() C API, or the equivalent Java setQuorum(), C# SetQuorum() or Python setQuorum() API, is used to change the HA quorum at runtime. The default quorum value is 0, which means that any number of active replicas is acceptable (including 0).
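The quorum rule itself is a single comparison. The sketch below models it in isolation so the effect of the default value is easy to see; master_state_t, txn_status_t, check_quorum() and set_quorum() are hypothetical names for illustration, not eXtremeDB types or functions.

```c
/* Hypothetical status codes standing in for the library's return codes. */
typedef enum { TXN_OK = 0, TXN_NO_QUORUM = 1 } txn_status_t;

/* Minimal model of the state the master consults before an update. */
typedef struct {
    unsigned quorum;           /* minimum number of connected replicas */
    unsigned active_replicas;  /* replicas currently connected */
} master_state_t;

/* Refuses the transaction when fewer replicas than the quorum are
 * connected. With quorum == 0 the check can never fail. */
txn_status_t check_quorum(const master_state_t *m)
{
    return (m->active_replicas < m->quorum) ? TXN_NO_QUORUM : TXN_OK;
}

/* Models changing the quorum at runtime. */
void set_quorum(master_state_t *m, unsigned quorum)
{
    m->quorum = quorum;
}
```

In this model a master with a quorum of 2 and one connected replica refuses updates until a second replica connects or the quorum is lowered.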
Note that a quorum value greater than 0 is allowed only for synchronous replication mode (i.e. the flag MCO_HAMODE_ASYNCH is not set) and only if the transaction window size is 1 (see mco_HA_set_trans_window_size()).
Extending Database memory
Extending the memory size for an eXtremeDB database is done by calling the core API function mco_db_extend(). However, because this call causes an internal write transaction, it cannot be called from the replica. So the procedure for extending database memory is to first call mco_db_extend() from the master; then, in its notifying_callback function, the replica responds to the MCO_REPL_NOTIFY_MASTER_DB_EXTENDED notification code by calling mco_db_extend_dev(). The following code snippet illustrates how such a notifying_callback function is implemented in the replica:
    /* replica notification callback function */
    void replica_notifying( uint2 notification_code, /* notification code */
                            uint4 param1,            /* reserved for special cases */
                            void* param2,            /* reserved for special cases */
                            void* context)           /* pointer to the user-defined context */
    {
        char *context_str = (char*)context; /* get context */
        switch (notification_code)
        {
            case MCO_REPL_NOTIFY_CONNECTED:
                printf("\n** Notification ** Replica's been connected, "
                       "context = %s\n", context_str);
                break;
            case MCO_REPL_NOTIFY_DB_LOAD_OK:
                printf("\n** Notification ** Database's been loaded "
                       "successfully, context = %s\n", context_str);
                break;
            case MCO_REPL_NOTIFY_MASTER_DB_EXTENDED:
            {
                MCO_RET rc;
                /* Get the device size passed in param2 */
                mco_size_t size = *((mco_size_t*) param2);
                printf("\n** Notification ** Master's database was extended, "
                       "extend size %d bytes, context = %s\n", (int)size, context_str);
                extend_dev.type = MCO_MEMORY_CONV;
                extend_dev.assignment = MCO_MEMORY_ASSIGN_DATABASE;
                extend_dev.size = size;
                /* allocate memory and set device pointer */
                extend_dev.dev.conv.ptr = (void*)malloc( extend_dev.size );
                if (extend_dev.dev.conv.ptr)
                {
                    rc = mco_db_extend_dev(db_name, &extend_dev);
                    printf("\nmco_db_extend_dev(), size %d : %s\n", DATABASE_SIZE,
                           (rc == MCO_S_OK) ? "OK" : "FAILED" );
                }
                break;
            }
            case MCO_REPL_NOTIFY_REPLICA_STOPPED:
            {
                const char* reason = "";
                if (param1 == MCO_HA_REPLICA_MASTER_REQUESTED_DISCONNECT)
                {
                    reason = "MCO_HA_REPLICA_MASTER_REQUESTED_DISCONNECT";
                }
                printf("\n** Notification ** Replica stopped with the reason: "
                       "%d (%s), context = %s\n", param1, reason, context_str);
                break;
            }
            default:
                printf("\n** Notification ** Replica's been notified code = "
                       "%d, param1 = %u, param2 = %p, context = %s\n",
                       notification_code, param1, param2, context_str);
                break;
        }
    }
Note that the size of the device is passed into the notification callback through the third parameter (param2) as a pointer to mco_size_t, and the replica extends its storage size by calling mco_db_extend_dev().
HA sequencer API
Sometimes it may be necessary to determine which of the replicas' databases is the most relevant. Suppose we have a configuration with a master and two replicas, R1 and R2. R1 dies at some moment T1. Later, at moment T2 (> T1), the master and R2 stop processing due to a power failure. After a cold restart of the R1 and R2 applications, it must be determined which of the nodes will be the new master. To resolve this issue, the mco_HA_get_sequencer() API can be called to return the number of the "db version" (or "current-ness" in the sense of HA). This number can be used to determine which database is the latest.
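Once each restarted node has reported its sequencer, choosing the new master reduces to picking the largest value. The sketch below assumes that a larger sequencer means a more recent database and that the values have already been collected from each node; node_info_t and pick_new_master() are hypothetical names, and the integer type used for the sequencer is an assumption.

```c
#include <stddef.h>

/* Hypothetical record of what each restarted node reported. */
typedef struct {
    const char *name;
    unsigned long long sequencer;  /* assumed: larger means more recent */
} node_info_t;

/* Returns the index of the node with the largest sequencer. Ties go to
 * the lowest index. Returns (size_t)-1 when the list is empty. */
size_t pick_new_master(const node_info_t *nodes, size_t count)
{
    size_t best = (size_t)-1;
    size_t i;

    for (i = 0; i < count; ++i) {
        if (best == (size_t)-1 || nodes[i].sequencer > nodes[best].sequencer)
            best = i;
    }
    return best;
}
```

In the scenario above, R2 kept receiving transactions between T1 and T2, so under this assumption it reports the larger sequencer and is selected.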
A Note on setting detach_timeout
When the master detaches a replica (explicitly or inside mco_HA_stop()), it sends the DETACH message indicating the end of replication. The detach_timeout is used for this send() operation. After sending the message (regardless of the result), the master closes the channel to the replica. Changing the timeout does not affect how quickly the replica is disconnected; it affects the maximum amount of time spent inside the mco_HA_detach() call. Setting detach_timeout to 0 could lead to the DETACH message not being sent at all, so the replica would not receive the notification about detaching. The replica would then run into an unexpected closing of the channel (and return the error code MCO_E_NW_RECVERR). Setting detach_timeout to 0 is therefore not recommended.