With eXtremeDB version 7.1 and later it is possible to modify an SQL database schema through the standard SQL DDL statements
create table/index
,drop table/index
andalter table/index
. (Please refer to the eXtremeSQL User’s Guide for further details about SQL DDL.)Replica-side replay of Dynamic Schema Modification
This enhancement stems from a request to register and propagate master-side
CRUD
events to the replica-side application when the database schema on the master is dynamically modified. This is accomplished by implementing an application-side iterator callback that lets the eXtremeDB High Availability database runtime publish information regarding the database modifications to the applications inJSON
format. The iterator is available for all supported host languages, and as well can be enabled in thexSQL
configuration file.C API
The replica side
mco_HA_replica_params_t
is extended with two additional fields:mco_trans_iterator_callback_t iterator; void *iterator_context;Where:
- Iterator is the application’s callback function that the database runtime calls when
CRUD
and some other events happen. The prototype for the callback is defined as the following inmco.h
:typedef MCO_RET (*mco_trans_iterator_callback_t)(mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx);
iterator_context
is the application's context passed into the callback through its last parameteruser_ctx
\JSON output
Though applications can use the generic format for the callback function and implement custom output format, implemented and included into eXtremeDB High Availability distributions is an iterator that represents the database contents in
JSON
format (RFC 7159). TheJSON
converter synopsis as follows:mco_trans_iterator_h mco_create_json_converter(mco_stream_h stream, mco_json_converter_params_t *params);The callback arguments are as follows:
stream
is a stream to writeJSON
values to. Must not beNULL
params
is an optional parameters struct as defined below (It can beNULL
, in which case the default values described below are used:The
parameters
struct is defined as follows:typedef struct { mco_bool compact; mco_bool ignore_stream_errors; MCO_RET last_error; int last_errno; } mco_json_converter_params_t;Where:
compact
If set to true (default) a compactJSON
is generated (with no spaces). false generates human-readableJSON
format.ignore_stream_errors
indicates whether to ignore errors while writingJSON
to a stream. If set to false (default), any stream error causes the HA replication to stop-- themco_HA_attach_master()
returnsMCO_E_WRITE_STREAM
error and stopReason is set toMCO_HA_REPLICA_ITERATOR_ERROR
. If true, errors are ignored and replication continues.last_error
OUT: serves to returnMCO_E_
... errors from themco_create_json_converter()
function.last_errno
OUT: serves to return an OS-level error (errno) from themco_create_json_converter()
function.The optional parameters are initialized with default values by calling:
void mco_json_converter_params_init(mco_json_converter_params_t *params);If successful, the function has allocated and returned a pointer to the
mco_trans_iterator_t
structure (which is a common structure for all future iterator callback implementations) defined as follows:typedef struct mco_trans_iterator_t { mco_trans_iterator_callback_t callback; MCO_RET last_error; int last_errno; } mco_trans_iterator_t, *mco_trans_iterator_h;Note that the mco_trans_iterator_t represents a
header
, similar to abase class
in C++ terminology. Any custom iterator adds its own custom fields that are allocated beyond the header.Following is an example:
mco_trans_iterator_h json; mco_stream_h stream; mco_json_converter_params_t json_params; // declare parameters stream = ... <create stream, see below> ... void mco_json_converter_params_init(&json_params); // initialize parameters json_params.compact = true; // change default parameter value json = mco_create_json_converter(stream, &json_params); // create JSON iterator if (! json) { // Handle errors } mco_HA_replica_params_t replica_params; mco_HA_replica_params_init(&replica_params); replica_params.iterator = json->callback; // set callback function replica_params.iterator_context = json; // set context mco_HA_attach_master(db, conn_string, &replica_params, stop_reason, timeout);Database objects rendered to the
JSON
object stream are separated by\0
(zero is not present inJSON
). Therefore the stream can be processed as in the following code snippet (Java is used for simplicity):InputStream fis = (new java.net.Socket("127.0.0.1", 5566)).getInputStream(); java.util.Scanner s = new java.util.Scanner(fis).useDelimiter("\\\0"); while (s.hasNext()) { JSONObject obj = new JSONObject(s.next()); ..... }Each JSON object includes an operation field, which can be assigned one of the following values:
TRANS_BEGIN
– A transaction was startedTRANS_END
- Transaction was completed via arollback
orcommit
OBJ_NEW
- A new object was created inside a transactionOBJ_UPDATE
- An existing object was modified (inside a transaction)OBJ_DELETE
- An object was deleted inside a transactionDELETE_ALL
- All object from a class were erased (inside a transaction)DB_CLEAN
- Master application executedmco_db_clean()
outside a transactionSCHEMA_CHANGED
- The database schema was modified outside a transactionIn addition JSON objects for
OBJ_NEW
,OBJ_UPDATE
,OBJ_DELETE
andDELETE_ALL
operations include a table field. The value of the field indicates the table (class name) the affected object(s) belongs to. For theOBJ_NEW
,OBJ_UPDATE
andOBJ_DELETE
operations an object field is included into theJSON
object. The object field layout (the structure and data fields types) corresponds to the database object layout. Note the following:1. Database arrays and vectors are represented via
JSON
arrays2. Nested structures are represented as separate
JSON
objects3. It is assumed that
char
andstring
database fields contain zero-terminated strings in theUTF-8
format4.
nchar
andnstrings
database field values are zero-terminatedUTF-16
strings5.
wchar
andwstring
values are zero-terminatedUTF-32
strings6.
BLOB
s are represented as base64JSON
strings7.
NULL
values and missing optional structure values are represented through theJSON
-integrated literal nullFor example, for the following SQL layout and content:
create table a (i int, j int, s string); insert into a values (1,2,'test');The following
JSON
would be generated:{ "operation":"SCHEMA_CHANGED" } { "operation":"TRANS_BEGIN" } { "operation":"OBJ_NEW", "table":"a", "object":{ "auto_oid@":1, "i":1, "j":2, "s":"test" } } { "operation":"TRANS_END" }Extended eXtremeDB Streams
The eXtremeDB runtime customarily uses streams to output the database content to an external media and to import data into the database from an external media or a storage device. The eXtremeDB export / import APIs,
mco_db_save()
andmco_db_load()
and some other functions use read- and write-streams to import / export data. The iterator extends stream functions with the ability to retain and then return error information from the stream back to the calling application or database runtime. The stream is represented through themco_stream_h
structure defined as follows:typedef struct mco_stream_t { mco_stream_write writer; MCO_RET (*close) (struct mco_stream_t *self); MCO_RET last_error; int last_errno; } mco_stream_t, *mco_stream_h;Here:
writer
is a standard eXtremeDBstream_writer
:typedef mco_size_sig_t(*mco_stream_write)(void* stream_handle, const void* from, mco_size_t nbytes);
close
represents the streamdestructor
last_error
areMCO_E
_... errorslast_errno
are OS-level errno valuesCurrently implemented are file-based, server_socket - based, client_socket –based and tee_stream -based stream functions.
File-based stream
File streams are created with either of the following functions:
mco_stream_h mco_create_mcofile_stream(const char *filename, mco_file_stream_params_t *params);or
mco_stream_h mco_create_stdfile_stream(const char *filename, mco_file_stream_params_t *params);The first function makes use of the eXtremeDB
fs
library through themco_file_h
handle, while the second uses the standard I/O (stdio.h
)FILE*
handle. Currently the only required parameter is the filename, which indicates the file name to write to, it must not beNULL
. There are no optimal parameters at present, and the stream parameters structure only includes fields where to return errors received from the stream:typedef struct { MCO_RET last_error; int last_errno; } mco_file_stream_params_t;However in the future some additional optional parameters (such as encryption) are possible, so the following function is called to initialize them:
void mco_file_stream_params_init(mco_file_stream_params_t *params);Following is a usage example:
mco_stream_h stream; mco_file_stream_params_t file_params; mco_file_stream_params_init(&file_params); stream = mco_create_stdfile_stream("myfile.txt", &file_params);or
mco_stream_h stream = mco_create_stdfile_stream("myfile.txt", 0);Server-socket stream
The following function is called to create a server-socket stream:
mco_stream_h mco_create_server_socket_stream(int port, mco_server_socket_stream_params_t *params);The only required parameter is the port number to listen on. Optional parameters are defined by the following structure:
typedef struct { timer_unit write_timeout; mco_size_t buffer_size; mco_size_t max_clients; const char *net_interface; mco_sock_params_t sock_params; MCO_RET last_error; int last_errno; } mco_server_socket_stream_params_t;Where:
write_timeout
is the write socket timeout in milliseconds The default is 1000 (1 second)buffer_size
indicates the size of the buffer accumulating data to pass to the socketsend()
function. In bytes, the default is16*1024
max_clients
- the maximum number of concurrently connected clients. The default is16
net_interface
- network interface address to listen on. The default is0.0.0.0
-- all interfacessock_params
- socket parameters:type
,domain
,SSL
, etc.,last_error
- OUT: serves to returnMCO_E_
... errors from themco_create_server_socket_stream()
functionlast_errno
- OUT: serves to return OS-level error (errno) from themco_create_server_socket_stream()
functionClient-socket stream
The following function is called to create a client-socket stream:
mco_stream_h mco_create_client_socket_stream(const char *hostname, int port, mco_client_socket_stream_params_t *params);The required arguments are
hostname
(the name and IP addresses) andport
(the port number). Optional parameters are defined by the following structure:typedef struct { timer_unit write_timeout; timer_unit connect_timeout; int connect_attempts; timer_unit connect_interval; mco_bool auto_reconnect; mco_size_t buffer_size; mco_sock_params_t sock_params; MCO_RET last_error; int last_errno; } mco_client_socket_stream_params_t;Where:
write_timeout
- write socket timeout in milliseconds The default is 1000 (1 second)connect_timeout
- a timeout for theconnect()
function in milliseconds. The default is 2000 (2 seconds)connect_attempts
- A number of attempts to callconnect()
until the connection has been established. By default 3connect_interval
- how long to wait before attempting toconnect()
again after unsuccessful attempt. By default no wait –0buffer_size
- indicates the size of the buffer accumulating data to pass to the socketsend()
function. In bytes, the default is16*1024
sock_params
- socket parameters: type, domain,SSL
, etc.,last_error
- OUT: serves to returnMCO_E_
... errors from themco_create_client_socket_stream()
functionlast_errno
- OUT: serves to return OS-level error (errno) from themco_create_client_socket_stream()
functionDefault values are initialized by calling:
void mco_client_socket_stream_params_init(mco_client_socket_stream_params_t *params);Following is a usage example:
mco_stream_h stream; mco_client_socket_stream_params_t stream_params; mco_client_socket_stream_params_init(&stream_params); stream_params.connect_timeout = 3000; // change connect timeout // connect to myhost.com:10023 stream = mco_create_client_socket_stream("myhost.com", 10023, &stream_params);or
// connect to myhost.com:10023 mco_stream_h stream = mco_create_client_socket_stream("myhost.com", 10023, 0);Tee stream
A tee-stream allows the iterator to write into two separate output streams simultaneously. For example one of the output streams can be a socket, while the other could be a file. Tee streams can be nested.
The following function is called to create a tee-stream:
mco_stream_h mco_create_tee_stream(mco_stream_h stream1, mco_stream_h stream2, mco_bool any_ok);Where:
stream1
– the first output streamstream2
- the second output streamany_ok
- if set to true, the write is deemed successful if the data was written successfully into any of the output streams. If set to false, both writes must have succeededFollowing is a usage example:
// create file stream mco_stream_h file_stream = mco_create_stdfile_stream("file.txt", 0); // create socket stream mco_stream_h socket_stream = mco_create_client_socket_stream("myhost.com", 10023, 0); // join file and socket stream mco_stream_h tee_stream = mco_create_tee_stream(file_stream, socket_stream, false); // create JSON iterator writing to both file and socket mco_trans_iterator_h json = mco_create_json_converter(tee_stream, 0);Custom stream
It is possible to inherit the functionality of an existing stream implementations and create a custom stream handle
mco_stream_h
based on the existingstream_handle
andstream_writer
:mco_stream_h mco_create_custom_stream(void* stream_handle, mco_stream_write output_stream_writer, MCO_RET(*close)(void *));Where:
stream_handle
–stream contextstream_writer
- the stream functionclose
- streamdestructor
, can beNULL
Following is a usage example:
static mco_size_sig_t file_writer(void* stream_handle, const void* from, mco_size_t nbytes) { FILE *f = (FILE *) stream_handle; return fwrite(from, 1, nbytes, f); } FILE *f = fopen(filename, "w"); mco_stream_h stream = mco_create_custom_stream(f, file_writer,(MCO_RET (*)(void*)) fclose);Notes
- In case of errors all the above
mco_create_XXX_stream()
functions returnNULL
and set the appropriate error codes inparams->last_error
andparams_last_errno
(if defined)- If a stream is destroyed, all nested streams are destroyed as well. For example for the tee stream:
MCO_RET mco_destroy_stream(mco_stream_h s);
- When the iterator is destroyed, all dependent resources (streams) are also destroyed:
MCO_RET mco_destroy_trans_iterator(mco_trans_iterator_h t);For example:
// create file stream mco_stream_h file_stream = mco_create_stdfile_stream("file.txt", 0); // create socket stream mco_stream_h socket_stream = mco_create_client_socket_stream("myhost.com", 10023, 0); // join file and socket stream mco_stream_h tee_stream = mco_create_tee_stream(file_stream, socket_stream, false); // create JSON iterator writing to both file and socket mco_trans_iterator_h json = mco_create_json_converter(tee_stream, 0); //... Use iterator ... mco_destroy_trans_iterator(json); // Don't call mco_destroy_stream() for file_stream, socket_stream, tee_stream
- The iterator and the
mco_stream_h
are implemented in thetarget/mcoiter
library. This library makes use only of the eXtremeDB public API (mco.h
,mcouda.h
andmconet.h
). In addition the standard I/O (stdio.h
) is utilized for themco_create_stdfile_stream()
. The use of the stdio can be turned off through the pound-defineMCO_STDIO_FILE_STREAM
in thetarget/mcoiter/mcoiter.c
file:#define MCO_STDIO_FILE_STREAM 0
Finally, the "new style" streams can also be used with the eXtremeDB API functions written for the "old style" streams. The following snippet illustrates how to connect to the server and export the database content using the
mco_db_save()
API and the "new style" socket stream:mco_stream_h socket_stream = mco_create_client_socket_stream("myhost.com", 10023, 0); if (socket_stream) { mco_db_save(socket_stream, socket_stream->writer, connection); mco_destroy_stream(socket_stream); }Java API
The Java-based iterator makes use of the standard Java OutputStream to output database content. The iterators themselves are defined in the TransIterator class:
public class TransIterator { public abstract static class Iterator { public void close(); } public static class JsonConverter extends Iterator { public JsonConverter(java.io.OutputStream os, boolean compact, boolean ignoreStreamErrors); public JsonConverter(java.io.OutputStream os); } }Here the TransIterator.Iterator represents the base class for all interior types and TransIterator.JsonConverter implements the JSON-based iterator.
Following is an example for the JSON iterator writing into a file:
ReplicaConnection con = new ReplicaConnection(db); ReplicaConnection.Parameters replParams = new ReplicaConnection.Parameters(); replParams.iterator = new TransIterator.JsonConverter(new java.io.FileOutputStream("myfile.txt"), false, false); con.attachMaster(connectionString, replParams, CONNECT_TIMEOUT); replParams.iterator.close();C# API
Similar to Java the standard .NET Framework
input/output
streams are used:public class TransIterator { public abstract class Iterator { public Iterator(Connection con); public void Close(); } public class JsonIterator : Iterator { public JsonIterator(Connection con, System.IO.Stream s, bool compact, bool ignore_stream_errors); public JsonIterator(Connection con, System.IO.Stream s); } }Following is an example for the
JSON
iterator writing into a file:ReplicaConnection con = new ReplicaConnection(db); ReplicaConnection.Parameters replParams = new ReplicaConnection.Parameters(); replParams.Iterator = new TransIterator.JsonIterator(con, new System.IO.FileStream("myfile.txt", System.IO.FileMode.Create)); con.AttachMaster(connectionString, replParams, CONNECT_TIMEOUT); replParams.Iterator.Close();Python API
The Python API is a wrapper around the C language
mco_stream_h
. As such most Python methods and their parameters are the same as the corresponding C functions.To create a file-stream:
def create_file_stream(filename)To create a Server
(listen=True)
and client stream(listen= False)
:def create_socket_stream(hostname, port, listen = False, write_timeout = -1, buffer_size = -1, max_clients = -1, connect_timeout = -1, connect_attempts = -1, c onnect_interval = -1, reconnect = -1, socket_domain = -1 )Note that for the server type socket the
hostname
can be None. If the hostname is not equal to None, then it corresponds to themco_server_socket_stream_params_t::net_interface
. Thebuffer_size
,write_timeout
,socket_domain
parameters are common for both the client and the server sockets, while themax_clients
parameter is defined only for the server, andconnect_timeout
,connect_attempts
,connect_interval
, reconnect only for the client sockets.Following is an example for the
JSON/Tee
stream usage:db = exdb.open_database(dbname="rpldb", dictionary=dict, is_disk=is_disk, log_params=log_params) sock_stream = exdb.create_socket_stream("myhost.com", 10023, listen=False, write_timeout=200); file_stream = exdb.create_file_stream("out1.txt"); tee_stream = exdb.create_tee_stream(sock_stream, file_stream); json_ierator = exdb.create_json_iterator(tee_stream); replcon = exdb.ReplicaConnection(db) params = exdb.ReplicaConnectionParameters() params.iterator = json_iterator; replcon.attachMaster(ha_replica_connstr, params, CONNECT_TIMEOUT): exdb.destroy_trans_iterator(params.iterator);xSQL API
For
xSQL
the iterator is described in the configuration file in the form of aJSON
object. Each iterator contains a type field, which currently must be set tojson
. Other fields and their values depend on the iterator type. For thejson
type the fields arestream
,compact
(optional, boolean) andignore_stream_errors
(optional, boolean).The Stream is also defined in the configuration file. The type of the stream can be
For typefile
,socket
ortee
. Other fields and their values depend on the stream type.file
:{ type : "file", name : "myfile" # file name } *type: "socket", listen: true (server-side socket: { type : "socket", listen : true, port : 10000, # listen port buffer_size : 1k, # optional, mco_server_socket_stream_params_t::buffer_size max_clients : 3, # optional, mco_server_socket_stream_params_t::max_clients net_interface : 192.168.0.3, # optional, mco_server_socket_stream_params_t::net_interface write_timeout : 1000, # optional, mco_server_socket_stream_params_t::write_timeout sock_params : { # optional domain : "inet" # other options are "local" or "sdp", # mco_server_socket_stream_params_t::sock_params::domain mode : ["nodelay"], # combination of "nodelay", "do_not_reuse_address", # "non_blocking", "keep_alive" or # "do_not_cloexec", mco_server_socket stream_params_t::sock_params::mode sndbuf : 16k, # mco_server_socket_stream_params_t::sock_params::sndbuf use_ssl : false, # apply "ssl_params" for this socket or not } }For type
socket
,listen: false (client socket)
:{ type : "socket", listen : false, hostname : "192.168.0.1" # host to connect to, port : 10023 # port to connect auto_reconnect : true, # optional, mco_client_socket_stream_params_t::auto_reconnect buffer_size : 1k, # optional, mco_client_socket_stream_params_t::buffer_size connect_attempts : 3, # optional, mco_client_socket_stream_params_t::connect_attempts connect_interval : 1000, # optional, mco_client_socket_stream_params_t::connect_attempts connect_timeout : 2000, # optional, mco_client_socket_stream_params_t::connect_timeout write_timeout : 500, # optional, mco_client_socket_stream_params_t::write_timeout sock_params : { optional, domain : "inet" # other options are "local" or "sdp", # mco_server_socket_stream_params_t::sock_params::domain mode : ["nodelay"], # combination of "nodelay", "do_not_reuse_address", # "non_blocking", "keep_alive" or # "do_not_cloexec", mco_server_socket_stream_params_t::sock_params::mode sndbuf : 16k, # mco_server_socket_stream_params_t::sock_params::sndbuf use_ssl : false, # apply "ssl_params" for this socket or not } }For type
tee
:{ type : "tee", stream1 : { ... } stream2 : { ... } }For example, the following configuration defines a replica-side
JSON
iterator that writes the file calledmyfile.txt
and into a socket127.0.0.1:10023
ha_params : { connection_strings : "127.0.0.1:10000", replica_params : { iterator : { type : "json", compact : false, stream : { type : "tee", stream1 : { type : "socket", listen : false, hostname : "127.0.0.1", port : 10023, }, stream2 : { type : "file", name : "myfile.txt" } } } } }