eXtremeDB Cluster Implementation Details

Cluster Binary Schema Evolution

The Cluster runtime supports the Binary Schema Evolution (BSE) feature of eXtremeDB, thus nodes in the single cluster are able to have different database schemas. To enable binary evolution mode the application must set the BINARY_EVOLUTION flag in the mode_mask field on all nodes.

Using the C/C++ API, this is done as follows:

     
    mco_cluster_params_t cl_params;
    ...
    mco_cluster_params_init(&cl_params);
    ...
    cl_params.mode_mask |= MCO_CLUSTER_MODE_BINARY_EVOLUTION; /* use BSE */
     

In the Java API:

 
    Database.Parameters params = new Database.Parameters();
    ...
    params.clusterParams = new Database.ClusterParams(...);
    ...
    params.clusterParams.mode |= Database.CLUSTER_MODE_BINARY_EVOLUTION;
     

In the C# API:

     
    Database.Parameters parameters = new Database.Parameters();
    ...
    parameters.ClusterParameters = new Database.ClusterParams(...);
    ...
    parameters.ClusterParameters.Mode |= Database.CLUSTER_MODE_BINARY_EVOLUTION;
     

Note that if nodes A and B set the CLUSTER_MODE_BINARY_EVOLUTION flag but have exactly the same schema, the objects between these nodes are sent without any conversion. In this case the only overhead for using the binary evolution flag is the additional space allocated in the database header (at database creation time). This space is used to store the dictionaries from other nodes.

Saving and Loading a snapshot of a cluster database

A snapshot of a cluster database can be saved by calling function mco_db_save(). The ability to load a cluster database from a snapshot (image file) is enabled through the following two fields in the mco_cluster_params_t structure passed to mco_cluster_db_open():

     
    void *   	      stream_handle;
    mco_stream_read   input_stream_reader;
     

These fields correspond to two the first two parameters of the mco_db_load() API, where stream_handle is a handle to the input stream and input_stream_reader is the handler function called by the runtime to read the input.

If the values for these two parameters are NULL, then mco_cluster_db_open() calls mco_db_open_dev(), otherwise mco_db_load() is called. By default (set in mco_cluster_params_init()) these values are set to NULL.

For example, with code like the following, a cluster-based in-memory database can be opened (created) or loaded from a saved database snapshot (image file):

     
    mco_size_sig_t file_reader(void* stream_handle /* FILE*  */,
    void* to, mco_size_t max_nbytes)
    {
        return (mco_size_t) fread(to, 1, max_nbytes, (FILE*)stream_handle);
    }
    mco_size_sig_t file_writer(void* stream_handle /* FILE*  */,
     
    const void* from, mco_size_t max_nbytes)
    {
        return (mco_size_t) fwrite(from, 1, max_nbytes, (FILE*)stream_handle);
    }
     
    main()
    {
        mco_db_h db;
        mco_cluster_params_t cl_params;
        mco_cluster_params_init(&cl_params);
        <fill cl_params>
        FILE *rfile = fopen("<image_file>", "r");
        if (rfile) /* if image exists, load database */
        { 
            cl_params.stream_handle       = rfile;
            cl_params.input_stream_reader = &file_reader;
        }
     
        mco_cluster_db_open("<db_name>",...., &cl_params);
        <start mco_cluster_listen() thread>
        mco_db_connect("<db_name>", &db);
     
        <... Do some work ...>
        mco_cluster_stop(db);
     
        /* save database */
        FILE *wfile = fopen("<image_file>", "w");
        mco_db_save(wfile, &file_writer, db);
        <close database>
    }
     

Cluster Transaction Window

The transaction window in the eXtremeDB Cluster allows an application to merge transaction data from several commit threads into a single send() operation reducing the number of system calls. For small transactions this can significantly improve the overall performance in the cluster database. The C APIs mco_cluster_get_window_params() and mco_cluster_set_window_params() are used to retrieve the current window parameters and to set new values.

The transaction window has three threshold values specified in structure mco_cluster_window_t : bsize, length and timeout. The first one (bsize) defines the maximum size of the "buffered" transaction data in bytes, length is the maximum number of buffered transactions and timeout is the maximum delay ( in milliseconds) before sending the buffered data to other nodes. The buffered data will be sent as soon as any of these thresholds is reached. So the number of buffered transactions is never more than mco_cluster_window_t::length and the delay is never more than mco_cluster_window_t::timeout.

