Dynamic Schema Modification

With eXtremeDB version 7.1 and later it is possible to modify an SQL database schema through the standard SQL DDL statements create table/index, drop table/index and alter table/index. (Please refer to the eXtremeSQL User’s Guide for further details about SQL DDL.)

Replica-side replay of Dynamic Schema Modification

This enhancement stems from a request to register and propagate master-side CRUD events to the replica-side application when the database schema on the master is dynamically modified. This is accomplished by implementing an application-side iterator callback that lets the eXtremeDB High Availability database runtime publish information regarding the database modifications to the applications in JSON format. The iterator is available for all supported host languages, and as well can be enabled in the xSQL configuration file.

C API

The replica side mco_HA_replica_params_t is extended with two additional fields:

     
    mco_trans_iterator_callback_t iterator;
    void                         *iterator_context;
     

Where:

    typedef MCO_RET (*mco_trans_iterator_callback_t)(mco_trans_h trans,
    MCO_Hf* obj, int cid, int obj_state, void* user_ctx);
     

JSON output

Though applications can use the generic format for the callback function and implement custom output format, implemented and included into eXtremeDB High Availability distributions is an iterator that represents the database contents in JSON format (RFC 7159). The JSON converter synopsis as follows:

     
    mco_trans_iterator_h mco_create_json_converter(mco_stream_h stream,
    mco_json_converter_params_t *params);
     

The callback arguments are as follows:

The parameters struct is defined as follows:

 
    typedef struct {
        mco_bool         compact;
        mco_bool         ignore_stream_errors;
        MCO_RET          last_error;
        int              last_errno;
    } mco_json_converter_params_t;
     

Where:

The optional parameters are initialized with default values by calling:

     
    void mco_json_converter_params_init(mco_json_converter_params_t *params);
     

If successful, the function has allocated and returned a pointer to the mco_trans_iterator_t structure (which is a common structure for all future iterator callback implementations) defined as follows:

     
    typedef struct mco_trans_iterator_t {
        mco_trans_iterator_callback_t callback;
        MCO_RET                       last_error;
        int                           last_errno;
    } mco_trans_iterator_t, *mco_trans_iterator_h;
     

Note that the mco_trans_iterator_t represents a header, similar to a base class in C++ terminology. Any custom iterator adds its own custom fields that are allocated beyond the header.

Following is an example:

 
    mco_trans_iterator_h json;
    mco_stream_h stream;
    mco_json_converter_params_t json_params; // declare parameters
    stream = ... <create stream, see below> ...
     
    void mco_json_converter_params_init(&json_params); // initialize parameters
    json_params.compact = true; // change default parameter value
    json = mco_create_json_converter(stream, &json_params); // create JSON iterator
     
    if (! json)
    {
        // Handle errors
    }
     
    mco_HA_replica_params_t replica_params;
    mco_HA_replica_params_init(&replica_params);
    replica_params.iterator         = json->callback; // set callback function
    replica_params.iterator_context = json;           // set context
    mco_HA_attach_master(db, conn_string, &replica_params, stop_reason, timeout);
     

Database objects rendered to the JSON object stream are separated by \0 (zero is not present in JSON). Therefore the stream can be processed as in the following code snippet (Java is used for simplicity):

     
    InputStream fis = (new java.net.Socket("127.0.0.1", 5566)).getInputStream();
    java.util.Scanner s = new java.util.Scanner(fis).useDelimiter("\\\0");
    while (s.hasNext())
    {
        JSONObject obj = new JSONObject(s.next());
        .....
    }
     

Each JSON object includes an operation field, which can be assigned one of the following values:

In addition JSON objects for OBJ_NEW, OBJ_UPDATE, OBJ_DELETE and DELETE_ALL operations include a table field. The value of the field indicates the table (class name) the affected object(s) belongs to. For the OBJ_NEW, OBJ_UPDATE and OBJ_DELETE operations an object field is included into the JSON object. The object field layout (the structure and data fields types) corresponds to the database object layout. Note the following:

1. Database arrays and vectors are represented via JSON arrays

2. Nested structures are represented as separate JSON objects

3. It is assumed that char and string database fields contain zero-terminated strings in the UTF-8 format

4. nchar and nstrings database field values are zero-terminated UTF-16 strings

5. wchar and wstring values are zero-terminated UTF-32 strings

6. BLOBs are represented as base64 JSON strings

7. NULL values and missing optional structure values are represented through the JSON -integrated literal null

For example, for the following SQL layout and content:

 
    create table a (i int, j int, s string);
    insert into a values (1,2,'test');
     

The following JSON would be generated:

     
    {
    "operation":"SCHEMA_CHANGED"
    }
    {
    "operation":"TRANS_BEGIN"
    }
    {
        "operation":"OBJ_NEW",
        "table":"a",
        "object":{
        "auto_oid@":1,
        "i":1,
        "j":2,
        "s":"test"
        }
    }
    {
        "operation":"TRANS_END"
    }
     

