Data Relay and Persistent Event Queue Strategies and Implementation

A transaction log that is usable for restoring the main eXtremeDB database (which is often the primary purpose of Transaction Logging) may or may not be created. Data to be passed to an external data store is read by the function mco_translog_iterate(), which in turn calls a user-defined callback function. The data that mco_translog_iterate() passes to this callback are the objects that were created, deleted or modified in the eXtremeDB database inside logged transactions. (Note that this is comparable to using the standard eXtremeDB function mco_trans_iterate() to iterate through the objects affected by a transaction, except that mco_trans_iterate() only iterates through the objects in the current transaction, and thus provides the capability of exporting this data synchronously, whereas with mco_translog_iterate() a log file can be read and “exported” at any time.)

There are two ways to supply data from a writer process (the process that started transaction logging and is currently processing data inside the database) to a reader process (the process reading data and transferring it to the external data store by calling the user-defined callback).

The first method is to create a log file. After the log file has been closed by the log writer, this log can be opened and read for export just as it could be opened to restore the main database as in ordinary TL usage.

The second method is to use the pipe mechanism supplied by eXtremeDB Transaction Logging. In this case, data is transferred through a memory buffer, either in conventional memory or in shared memory. This method is faster than the file-based method, but can create a race situation: if the reader process (which is calling the function mco_translog_iterate()) does not read data fast enough, the pipe buffer fills up and the writer process blocks until the reader releases the necessary amount of pipe buffer space. This problem can be solved by extending the main pipe buffer with a temporary file in case of buffer overflow; then no blocking occurs inside the writer process.

The two methods can be combined by setting the flag MCO_TRANSLOG_DUALOUT (see Dual out logging mode below).

Be aware that when using TL with hybrid databases, performance can be improved by turning off disk manager logging (passing the NO_LOG log type into the mco_db_open_dev() API). If a database contains only persistent classes, then the only reason to use TL is to export transactions to an external system, because eXtremeDB has its own integral transaction logging implementation for persistent classes that is separate from the eXtremeDB Transaction Logging optional module.
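For example, a minimal sketch of disabling disk manager logging before opening a hybrid database (assuming the db_log_type field of mco_db_params_t carries the log type, and that db_params is subsequently passed to mco_db_open_dev()):

     
    mco_db_params_t db_params;
     
    mco_db_params_init ( &db_params );
    db_params.db_log_type = NO_LOG;   /* rely on TL only; no disk manager log */
     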

The asynchronous export is started by calling the function mco_translog_start() with the flag MCO_TRANSLOG_ITERABLE. The log can then be read and iterated by calling the function mco_translog_iterate().

Whether a file or a pipe is used between the reader and writer processes, the flag MCO_TRANSLOG_ITERABLE is necessary to start the relay process correctly. The writer and reader may be threads of a single process, or they may be different processes (applications) running on a single computer. When using the file-based method, these applications may even be on different computers – if the process that applies the logged transactions to the external data store is on a different computer, then the file-based method, though slower, is the only option.

Ordinary log file as a transport to a reader

To use an ordinary log file for exporting transactions to an external data store, the file must be opened by the function mco_translog_iterate(). To be a valid log file, it must have been closed by mco_translog_stop() on the writer side, or indirectly closed by calling mco_translog_start() with MCO_TRANSLOG_RESTART. (Note that it is possible to use additional files as semaphores to indicate that a log file was properly closed by a writer.)

Opening a simple file-based log:

     
    #define DISK_PAGE_SIZE 4096
    #define LOG_FILE       "/tmp/mylog"
    mco_TL_start_data_t log_parms;
    log_parms.flags = MCO_TRANSLOG_ITERABLE;
    log_parms.disk_page_size = DISK_PAGE_SIZE;
    mco_translog_start (db, LOG_FILE, &log_parms);
     

Reading and iteration of a log file:

     
    #define TEMP_BUFFER_SIZE (32*1024*1024)
    #define LOG_FILE       "/tmp/mylog"
    MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx)
    {
        return MCO_S_OK;
    }
    ...
    void *mem_buff = malloc (TEMP_BUFFER_SIZE);
    mco_translog_iterate (LOG_FILE, 0, iteration_proc, user_ctx, mydatabase_get_dictionary (),
                    mem_buff, TEMP_BUFFER_SIZE);
     

Partitioning the log into multiple files

To reduce the delay between the moment when a transaction is written into the log and when it is passed to the user-defined iteration callback, and also to reduce the disk space used by log files, it is possible to limit the maximum size of a log file and switch logging to another file when that limit has been reached.

There are two ways to implement this file size limit. The first method is by starting the logging with flag MCO_TRANSLOG_SIZE_CLBK, a maximum log file size and the address of a user-defined file-size callback function. This implements a kind of interrupt strategy; when the specified log file size is exceeded the callback function is called.

The second method is a polling strategy. The current log file size can be periodically checked by function mco_translog_get_info().

With both of these methods the log file size is checked from the writer process.

When the log file needs to be switched to another file, simply call the function mco_translog_start() again with the additional flag MCO_TRANSLOG_RESTART. This switches the log to the new file without the necessity of calling the function mco_translog_stop(), and without having to freeze transaction activity during the switch, which is very advantageous for multithreaded applications.

Example of writing a multiple file log – method 1: “interrupt”

     
    #define DISK_PAGE_SIZE 4096
    #define MAX_LOG_SIZE   (128*1024*1024)
    #define LOG_FILE1       "/tmp/mylog1"
    #define MARK_FILE1      "/tmp/mymark1"
    #define LOG_FILE2       "/tmp/mylog2"
    #define MARK_FILE2      "/tmp/mymark2"
    volatile int            size_exceeded = 0;
 
    void warn_sz_proc (mco_size_t log_size)
    {
        size_exceeded = 1;
    }
     
    ...
     
    mco_TL_start_data_t log_parms;
    log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_SIZE_CLBK;
    log_parms.disk_page_size = DISK_PAGE_SIZE;
    log_parms.max_size = MAX_LOG_SIZE;
    log_parms.warn_sz_proc = warn_sz_proc;
    mco_translog_start (db, LOG_FILE1, &log_parms);
     
    /* Work with database. Current log is the file LOG_FILE1 */
    while (!size_exceeded)
    {
        ...
    }
    log_parms.flags |= MCO_TRANSLOG_RESTART;
    mco_translog_start (db, LOG_FILE2, &log_parms);
    create_file (MARK_FILE1);   /* semaphore file: signals the reader that LOG_FILE1 is complete */
     
    /* Work with database further. Current log is the file LOG_FILE2 */
    ...
    mco_translog_stop (db);
    create_file (MARK_FILE2);
     

Example of writing a multiple file log – method 2: “polling”

     
    #define DISK_PAGE_SIZE 4096
    #define MAX_LOG_SIZE   (128*1024*1024)
    #define LOG_FILE1       "/tmp/mylog1"
    #define MARK_FILE1      "/tmp/mymark1"
    #define LOG_FILE2       "/tmp/mylog2"
    #define MARK_FILE2      "/tmp/mymark2"
     
    mco_TL_current_info_t log_info;
    mco_TL_start_data_t log_parms;
    log_parms.flags = MCO_TRANSLOG_ITERABLE;
    log_parms.disk_page_size = DISK_PAGE_SIZE;
    mco_translog_start (db, LOG_FILE1, &log_parms);
     
    /* Work with database. Current log is the file LOG_FILE1 */
    while (mco_translog_get_info (db, &log_info) == MCO_S_OK
            && log_info.log_size < MAX_LOG_SIZE)
    {
        ...
    }
    log_parms.flags |= MCO_TRANSLOG_RESTART;
    mco_translog_start (db, LOG_FILE2, &log_parms);
    create_file (MARK_FILE1);
     
    /* Work with database further. Current log is the file LOG_FILE2 */
    ...
    mco_translog_stop (db);
    create_file (MARK_FILE2);
     

Reading and iteration of multiple log files

Reading from multiple log files is quite simple: the application switches log files by sequentially calling the function mco_translog_iterate().

Example of reading and iteration of multiple log files

     
    #define TEMP_BUFFER_SIZE (32*1024*1024)
    #define LOG_FILE1       "/tmp/mylog1"
    #define MARK_FILE1      "/tmp/mymark1"
    #define LOG_FILE2       "/tmp/mylog2"
    #define MARK_FILE2      "/tmp/mymark2"
    MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx)
    {
        /* Note "trans" is always 0 in case of asynchronous export */
        return MCO_S_OK;
    }
    ...
    void *mem_buff = malloc (TEMP_BUFFER_SIZE);
     
    wait_while_no_file (MARK_FILE1);   /* wait until the writer signals LOG_FILE1 is complete */
     
    mco_translog_iterate (LOG_FILE1, 0, iteration_proc, user_ctx, mydatabase_get_dictionary (), 
                    mem_buff, TEMP_BUFFER_SIZE);
     
    unlink (MARK_FILE1);
    wait_while_no_file (MARK_FILE2);
     
    mco_translog_iterate (LOG_FILE2, 0, iteration_proc, user_ctx, mydatabase_get_dictionary (), 
                    mem_buff, TEMP_BUFFER_SIZE);
    unlink (MARK_FILE2);
     

Please note that the function mco_translog_iterate() returns only after it has completed reading a log file, or if the user-defined callback returns a value other than MCO_S_OK, or if some other internal error occurs. Also note that it is necessary to call mco_translog_iterate() repeatedly to read each subsequent log file, as sketched below.
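For instance, a hedged sketch of draining an ordered sequence of log files (the log_name array and n_logs count are hypothetical; iteration_proc, user_ctx and mem_buff are as in the example above):

     
    int i;
    for (i = 0; i < n_logs; i++)
    {
        /* each call reads exactly one log file */
        MCO_RET rc = mco_translog_iterate (log_name[i], 0, iteration_proc, user_ctx,
                        mydatabase_get_dictionary (), mem_buff, TEMP_BUFFER_SIZE);
        if (rc != MCO_S_OK)
            break;    /* callback error or internal error */
    }
     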

 

Using a pipe to transport data

When pipe mode is used as a transport for exporting transaction data to an external database, a pipe buffer is placed in shared or conventional memory (depending on the memory settings of the main eXtremeDB database being used as the data source). If the reader process processes data faster than the writer process places it into the pipe, then this mode is clearly the fastest. However, if the reader is slower than the writer, the pipe buffer will overflow and the writer process will be blocked inside the transaction commit until the reader releases enough memory to store the transaction.

It is possible to avoid this blocking by using an optional overflow file. This file is specified in the file_path argument of functions mco_translog_start() and mco_translog_iterate(). If a file is specified, it is used if pipe buffer overflow occurs and, therefore, the writer process is never blocked.

Naturally, writing and reading through this overflow file is much slower; this is a consideration when determining the pipe buffer size. Another consideration is that, in any case, pipe mode does not generate log files that can be used for recovery of a database.

 

The reader process must open a database handle (by calling mco_db_connect()) to the database whose transactional data, in whole or in part, is to be exported. This database handle, in turn, must be passed to the function mco_translog_iterate(). This gives the reader access to the pipe.

The pipe buffer is an instance of type mco_device_t and must be specified when the database is created. Its assignment field must be set to MCO_MEMORY_ASSIGN_PIPE_BUF and its type to MCO_MEMORY_NAMED or MCO_MEMORY_CONV, which must be the same memory type as that used by the main database device.

It is also possible to specify more than one pipe buffering device. In this case transactions are automatically broadcast to all pipe instances as separate pipe buffers. This implies that the number of readers (processes or threads calling function mco_translog_iterate() or mco_translog_play()) must be equal to the number of pipe devices. It is useful to check the field pipe_readers_connected of the mco_TL_current_info structure to verify that all readers are connected, as sketched below. As all of the pipe devices work concurrently, it is recommended to specify the same size for all of the pipes, and likewise for all temporary buffers passed into mco_translog_iterate(). (See sample tlogitermultipipe for an example implementation of multiple pipes.)
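A minimal sketch of that check on the writer side (PIPES_COUNT stands for the number of pipe devices the application defined; db is the writer's connection handle):

     
    mco_TL_current_info_t tl_info;
     
    /* poll until every pipe has a connected reader */
    do {
        mco_translog_get_info (db, &tl_info);
    } while (tl_info.pipe_readers_connected != PIPES_COUNT);
     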

The temporary buffer passed into mco_translog_iterate() should be allocated by malloc() if the main database device is of type conventional memory. But if shared (named) memory is used by the main database device, then the pointer to a temporary buffer passed to mco_translog_iterate() must be 0. In this case, mco_translog_iterate() will create a temporary buffer in shared memory automatically.

Note that, when using shared memory, it is important to distinguish between the 'offset' and 'direct pointer' (dptr) builds of the eXtremeDB core. If the 'dptr' core is used, then every memory device (object of structure mco_device_t) must specify a unique 'hint' field, which is the beginning address of the requested shared memory block. In the functions mco_translog_play() and mco_translog_iterate(), the argument void *mem_ptr (or the corresponding field of structure mco_TL_play_params_t) relates to an internally used memory device which temporarily holds database objects read from a log stream. Thus the mem_ptr argument is the same kind of 'hint' for that memory device: it should be zero in the case of the 'offset' core, or a valid address value in the case of the 'dptr' core.
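For illustration, a hedged sketch of the 'dptr' case (the base addresses below are purely hypothetical and must be chosen for the target platform; the hint field's exact type may require a different cast):

     
    /* 'dptr' core: each shared memory device needs a unique base address in 'hint' */
    dev[x].dev.named.hint = (void *)0x20000000;    /* hypothetical address */
    ...
    /* the temporary device used by mco_translog_iterate() gets its own base address */
    mco_translog_iterate (0, db, iteration_proc, user_ctx,
                    mydatabase_get_dictionary (), (void *)0x30000000, TEMP_BUFFER_SIZE);
     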

Pre-buffering

When initializing the pipe, the application can specify the MCO_TRANSLOG_SYNC_INSTANTLY flag in the flags field of the mco_TL_start_data_t structure. If specified, each transaction is written into the pipe immediately. Otherwise the Transaction Logging runtime buffers transactions in its internal intermediate buffer (the default size is 64K) and, when the buffer becomes full, writes it into the pipe all at once. The flag values MCO_TRANSLOG_SYNC_COUNT and MCO_TRANSLOG_SYNC_TIMER are not allowed when using multiple pipes or dual out mode (see below).
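For example, to push each transaction into the pipe as soon as it commits:

     
    log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE | MCO_TRANSLOG_SYNC_INSTANTLY;
     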

The user-defined callback iteration_proc may report an error by returning a value different from MCO_S_OK. In this case mco_translog_iterate() returns immediately with the same return code that iteration_proc returned. On the writer side the transaction commit (mco_trans_commit() or mco_trans_commit_phase2()) will return error code MCO_E_TL_PIPE_TERM, informing the writer process that the reader has broken its loop. In this case, the transaction log should be stopped by calling mco_translog_stop() before taking any further action.
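A minimal sketch of the writer-side handling (t denotes the active transaction handle):

     
    rc = mco_trans_commit (t);
    if (rc == MCO_E_TL_PIPE_TERM)
    {
        /* the reader has terminated its loop; stop logging before anything else */
        mco_translog_stop (db);
    }
     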

Option Mark_Last_Object

The runtime flag MCO_RT_OPTION_MARK_LAST_OBJ can be set (by function mco_runtime_setoption()) to request notification of “end-of-transaction”. Transactions in a log file or pipe stream are output in their original order, but by default, while reading and iterating the log or pipe, there is no simple way to tell whether the object passed into the user-defined iteration callback is the last one in the current transaction. The runtime option MCO_RT_OPTION_MARK_LAST_OBJ enables the runtime to notify the user-defined callback about the end of each transaction: if it is set, the user-defined iteration callback is called one additional time with a zeroed object handle to indicate that the current transaction has finished.
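A minimal sketch, assuming the option is set before logging starts:

     
    /* enable end-of-transaction notification */
    mco_runtime_setoption (MCO_RT_OPTION_MARK_LAST_OBJ, 1);
     
    MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx)
    {
        if (obj == 0)
        {
            /* zeroed object handle: the current transaction has finished */
            return MCO_S_OK;
        }
        /* ... process the object as usual ... */
        return MCO_S_OK;
    }
     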

Further implementation details are demonstrated in samples tlogiterpipe, tlogitermultipipe and tlogiterdualout.

Opening a pipe-based log

     
    #define LOG_PAGE_SIZE 128
    #define PIPE_BUFFER_SIZE (16*1024*1024)
    mco_device_t dev[N_DEVICES];   /* N_DEVICES is application-defined; dev[x] is the pipe device */
    mco_TL_start_data_t log_parms;
    ...
     
    /* assign pipe buffer as mco device. It will be automatically
    passed to a reader */
    dev[x].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
    dev[x].size = PIPE_BUFFER_SIZE;
    dev[x].type = MCO_MEMORY_NAMED;
     
    sprintf( dev[x].dev.named.name, "%s-pipe", databaseName_ );
    dev[x].dev.named.flags = 0;
    dev[x].dev.named.hint  = 0;
    ...
     
    mco_db_open_dev (DATABASE_NAME, mydatabase_get_dictionary(), dev, x + 1, &db_params);   /* x + 1 devices: dev[0]..dev[x] */
    mco_db_connect (DATABASE_NAME, &db);
    log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE;
    log_parms.disk_page_size = LOG_PAGE_SIZE;
     
    mco_translog_start (db, 0, &log_parms);
     

Reading and iteration of a pipe-based log

     
    #define TEMP_BUFFER_SIZE (32*1024*1024)
    #define PIPE_BUFFER_SIZE (16*1024*1024)
     
    MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx)
    {
        mco_trans_counter_t trans_no;
        mco_trans_no (trans, &trans_no);
        if (all_ok)
            return MCO_S_OK;
        else
            return MCO_ERR_LAST + 1;    /* any value other than MCO_S_OK terminates iteration */
    }
    ...
     
    MCO_RET rc;
    void *mem_buff;
    mco_db_h db;
     
    mco_db_connect (mydatabase_name, &db);
     
    /* pass 0 for a shared memory database; allocate the buffer for conventional memory */
    #ifdef USE_SHARED_MEMORY
    mem_buff = 0;
    #else
    mem_buff = malloc (TEMP_BUFFER_SIZE);
    #endif
     
    mco_translog_iterate (0, db, iteration_proc, user_ctx, mydatabase_get_dictionary (), mem_buff, TEMP_BUFFER_SIZE);
     

Opening a pipe-based log with overflow file

     
    #define LOG_PAGE_SIZE 128
    #define PIPE_BUFFER_SIZE (16*1024*1024)
    #define PIPE_OVERFLOW_FILE "/tmp/overflow_file"
    mco_device_t dev[N_DEVICES];   /* N_DEVICES is application-defined; dev[x] is the pipe device */
    mco_TL_start_data_t log_parms;
    ...
     
    /* assign pipe buffer as mco device. It will be automatically
    passed to a reader */
    dev[x].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
    dev[x].size = PIPE_BUFFER_SIZE;
    dev[x].type = MCO_MEMORY_NAMED;
    sprintf( dev[x].dev.named.name, "%s-pipe", databaseName_ );
    dev[x].dev.named.flags = 0;
    dev[x].dev.named.hint  = 0;
     
    ...
     
    mco_db_open_dev (DATABASE_NAME, mydatabase_get_dictionary(), dev, x + 1, &db_params);   /* x + 1 devices: dev[0]..dev[x] */
     
    mco_db_connect (DATABASE_NAME, &db);
     
    log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE;
    log_parms.disk_page_size = LOG_PAGE_SIZE;
     
    mco_translog_start (db, PIPE_OVERFLOW_FILE, &log_parms);
     

Reading and iteration of a pipe-based log with overflow file

     
    #define TEMP_BUFFER_SIZE (32*1024*1024)
    #define PIPE_BUFFER_SIZE (16*1024*1024)
    #define PIPE_OVERFLOW_FILE "/tmp/overflow_file"
     
    MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx)
    {
        mco_trans_counter_t trans_no;
        mco_trans_no (trans, &trans_no);
        ...
         
        if (all_ok)
            return MCO_S_OK;
        else
            return MCO_ERR_LAST + 1;    /* any value other than MCO_S_OK terminates iteration */
    }
     
    ...
     
    MCO_RET rc;
    void *mem_buff;
    mco_db_h db;
     
    mco_db_connect (mydatabase_name, &db);
     
    /* pass 0 for a shared memory database; allocate the buffer for conventional memory */
    #ifdef USE_SHARED_MEMORY
    mem_buff = 0;
    #else
    mem_buff = malloc (TEMP_BUFFER_SIZE);
    #endif
     
    mco_translog_iterate (PIPE_OVERFLOW_FILE, db, iteration_proc, user_ctx, mydatabase_get_dictionary (), mem_buff, TEMP_BUFFER_SIZE);
     

 

Endian conversion

It is possible to mix little-endian and big-endian architectures using Transaction Logging. In other words, applications can create a transaction log on a little-endian system and replay the log on a big-endian system, or vice-versa. Auto-detection and auto-conversion are performed without further configuration or user intervention. However, note that mixing platforms with different word sizes, i.e. 32-bit vs. 64-bit, is not allowed. Also, note that the same transaction manager (MURSIW or MVCC) must be used in both the writer (log creator) and reader applications.

Dual out logging mode

eXtremeDB Transaction Logging allows writing a copy of the transaction log to a file while the pipe mode is used as a transport for exporting transaction data. This file may be used, for instance, to restore a database, or to keep the log on persistent media. The stored log file may then be processed by the API function mco_translog_apply(), or iterated by the functions mco_translog_iterate() and mco_translog_play(). Dual out logging is enabled by specifying flag MCO_TRANSLOG_DUALOUT in addition to flag MCO_TRANSLOG_PIPE. The field dual_log_path of structure mco_TL_start_data specifies the destination log file. (Sample tlogiterdualout demonstrates this functionality.)
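A minimal sketch of starting dual out logging (assuming dual_log_path accepts a path string; consult the structure definition for its exact type):

     
    #define DUAL_LOG_FILE "/tmp/mylog_copy"
     
    log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE | MCO_TRANSLOG_DUALOUT;
    log_parms.dual_log_path = DUAL_LOG_FILE;   /* destination for the file copy of the log */
    mco_translog_start (db, 0, &log_parms);
     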

Dynamic Pipes

This feature (in eXtremeDB Transaction Logging version 6.5 and later) allows applications to create and remove pipes, as well as connect to and disconnect from them, at runtime. Database events are logged to the pipe and the connected applications are able to process these logs as necessary; they may also stop processing them and remove the associated resources (pipes and connections) at will. The following application scenario is supported through dynamic pipes:

1. The primary process creates a shared memory database and starts the transaction logging.

2. Secondary processes connect to the database at any given time, create a pipe using this connection and then process the log that the primary application writes into the pipe.

In reality the secondary processes are asynchronously listening to the database events written into the log, and also have access to the content of the modified database objects that triggered those events. The secondary processes are free to disconnect from the database, and destroy their pipe at any time.

Naturally if the primary process stops the transaction logging via mco_translog_stop(), all connected readers get the appropriate return code. The primary application does not receive or process any notifications from the secondary applications, but merely commits its transactions as usual, logging events if required. Multiple pipes can be created at a time and multiple readers can be connected to those pipes.

Implementation

1. If the flag MCO_TRANSLOG_DYNAMIC_PIPE for mco_translog_start() is set, then readers can attach to and detach from pipes at any time without affecting the writers (the threads from which transactions are committed). Each reader must create its own pipe – two or more readers cannot be connected to the same pipe.

2. The API mco_translog_play_ex() is similar to mco_translog_play(), except that it takes a single structure mco_TL_play_params_t as parameter. The fields of the structure correspond to parameters of mco_translog_play() with one additional field:

     
    mco_device_t *pipe_device;
     

The pipe_device is a descriptor of the pipe device to be used by the reader. If the value of pipe_device is NULL, then the transaction logging runtime chooses the first pipe that does not have any connected readers. The same happens when the old style mco_translog_play() and mco_translog_iterate() APIs are used.

3. The mco_translog_play_params_init() function zeroes out the mco_TL_play_params_t structure (a usage sketch follows this list).

4. The runtime option MCO_RT_MAX_DYNAMIC_PIPES has a default value of 0. This value defines the maximum number of PIPE_BUF type devices that can be added through mco_db_extend_dev(). This limit does not include the pipe devices created via mco_db_open_dev(). In other words, if the application sets MCO_RT_MAX_DYNAMIC_PIPES to 3 and defines 5 pipe devices at the time the database is created, then it will be able to have up to 8 pipe devices at any moment in time.
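As a hedged sketch of the mco_translog_play_ex() call described in item 2 (only the pipe_device field is shown; the remaining fields, which mirror the mco_translog_play() parameters, are omitted here):

     
    mco_TL_play_params_t play_params;
     
    mco_translog_play_params_init (&play_params);
    /* NULL lets the runtime pick the first pipe without a connected reader */
    play_params.pipe_device = NULL;
    /* ... fill in the fields corresponding to the mco_translog_play() arguments ... */
    rc = mco_translog_play_ex (&play_params);
     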

Using event handlers during iteration of log file or pipe

If the database schema defines events, their handlers may be registered and fired during the reading of the log file or pipe, in addition to or instead of iterating the objects themselves. This technique is called a 'persistent event queue' and can be used as a method of asynchronous event handling by the current process or by a separate process.

To use this feature, the database schema must define one or more events, the application must define handlers for these events, and a function that registers the handlers must be defined in the application. Then the function mco_translog_play() is used instead of mco_translog_iterate() to iterate the log. (Sample tlogiterevents demonstrates this functionality.)

Filtering data stored in the log by events

An event mask may be specified to limit the data written to the logging stream transferred to the log file or pipe. Note that, because of the filtering, this mode does not allow the use of the log for database recovery. However, it can be useful for specific purposes to reduce the amount of data transferred to the log: for example, when event handlers are used on the reader's side, it may be sufficient to transfer only the event notifications rather than all of the data.

Flag MCO_TRANSLOG_EVENT_MASK is specified to enable this feature, and the event_mask field of structure mco_TL_start_data is used to define the desired mask. There are two predefined mask values: MCO_LOG_MASK_ALL_CHANGES stores all data in the log, which is equivalent to disabling event mask filtering; MCO_LOG_MASK_ALL_EVENTS stores only the data related to events defined in the database schema. If these predefined values are not used, the mask must be composed from the event handler identifiers generated by the database schema compiler. (Note that event handler identifiers are not zero-based, i.e. they are natural numbers starting from one, and must be decremented by one for use in a mask.) For example, new, delete and delete-all events could be specified in a mask as follows:

     
    tl_parms.event_mask = (1 << (MCO_EVENT_newEvent - 1)) |
                    (1 << (MCO_EVENT_deleteEvent - 1)) |
                    (1 << (MCO_EVENT_deleteAllEvent - 1));
     

(Sample tlogiterevents demonstrates the use of filtering.)

User-defined iteration callback

Consider the following schema:

     
    class Record_A
    {
        uint4 key;
    };
 
    class Record_B
    {
        uint4 key;
    };
     

 

Part of the code generated by the schema compiler would be:

     
    typedef struct Record_A_   { MCO_Hf h; }  Record_A;
    #define                  Record_A_code          1
 
    typedef struct Record_B_   { MCO_Hf h; }  Record_B;
    #define                  Record_B_code          2
     

The user-defined callback iteration_proc is called by mco_translog_iterate() for each object one by one. This iteration_proc has the following definition:

Prototype

     
    MCO_RET iteration_proc ( mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx )
     

Arguments

    trans      Transaction handle. (It is possible to get the transaction number by calling
               mco_trans_no() with this handle.) May be 0 to indicate that function mco_db_clean()
               was called in the source database; see sample tlogiterpipe for details.
    obj        Object handle of the object that was created, deleted or updated inside the
               transaction. This handle may be used to access the fields of the object with calls
               like Record_A_key_get((Record_A*)obj, &key) or Record_B_key_get((Record_B*)obj, &key).
               May be 0 if end of transaction is indicated (see section Option Mark_Last_Object
               for details).
    cid        The class id of this object. It has the same value as the generated definition
               Record_A_code or Record_B_code; compare it to these codes to determine which type
               of object is present before attempting to access its fields.
    obj_state  The type of operation that was performed on the object:
               MCO_TRANS_OBJ_ALL_DELETED – all objects of this class were deleted;
               MCO_TRANS_OBJ_DELETED – the object was deleted;
               MCO_TRANS_OBJ_CREATED – the object was created;
               0 – the object was updated.
    user_ctx   A pointer to the user data passed into the function mco_translog_iterate().

Note: If the obj_state value is MCO_TRANS_OBJ_ALL_DELETED, then obj is the first object of its class in the database at the moment the function classname_delete_all() was called inside a transaction, for example Record_A_delete_all(). If there were no such objects in the database at that moment, then the iteration callback is not called at all, so obj is never empty in this case.

Example of user-defined callback implementation for the above schema:

     
    MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx)
    {
        mco_trans_counter_t trans_no;
        if (trans == 0)
        {
            /* handle call of mco_db_clean called in source DB */
            return MCO_S_OK;
        }
         
        mco_trans_no (trans, &trans_no);
        if (cid == Record_A_code)
        {
            uint4 key;
     
            if (obj_state == MCO_TRANS_OBJ_ALL_DELETED)
            {
                /* Delete all objects of class Record_A */
            }
            else if (obj_state == MCO_TRANS_OBJ_DELETED)
            {
                /* Delete current object by the key */
                Record_A_key_get ((Record_A *)obj, &key);
            }
            else if (obj_state == MCO_TRANS_OBJ_CREATED)
            {
                /* Create new object with the key */
                Record_A_key_get ((Record_A *)obj, &key);
            }
            else
            {
                /* Update object with the key */
                Record_A_key_get ((Record_A *)obj, &key);
            }
        }
        else if (cid == Record_B_code)
        {
            uint4 key;
 
            if (obj_state == MCO_TRANS_OBJ_ALL_DELETED)
            {
                /* Delete all objects of class Record_B */
            }
            else if (obj_state == MCO_TRANS_OBJ_DELETED)
            {
                /* Delete current object by the key */
                Record_B_key_get ((Record_B *)obj, &key);
            }
            else if (obj_state == MCO_TRANS_OBJ_CREATED)
            {
                /* Create new object with the key */
                Record_B_key_get ((Record_B *)obj, &key);
            }
            else
            {
                /* Update object with the key */
                Record_B_key_get ((Record_B *)obj, &key);
            }
        }
     
        return MCO_S_OK;
    }
     

User-defined event handler registration callback

Consider the following schema:

     
    class Record
    {
        uint4 key;
        event <new>         newEvent;
    };
     

Part of the code generated by the schema compiler would be:

     
    #define MCO_EVENT_newEvent  1
    typedef MCO_RET (*mco_newEvent_handler)( mco_trans_h t, Record * obj, MCO_EVENT_TYPE et, /*INOUT*/ void *param);
    MCO_RET  mco_register_newEvent_handler       ( mco_trans_h t, mco_newEvent_handler handler, void * param );
    MCO_RET  mco_unregister_newEvent_handler     ( mco_trans_h t, mco_newEvent_handler handler);
 

The user-defined callback register_callback() is called by mco_translog_play() just before starting the iteration of objects. This register_callback() has the following definition:

Prototype

     
    MCO_RET register_callback ( mco_trans_h trans, void* user_ctx )
     

Arguments

    trans     Transaction handle. This handle should be passed to the event registration function.
    user_ctx  A pointer to the user data passed into the regevent_user_ctx argument of the
              function mco_translog_play().

Example of user-defined register callback implementation for the above schema:

     
    MCO_RET register_callback(mco_trans_h t, void *param)
    {
        return mco_register_newEvent_handler(t, my_newEvent_handler, param);
    }
     
    MCO_RET my_newEvent_handler( mco_trans_h t, Record * obj, MCO_EVENT_TYPE et, /*INOUT*/ void *param)
    {
        ev_stat_t *s = (ev_stat_t *)param;
        printf("my_newEvent_handler %d\n", ++(s->new_cnt));
        return MCO_S_OK;
    }
     

 

Precautions for a potential system failure

If Transaction Logging is employed in a multi-process scenario using pipes, and one of the processes crashes (for example the reader), then the writer process may be suspended waiting on internal locks used in a pipe. To resolve this situation a “sniffer” task can be run alongside the Transaction Logging processes; the sniffer task will clean up the locks left over from the dead connections of the crashed processes. (Please refer to the “Database Recovery from Failed Processes” section of the eXtremeDB User Guide for details regarding the sniffer API.)

Regardless of the method of Transaction Logging (TL) being used (pipe, multi-pipe or dynamic-pipe), the following steps are recommended:

1. Run sniffer in the context of a separate thread in the main application:

     
    #define SNIFFER_INTERVAL 100
    MCO_RET sniffer_callback(mco_db_h db, void* context, mco_trans_counter_t trans_no)
    {
        SAMPLE_OS_TASK_ID pid = *(SAMPLE_OS_TASK_ID *)context;
        if ( sample_os_task_id_check( pid ) == 0 ) 
        {
            return MCO_S_OK;
        }
        printf("Process %d is crashed\n", pid);
        return MCO_S_DEAD_CONNECTION;
    }
    void sniffer_loop( sample_task_t * descriptor )
    {
        mco_db_h db;
        SAMPLE_OS_TASK_ID pid = sample_os_task_id_get();
         
        /* Connect using mco_db_connect_ctx() and pass &pid as parameter */
        MCO_RET rc = mco_db_connect_ctx(db_name, &pid, &db);
        if ( MCO_S_OK == rc ) 
        {
         
            /* Descriptor->stopped flag is set to 1 by sample_stop_task() in main thread*/
            while ( MCO_S_OK == rc && descriptor->stopped == 0 ) 
            {
                rc = mco_db_sniffer(db, sniffer_callback, MCO_SNIFFER_INSPECT_ACTIVE_CONNECTIONS);
                sample_sleep(SNIFFER_INTERVAL);
            }
            mco_db_disconnect(db);
        }
    }
     

Note that the sniffer is executed in the task that contains the sniffer_loop() function. The callback itself merely makes sure that all tasks connected to the database are alive (see for example samples/native/core/19-recovery/sniffer).

2a. For the pipe case, the main program initializes the TL processing and watches for any errors reported by TL. If any errors are reported, the main application terminates the TL processing (which frees all locks set by the TL runtime) and, if necessary, restarts TL to allow other TL clients to continue processing the pipe, for example:

    ...
     
    /* allocate pipe memory device */
    dev[1].type       = MCO_MEMORY_NAMED;
    dev[1].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
    sprintf( dev[1].dev.named.name, "%s-pipe", db_name );
    dev[1].size       = DATABASE_SIZE / 2;
    dev[1].dev.named.flags = 0;
    dev[1].dev.named.hint  = 0;
    ...
     
    /* Set up and run transaction logging (pipe is created here) */
    tl_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE;
    tl_parms.disk_page_size = PSTORAGE_PAGE_SIZE;
    rc = mco_translog_start( connection, 0, &tl_parms );
     
    /* Wait for TL-client connection */
    printf("\n\n\tWaiting for log reader ...\n" );
    while(1) 
    {
        mco_TL_current_info_t tl_info;
        mco_translog_get_info ( connection, &tl_info );
        if (tl_info.pipe_readers_connected)
            break;
        else
            sample_sleep (100);
    }
    ...
    while ( data processing ) 
    {
        while (1) 
        {
            /* process database transaction */
            if (rc == MCO_S_OK) 
            {
                ...
            }
            else if (rc == MCO_E_TL_IO_ERROR || rc == MCO_E_TL_PIPE_TERM) 
            {
                /* TL processing error detected */
                 
                /* force logging to stop */
                rc = mco_translog_terminate( connection );
                 
                /* wait for TL connection */
                printf("\n\n\tWaiting for log reader ...\n" );
                while(1) 
                {
                    mco_TL_current_info_t tl_info;
                    mco_translog_get_info ( connection, &tl_info );
                    if (tl_info.pipe_readers_connected)
                        break;
                    else
                        sample_sleep (100);
                }
                 
                /* restart TL processing  */
                tl_parms.flags |= MCO_TRANSLOG_RESTART;
                rc = mco_translog_start( connection, 0, &tl_parms );
                sample_rc_check( "\tRe-start logging", rc );
                break;
            } 
            else
                ...
        }
    }
     

2b. For the multi-pipe case the TL handling is similar, except that it is necessary to create several pipe devices:

     
    ...
    for ( i=1; i<=PIPES_COUNT; i++ ) {
        dev[i].type       = MCO_MEMORY_NAMED;
        dev[i].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
        sprintf( dev[i].dev.named.name, "%s-pipe-%u", db_name, i );
        dev[i].size       = DATABASE_SIZE / 2;
        dev[i].dev.named.flags = 0;
        dev[i].dev.named.hint  = 0;
    }
    ...
    rc = mco_translog_start( connection, 0, &tl_parms );
     
    sample_rc_check( "\tStart logging", rc );
     
    /* Wait while iterator thread is alive */
    printf("\n\n\tWaiting for log reader ...\n" );
    while(1) 
    {
        mco_TL_current_info_t tl_info;
        mco_translog_get_info ( connection, &tl_info );
        if (tl_info.pipe_readers_connected == PIPES_COUNT)
            break;
        else
            sample_sleep (100);
    }
    ...
    while ( data processing ) 
    {
        while (1) 
        {
            /* process database transaction */
            if (rc == MCO_S_OK) 
            {
                ...
            } 
            else if (rc == MCO_E_TL_IO_ERROR || rc == MCO_E_TL_PIPE_TERM ) 
            {
                /* TL processing error detected */
                /* force logging to stop */
                rc = mco_translog_terminate( connection );
                 
                /* wait for TL connection */
                printf("\n\n\tWaiting for log reader ...\n" );
                while(1) 
                {
                    mco_TL_current_info_t tl_info;
                    mco_translog_get_info ( connection, &tl_info );
                    if (tl_info.pipe_readers_connected == PIPES_COUNT)
                        break;
                    else
                        sample_sleep (100);
                }
                /* restart TL processing  */
                tl_parms.flags |= MCO_TRANSLOG_RESTART;
                rc = mco_translog_start( connection, 0, &tl_parms );
                sample_rc_check( "\tRe-start logging", rc );
                break;
            } else
            ...
        }
    }
     

2c. The dynamic-pipes case is straightforward, for example:

     
    ...
    /* Set maximum number of dynamic pipe segments */
    mco_runtime_setoption(MCO_RT_MAX_DYNAMIC_PIPES, PIPES_COUNT);
    ...
    /* Set default database parameters */
    mco_db_params_init ( &db_params );
     
    /* Customize the params according to the application */
    db_params.mem_page_size            = MEMORY_PAGE_SIZE;
    db_params.disk_page_size           = 0;            /* Pure in-memory database */
    db_params.db_max_connections       = 10 + PIPES_COUNT;
    db_params.connection_context_size  = sizeof(SAMPLE_OS_TASK_ID);
    ...
     
    /* Set up and run transaction logging (pipe is created here) */
    tl_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE |
    MCO_TRANSLOG_DYNAMIC_PIPE | MCO_TRANSLOG_SYNC_INSTANTLY;
    tl_parms.disk_page_size = PSTORAGE_PAGE_SIZE;
    rc = mco_translog_start( connection, 0, &tl_parms );
    sample_rc_check( "\tStart logging", rc );
     
    /* Wait while iterator thread is alive */
    printf("\n\n\tWaiting for log reader ...\n" );
    while(1) 
    {
        mco_TL_current_info_t tl_info;
        mco_translog_get_info ( connection, &tl_info );
        if (tl_info.pipe_readers_connected)
            break;
        else
            sample_sleep (100);
    }
    ...
     

Possible Data Relay Deadlock

There are some possible situations where data relay deadlock may happen. For instance:

  • When there are two (or more) database connections, the first connection might flood the database with new records while the second connection is used to start the transaction logging reader (e.g. by calling mco_translog_play()). Inside the user-defined reader callback there might then be data processing that results in writing back into the same database through this second connection. Under heavy load, a write transaction in the user-defined reader callback (on the second connection) may be unable to complete, locked because there is no room in the pipe to write data; yet at the same time the pipe cannot be read (to free space in it) because the reading procedure runs in the very thread where the commit operation is already blocked.
  • Under heavy load without a pipe overflow file, the thread committing objects in response to the iterator's reading may fail to perform a commit because the pipe is full. It blocks waiting for the reader to free space in the pipe; but the reader thread cannot do so, being blocked itself waiting for the first thread to drain the data queue, which is also limited and full. In fact, any thread (connection) that tries to commit a transaction will be blocked while the pipe is full and the reader cannot read data because it is stuck in one of the hung commits itself.

This is non-obvious yet natural behavior of a closed-loop system (commit – log – pipe – reader's analyzer – commit again). Even splitting the pipe reading procedure from the commit of new data derived from the read data (e.g. commit – log – pipe – reader's analyzer – some data queue – another application thread and DB connection – commit again) may still cause deadlock. Basically, if the application commits data to the database that is itself the result of reading the transaction logging pipe, deadlock is possible under heavy load.

This can be moderated at the application level by artificially throttling the initial commits. In fact, the Data Relay feature was originally designed to transfer data from eXtremeDB to another DBMS or to some data processor, so it was not assumed that processed data would be committed back to eXtremeDB. Fortunately, the pipe overflow file resolves this “data loop-back” problem. The important point is that the only reliable way to avoid deadlock is to use a pipe overflow file (which does imply a penalty to overall performance).

Possible Error Conditions for MCO_E_TL_PIPE_TERM

There are two cases when the error code MCO_E_TL_PIPE_TERM can be returned, both by a reader call to mco_translog_iterate() and by a writer performing a transaction commit operation. The two scenarios are as follows:

Case A: MCO_E_TL_PIPE_TERM in reader and writer

1. Call mco_translog_start() to start logging

2. Call mco_translog_iterate() in an iteration thread to start reading

3. The user-defined iteration procedure returns an error code (anything but MCO_S_OK) to terminate function mco_translog_iterate()

4. When function mco_translog_iterate() returns with the code specified by the iteration procedure, the pipe gets terminated

5. The writer tries to commit a transaction and function mco_trans_commit() will return MCO_E_TL_PIPE_TERM because the pipe has already been terminated

6. Start mco_translog_iterate() again without first calling function mco_translog_stop()

7. Now mco_translog_iterate() returns MCO_E_TL_PIPE_TERM because the pipe has terminated

Case B: MCO_E_TL_PIPE_TERM in reader and writer after mco_translog_terminate()

1. Call mco_translog_start() to start logging

2. Call mco_translog_iterate() in an iteration thread to start reading

3. Call mco_translog_terminate() in a writer thread (or the main instance) and the pipe gets terminated

4. Now function mco_translog_iterate() returns MCO_E_TL_PIPE_TERM

5. Trying to commit a transaction causes function mco_trans_commit() to return MCO_E_TL_PIPE_TERM because the pipe has already terminated

Note that if the user-defined iteration callback always returns MCO_S_OK, then scenario A is not possible; scenario B is possible if function mco_translog_terminate() is called when error code MCO_E_TL_IO_ERROR is returned by mco_trans_commit().