The initial parameters of the window are set via the mco_cluster_params_t::window passed to the C API mco_cluster_db_open(). The default values are length=0, bsize=0 and timeout=1, which means that the window is not used because the maximum number of buffered transactions (length) is zero. If one of the cluster nodes sets the window length to a value greater than 0, all other nodes also must use the length greater than 0, but the values may be different on the different nodes. (Note that a window length = 1 is roughly the same as lenght = 0). The value of the window bsize can be 0, which means "unlimited data size".

Note that normally the window length should not exceed the number of active write threads - otherwise the data will always be sent by the timeout threshold. For example, the window is absolutely useless if the application has only one write thread. Usually the best results are reached if the window length is in the range of 50%-75% times the number of writers. The value of the window bsize can be used to limit the size of buffered transaction data in the case of "large" transactions.

Cluster and Transaction Logging

In some cases it may seem desirable to optimize eXtremeDB Cluster performance for persistent databases by instead using the eXtremeDB Transaction Logging feature which provides a possible alternative to using persistent database classes. In order to enable eXtremeDB Transaction Logging (TL), the cluster mode flag MCO_CLUSTER_MODE_START_DETACHED must be used to create a cluster database that is not immediately connected to other nodes. This makes it possible to add processing between the moment the database is created and the moment the node is connected to the cluster. Specifically, when TL is enabled, the transaction log can be applied to the cluster database by calling mco_translog_apply().

For example, the following code opens a cluster database in detached mode, applies the transaction log, and then connects to other cluster nodes:

     
    main()
    {
        mco_db_h db;
        mco_cluster_params_t cl_params;
        mco_cluster_params_init(&cl_params);
     
        <fill cl_params>
        FILE *rfile = fopen(image_path, "r");
        if (rfile) /* load image if exists */
        { 
            cl_params.stream_handle       = rfile;
            cl_params.input_stream_reader = &file_reader;
        }
     
        /* start in detached mode */
        cl_params.mode_mask |= MCO_CLUSTER_MODE_START_DETACHED;
        mco_cluster_db_open(db_name,...., &cl_params);
        mco_db_connect(db_name, &db);
         
        /* Apply transaction log */
        mco_translog_apply(db, log_path, MCO_TRANSLOG_ALL_LABELS);
         
        /* Connect to other cluster nodes */
        mco_cluster_attach(db, &cl_params);
        <start mco_cluster_listen() thread>
         
        /* Start transaction logging */
        mco_translog_start(db, log_path, tl_start_data);
         
        /* Save snapshot to ensure all remote transactions are saved */
        FILE *wfile = fopen(image_path, "w");
        mco_translog_db_save(wfile, &file_writer, db));
         
        <... Do usual work ...>
        mco_cluster_stop(db);
        mco_translog_stop(db);
        <close database>
    }
     

 

Distributed Databases - Sharding

Sharding and clustering are two different concepts that exist for different purposes and are architecturally different. Sharding implements database partitioning, while a Cluster, in its default configuration, does not partition the database but rather replicates everything.

Cluster can be used to implement redundancy in a sharding database. In brief, each shard can be not a single node, but a cluster. The implementation is similar to HA Replication as described in the eXtremeDB High Availability User's Guide, with the difference that any node can act as a master; i.e. receive SQL statements and replicate the changes to other nodes in the shard.

When an eXtremeDB database is distributed using eXtremeDB Cluster the following conditions apply:

 

Cluster Runtime Open Flags

Following are explanations for 3 flags that can be specified in the mco_cluster_params.mode_mask to affect specific Cluster runtime behavior:

1) MCO_CLUSTER_MODE_EARLY_DATA_SEND

Normally, the cluster runtime sends transaction data to other nodes after the first phase of a commit on the initiator has completed successfully. This

allows the runtime to reduce network traffic for transactions that failed locally (for example, due to a uniqueness violation). On the other hand, sending data earlier, before the local phase_1, can reduce the total time to perform distributed transaction. Flag MCO_CLUSTER_MODE_EARLY_DATA_SEND forces the cluster runtime to send data before phase_1.

 

2. MCO_CLUSTER_MODE_BINARY_EVOLUTION - this flag allows nodes to join with different schemas into a single cluster. Also note that this flag prohibits any DDL

statements in the cluster.

 

3. MCO_CLUSTER_MODE_START_DETACHED

Normally, mco_cluster_db_open() creates a database, connects to other cluster's nodes and synchronizes the database content. The MCO_CLUSTER_MODE_START_DETACHED flag tells mco_cluster_db_open() to just create the database and skip any network operations. To add this node to the cluster, the application must call mco_cluster_attach() later.