Extended eXtremeDB Streams

The eXtremeDB runtime customarily uses streams to output the database content to an external media and to import data into the database from an external media or a storage device. The eXtremeDB export / import APIs, mco_db_save() and mco_db_load() and some other functions use read- and write-streams to import / export data. The iterator extends stream functions with the ability to retain and then return error information from the stream back to the calling application or database runtime. The stream is represented through the mco_stream_h structure defined as follows:

     
    typedef struct mco_stream_t {
        mco_stream_write writer;
        MCO_RET          (*close) (struct mco_stream_t *self);
        MCO_RET          last_error;
        int              last_errno;
    } mco_stream_t, *mco_stream_h;
     

Here:

     
    typedef mco_size_sig_t(*mco_stream_write)(void* stream_handle,
                            const void* from, mco_size_t nbytes);
     

Currently implemented are file-based, server_socket - based, client_socket –based and tee_stream -based stream functions.

File-based stream

File streams are created with either of the following functions:

 
    mco_stream_h mco_create_mcofile_stream(const char *filename,
                            mco_file_stream_params_t *params);
     

or

 
    mco_stream_h mco_create_stdfile_stream(const char *filename,
                            mco_file_stream_params_t *params);
     

The first function makes use of the eXtremeDB fs library through the mco_file_h handle, while the second uses the standard I/O (stdio.h) FILE* handle. Currently the only required parameter is the filename, which indicates the file name to write to, it must not be NULL. There are no optimal parameters at present, and the stream parameters structure only includes fields where to return errors received from the stream:

 
    typedef struct {
        MCO_RET          last_error;
        int              last_errno;
    } mco_file_stream_params_t;
 

However in the future some additional optional parameters (such as encryption) are possible, so the following function is called to initialize them:

 
    void mco_file_stream_params_init(mco_file_stream_params_t *params);
     

Following is a usage example:

 
    mco_stream_h stream;
    mco_file_stream_params_t file_params;
    mco_file_stream_params_init(&file_params);
    stream = mco_create_stdfile_stream("myfile.txt", &file_params);
     

or

 
    mco_stream_h stream = mco_create_stdfile_stream("myfile.txt", 0);
     

Server-socket stream

The following function is called to create a server-socket stream:

     
    mco_stream_h mco_create_server_socket_stream(int port, mco_server_socket_stream_params_t *params);
     

The only required parameter is the port number to listen on. Optional parameters are defined by the following structure:

     
    typedef struct {
        timer_unit        write_timeout;
        mco_size_t        buffer_size;
        mco_size_t        max_clients;
        const char       *net_interface;
        mco_sock_params_t sock_params;
        MCO_RET           last_error;
        int               last_errno;
    } mco_server_socket_stream_params_t;
     

Where:

Client-socket stream

The following function is called to create a client-socket stream:

     
    mco_stream_h mco_create_client_socket_stream(const char *hostname, int port,
    mco_client_socket_stream_params_t *params);
     

The required arguments are hostname (the name and IP addresses) and port (the port number). Optional parameters are defined by the following structure:

     
    typedef struct {
        timer_unit        write_timeout;
        timer_unit        connect_timeout;
        int               connect_attempts;
        timer_unit        connect_interval;
        mco_bool          auto_reconnect;
        mco_size_t        buffer_size;
        mco_sock_params_t sock_params;
        MCO_RET           last_error;
        int               last_errno;
    } mco_client_socket_stream_params_t;
     

Where:

Default values are initialized by calling:

     
    void mco_client_socket_stream_params_init(mco_client_socket_stream_params_t *params);
     

Following is a usage example:

     
    mco_stream_h stream;
    mco_client_socket_stream_params_t stream_params;
    mco_client_socket_stream_params_init(&stream_params);
    stream_params.connect_timeout = 3000; // change connect timeout
    // connect to myhost.com:10023
    stream = mco_create_client_socket_stream("myhost.com", 10023, &stream_params);
     

or

     
    // connect to myhost.com:10023
    mco_stream_h stream = mco_create_client_socket_stream("myhost.com", 10023, 0);
     

Tee stream

A tee-stream allows the iterator to write into two separate output streams simultaneously. For example one of the output streams can be a socket, while the other could be a file. Tee streams can be nested.

The following function is called to create a tee-stream:

     
    mco_stream_h mco_create_tee_stream(mco_stream_h stream1,
    mco_stream_h stream2, mco_bool any_ok);
     

Where:

Following is a usage example:

     
    // create file stream
    mco_stream_h file_stream   = mco_create_stdfile_stream("file.txt", 0);
     
    // create socket stream
    mco_stream_h socket_stream = mco_create_client_socket_stream("myhost.com",
                                        10023, 0);
    // join file and socket stream
    mco_stream_h tee_stream    = mco_create_tee_stream(file_stream, socket_stream, false);
     
    // create JSON iterator writing to both file and socket
    mco_trans_iterator_h json  = mco_create_json_converter(tee_stream, 0);
     

Custom stream

It is possible to inherit the functionality of an existing stream implementations and create a custom stream handle mco_stream_h based on the existing stream_handle and stream_writer:

     
    mco_stream_h mco_create_custom_stream(void* stream_handle,
                    mco_stream_write output_stream_writer,
                    MCO_RET(*close)(void *));
     

Where:

Following is a usage example:

     
    static mco_size_sig_t file_writer(void* stream_handle, const void* from, mco_size_t nbytes)
    {
        FILE *f = (FILE *) stream_handle;
        return fwrite(from, 1, nbytes, f);
    }
         
    FILE *f = fopen(filename, "w");
    mco_stream_h stream = mco_create_custom_stream(f, file_writer,(MCO_RET (*)(void*)) fclose);
         

Notes

     
    MCO_RET mco_destroy_stream(mco_stream_h s);
     
     
    MCO_RET mco_destroy_trans_iterator(mco_trans_iterator_h t);
     

For example:

 
    // create file stream
    mco_stream_h file_stream   = mco_create_stdfile_stream("file.txt", 0);
     
    // create socket stream
    mco_stream_h socket_stream = mco_create_client_socket_stream("myhost.com", 10023, 0);
     
    // join file and socket stream
    mco_stream_h tee_stream    = mco_create_tee_stream(file_stream,
    socket_stream, false);
     
    // create JSON iterator writing to both file and socket
    mco_trans_iterator_h json  = mco_create_json_converter(tee_stream, 0);
     
    //... Use iterator ...
    mco_destroy_trans_iterator(json);
     
    // Don't call mco_destroy_stream() for file_stream, socket_stream, tee_stream
     
     
    #define MCO_STDIO_FILE_STREAM 0
     

 

Finally, the "new style" streams can also be used with the eXtremeDB API functions written for the "old style" streams. The following snippet illustrates how to connect to the server and export the database content using the mco_db_save() API and the "new style" socket stream:

     
    mco_stream_h socket_stream = mco_create_client_socket_stream("myhost.com", 10023, 0);
    if (socket_stream)
    {
        mco_db_save(socket_stream, socket_stream->writer, connection);
        mco_destroy_stream(socket_stream);
    }
     

Java API

The Java-based iterator makes use of the standard Java OutputStream to output database content. The iterators themselves are defined in the TransIterator class:

     
    public class TransIterator {
    public abstract static class Iterator
    {
        public void close();
    }
     
    public static class JsonConverter extends Iterator
    {
        public JsonConverter(java.io.OutputStream os, boolean compact,
                    boolean ignoreStreamErrors);
                     
    public JsonConverter(java.io.OutputStream os);
    }
}

Here the TransIterator.Iterator represents the base class for all interior types and TransIterator.JsonConverter implements the JSON-based iterator.

Following is an example for the JSON iterator writing into a file:

     
    ReplicaConnection con = new ReplicaConnection(db);
    ReplicaConnection.Parameters replParams = new ReplicaConnection.Parameters();
    replParams.iterator = new TransIterator.JsonConverter(new java.io.FileOutputStream("myfile.txt"), false, false);
    con.attachMaster(connectionString, replParams, CONNECT_TIMEOUT);
    replParams.iterator.close();
     

C# API

Similar to Java the standard .NET Framework input/output streams are used:

     
    public class TransIterator
    {
        public abstract class Iterator
        {
            public Iterator(Connection con);
            public void Close();
        }
        public class JsonIterator : Iterator
        {
            public JsonIterator(Connection con, System.IO.Stream s, bool compact,
            bool ignore_stream_errors);
            public JsonIterator(Connection con, System.IO.Stream s);
        }
    }
     

Following is an example for the JSON iterator writing into a file:

     
    ReplicaConnection con = new ReplicaConnection(db);
    ReplicaConnection.Parameters replParams = new ReplicaConnection.Parameters();
    replParams.Iterator = new TransIterator.JsonIterator(con, new System.IO.FileStream("myfile.txt", System.IO.FileMode.Create));
    con.AttachMaster(connectionString, replParams, CONNECT_TIMEOUT);
    replParams.Iterator.Close();
     

Python API

The Python API is a wrapper around the C language mco_stream_h. As such most Python methods and their parameters are the same as the corresponding C functions.

To create a file-stream:

 
    def create_file_stream(filename)
     

To create a Server (listen=True) and client stream (listen= False):

     
    def create_socket_stream(hostname, port, listen = False,
    write_timeout = -1, buffer_size = -1, max_clients = -1,
    connect_timeout = -1, connect_attempts = -1, c
    onnect_interval = -1, reconnect = -1, socket_domain = -1 )
     

Note that for the server type socket the hostname can be None. If the hostname is not equal to None, then it corresponds to the mco_server_socket_stream_params_t::net_interface. The buffer_size, write_timeout, socket_domain parameters are common for both the client and the server sockets, while the max_clients parameter is defined only for the server, and connect_timeout, connect_attempts, connect_interval, reconnect only for the client sockets.

Following is an example for the JSON/Tee stream usage:

     
    db = exdb.open_database(dbname="rpldb", dictionary=dict, is_disk=is_disk,
    log_params=log_params)
    sock_stream = exdb.create_socket_stream("myhost.com", 10023, listen=False,
    write_timeout=200);
    file_stream = exdb.create_file_stream("out1.txt");
    tee_stream  = exdb.create_tee_stream(sock_stream, file_stream);
    json_ierator = exdb.create_json_iterator(tee_stream);
    replcon = exdb.ReplicaConnection(db)
    params = exdb.ReplicaConnectionParameters()
    params.iterator = json_iterator;
    replcon.attachMaster(ha_replica_connstr, params, CONNECT_TIMEOUT):
    exdb.destroy_trans_iterator(params.iterator);
     

xSQL API

For xSQL the iterator is described in the configuration file in the form of a JSON object. Each iterator contains a type field, which currently must be set to json. Other fields and their values depend on the iterator type. For the json type the fields are stream, compact (optional, boolean) and ignore_stream_errors (optional, boolean).

The Stream is also defined in the configuration file. The type of the stream can be file, socket or tee. Other fields and their values depend on the stream type.

For type file:
     
    {
        type : "file",
        name : "myfile" # file name
    }
     
    *type: "socket", listen: true (server-side socket:
    {
        type : "socket",
        listen : true,
        port   : 10000, # listen port
     
        buffer_size : 1k,  # optional, mco_server_socket_stream_params_t::buffer_size
        max_clients : 3,   # optional, mco_server_socket_stream_params_t::max_clients
        net_interface : 192.168.0.3, # optional, mco_server_socket_stream_params_t::net_interface
        write_timeout : 1000,  # optional, mco_server_socket_stream_params_t::write_timeout
         
        sock_params : {   # optional
            domain : "inet" # other options are "local" or "sdp",
                        # mco_server_socket_stream_params_t::sock_params::domain
            mode   : ["nodelay"], # combination of "nodelay", "do_not_reuse_address",
                        # "non_blocking", "keep_alive" or
                        # "do_not_cloexec", mco_server_socket stream_params_t::sock_params::mode
            sndbuf : 16k, # mco_server_socket_stream_params_t::sock_params::sndbuf
            use_ssl : false, # apply "ssl_params" for this socket or not
        }
    }
     

For type socket, listen: false (client socket):

 
    {
        type : "socket",
        listen : false,
        hostname : "192.168.0.1" # host to connect to,
        port     : 10023 # port to connect
        auto_reconnect : true,  # optional, mco_client_socket_stream_params_t::auto_reconnect
        buffer_size : 1k,   # optional, mco_client_socket_stream_params_t::buffer_size
        connect_attempts : 3,   # optional, mco_client_socket_stream_params_t::connect_attempts
        connect_interval : 1000, # optional, mco_client_socket_stream_params_t::connect_attempts
        connect_timeout : 2000, # optional, mco_client_socket_stream_params_t::connect_timeout
        write_timeout : 500, # optional, mco_client_socket_stream_params_t::write_timeout
         
        sock_params : { optional,
        domain : "inet" # other options are "local" or "sdp",
                    # mco_server_socket_stream_params_t::sock_params::domain
        mode   : ["nodelay"], # combination of "nodelay", "do_not_reuse_address",
                    # "non_blocking", "keep_alive" or
                    # "do_not_cloexec", mco_server_socket_stream_params_t::sock_params::mode
        sndbuf : 16k, # mco_server_socket_stream_params_t::sock_params::sndbuf
        use_ssl : false, # apply "ssl_params" for this socket or not
        }
    }
     

For type tee:

     
    {
        type : "tee",
        stream1 : {
            ...
        }
        stream2 : {
            ...
        }
    }
     

For example, the following configuration defines a replica-side JSON iterator that writes the file called myfile.txt and into a socket 127.0.0.1:10023

     
    ha_params : {
        connection_strings : "127.0.0.1:10000",
         
        replica_params : {
            iterator : {
            type : "json",
            compact : false,
            stream : {
                type   : "tee",
                stream1 : {
                    type     : "socket",
                    listen   : false,
                    hostname : "127.0.0.1",
                    port     : 10023,
                },
     
                stream2 : {
                    type : "file",
                    name : "myfile.txt"
                }
             }
          }
       }
    }