A transaction log that is usable for restoring the main eXtremeDB database (which is often the primary purpose of Transaction Logging) may or may not be created. Data to potentially be passed to an external data store is read by the function mco_translog_iterate(), which in turn calls a user-defined callback function. The data mco_translog_iterate() passes to this callback function are objects that were created, deleted or modified in the eXtremeDB database inside logged transactions. (Note that this is comparable to using the standard eXtremeDB function mco_trans_iterate() to iterate through the objects affected by a transaction, except that mco_trans_iterate() only iterates through the objects in the current transaction, and thus provides the capability of exporting this data synchronously, whereas with mco_translog_iterate() a log file can be read and “exported” at any time.)

There are two ways to supply data from a writer process (the process that started transaction logging and is currently processing data inside the database) to a reader process (the process reading data and transferring it to the external data store by calling the user-defined callback).

The first method is to create a log file. After the log file has been closed by the log writer, this log can be opened and read for export just as it could be opened to restore the main database as in ordinary TL usage.
The second method is to use the pipe mechanism supplied by eXtremeDB Transaction Logging. In this case, data is transferred through a memory buffer, either in conventional memory or in shared memory. This method is faster than the file-based method, but can create a race condition: if the reader process (which is calling the function mco_translog_iterate()) does not read data fast enough, the pipe buffer runs out of available memory, and the writer process is blocked until the reader process releases the necessary amount of pipe buffer. This problem can be solved by extending the main pipe buffer with a temporary file in case of buffer overflow; then no blocking occurs inside the writer process.

The two methods can be combined by setting the flag MCO_TRANSLOG_DUAL_OUT.
Be aware that when using TL with hybrid databases, performance can be improved by turning off disk manager logging (passing the NO_LOG log type into the mco_db_open_dev() API). If a database contains only persistent classes, then the only reason to use TL is to export transactions to an external system, because eXtremeDB has its own integral transaction logging implementation that is separate from the eXtremeDB Transaction Logging optional module.
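For example, a minimal sketch of opening a hybrid database with disk manager logging turned off (this assumes the db_log_type field of mco_db_params_t, as used in typical eXtremeDB samples; verify the field name against your release):

mco_db_params_t db_params;

mco_db_params_init (&db_params);
db_params.mem_page_size  = MEMORY_PAGE_SIZE;
db_params.disk_page_size = DISK_PAGE_SIZE;
db_params.db_log_type    = NO_LOG;  /* assumed field: disable disk manager logging; TL can still be used */

mco_db_open_dev (DATABASE_NAME, mydatabase_get_dictionary (), dev, n_dev, &db_params);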
The asynchronous export is started by calling the function mco_translog_start() with the flag MCO_TRANSLOG_ITERABLE. This allows reading and then iterating the log by calling the function mco_translog_iterate().

Whether a file or a pipe is used between the reader and writer processes, the flag MCO_TRANSLOG_ITERABLE is necessary to start the relay process correctly. The writer and reader may be threads of a single process, or they may be different processes (applications) running on a single computer. When using the file-based method, these applications may even be on different computers; if the process that is applying the logged transactions to the external data store is on a different computer, then the file-based method, though slower, is the only option.

Ordinary log file as a transport to a reader
To use an ordinary log file for exporting transactions to an external data store, the file must be opened by function mco_translog_iterate(). To be a valid log file, it must have been closed by mco_translog_stop() on the writer side, or it must have been indirectly closed by calling mco_translog_start() with MCO_TRANSLOG_RESTART. (Note that it is possible to use additional files as semaphores to indicate that a log file was properly closed by a writer.)

Opening a simple file-based log:
#define DISK_PAGE_SIZE 4096
#define LOG_FILE "/tmp/mylog"

mco_TL_start_data_t log_parms;

log_parms.flags = MCO_TRANSLOG_ITERABLE;
log_parms.disk_page_size = DISK_PAGE_SIZE;

mco_translog_start (db, LOG_FILE, &log_parms);

Reading and iteration of a log file:
#define TEMP_BUFFER_SIZE (32*1024*1024)
#define LOG_FILE "/tmp/mylog"

MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid,
                        int obj_state, void* user_ctx)
{
    return MCO_S_OK;
}
...
mem_buff = malloc (TEMP_BUFFER_SIZE);
mco_translog_iterate (LOG_FILE, 0, iteration_proc, user_ctx,
                      mydatabase_get_dictionary (), mem_buff, TEMP_BUFFER_SIZE);

Partitioning the log into multiple files
To reduce the delay between the moment when a transaction is written into the log and when it is passed to the user-defined iteration callback, and also to reduce the disk space used by log files, it is possible to limit the maximum size of a log file and switch logging to another file when that limit has been reached.
There are two ways to implement this file size limit. The first method is to start logging with the flag MCO_TRANSLOG_SIZE_CLBK, a maximum log file size and the address of a user-defined file-size callback function. This implements a kind of interrupt strategy: when the specified log file size is exceeded, the callback function is called.

The second method is a polling strategy: the current log file size is periodically checked by function mco_translog_get_info().

With both of these methods the log file size is checked from the writer process.

When the log file needs to be switched to another, simply call the function mco_translog_start() again, but with the additional flag MCO_TRANSLOG_RESTART. This causes the log file to be switched to another without the necessity of calling the function mco_translog_stop(), and also without having to freeze transaction activity for this period, which is very advantageous for multithreaded applications.

Example of writing a multiple file log – method 1: “interrupt”
#define DISK_PAGE_SIZE 4096
#define MAX_LOG_SIZE (128*1024*1024)
#define LOG_FILE1 "/tmp/mylog1"
#define MARK_FILE1 "/tmp/mymark1"
#define LOG_FILE2 "/tmp/mylog2"
#define MARK_FILE2 "/tmp/mymark2"

volatile int size_exceeded = 0;

void warn_sz_proc (mco_size_t log_size)
{
    size_exceeded = 1;
}
...
mco_TL_current_info_t log_info;
mco_TL_start_data_t log_parms;

log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_SIZE_CLBK;
log_parms.disk_page_size = DISK_PAGE_SIZE;
log_parms.max_size = MAX_LOG_SIZE;
log_parms.warn_sz_proc = warn_sz_proc;

mco_translog_start (db, LOG_FILE1, &log_parms);

/* Work with database. Current log is the file LOG_FILE1 */
while (!size_exceeded)
{
    ...
}

log_parms.flags |= MCO_TRANSLOG_RESTART;
mco_translog_start (db, LOG_FILE2, &log_parms);
create_file (MARK_FILE1);

/* Work with database further. Current log is the file LOG_FILE2 */
...
mco_translog_stop (db);
create_file (MARK_FILE2);

Example of writing a multiple file log – method 2: “polling”
#define DISK_PAGE_SIZE 4096
#define MAX_LOG_SIZE (128*1024*1024)
#define LOG_FILE1 "/tmp/mylog1"
#define MARK_FILE1 "/tmp/mymark1"
#define LOG_FILE2 "/tmp/mylog2"
#define MARK_FILE2 "/tmp/mymark2"

mco_TL_current_info_t log_info;
mco_TL_start_data_t log_parms;

log_parms.flags = MCO_TRANSLOG_ITERABLE;
log_parms.disk_page_size = DISK_PAGE_SIZE;

mco_translog_start (db, LOG_FILE1, &log_parms);

/* Work with database. Current log is the file LOG_FILE1 */
while (mco_translog_get_info (db, &log_info) == MCO_S_OK &&
       log_info.log_size < MAX_LOG_SIZE)
{
    ...
}

log_parms.flags |= MCO_TRANSLOG_RESTART;
mco_translog_start (db, LOG_FILE2, &log_parms);
create_file (MARK_FILE1);

/* Work with database further. Current log is the file LOG_FILE2 */
...
mco_translog_stop (db);
create_file (MARK_FILE2);

Reading and iteration of multiple log files
Reading from multiple log files is quite simple: the application switches log files by sequentially calling function mco_translog_iterate().

Example of reading and iteration of multiple log files
#define TEMP_BUFFER_SIZE (32*1024*1024)
#define LOG_FILE1 "/tmp/mylog1"
#define MARK_FILE1 "/tmp/mymark1"
#define LOG_FILE2 "/tmp/mylog2"
#define MARK_FILE2 "/tmp/mymark2"

MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid,
                        int obj_state, void* user_ctx)
{
    /* Note: "trans" is always 0 in case of asynchronous export */
    return MCO_S_OK;
}
...
mem_buff = malloc (TEMP_BUFFER_SIZE);

wait_while_no_file (MARK_FILE1);
mco_translog_iterate (LOG_FILE1, 0, iteration_proc, user_ctx,
                      mydatabase_get_dictionary (), mem_buff, TEMP_BUFFER_SIZE);
unlink (MARK_FILE1);

wait_while_no_file (MARK_FILE2);
mco_translog_iterate (LOG_FILE2, 0, iteration_proc, user_ctx,
                      mydatabase_get_dictionary (), mem_buff, TEMP_BUFFER_SIZE);
unlink (MARK_FILE2);
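The helpers create_file() and wait_while_no_file() used in these examples are not part of the Transaction Logging API; they stand for any application-level signaling mechanism. A minimal POSIX sketch (hypothetical implementations, kept under the same names):

#include <fcntl.h>
#include <unistd.h>

/* Create an empty "mark" file to signal that a log file was properly closed */
static void create_file (const char *path)
{
    int fd = open (path, O_CREAT | O_WRONLY, 0644);
    if (fd >= 0)
        close (fd);
}

/* Block until the writer has created the mark file */
static void wait_while_no_file (const char *path)
{
    while (access (path, F_OK) != 0)
        usleep (100 * 1000);  /* poll every 100 ms */
}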
Please note that the function mco_translog_iterate() returns only after it has completed reading a log file, or if the user-defined callback returns a value other than MCO_S_OK, or if some other internal error occurs. Also note that it is necessary to call mco_translog_iterate() repeatedly to read the next log file.
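For a long-running reader this naturally becomes a loop. A sketch, reusing the hypothetical mark-file helpers above and the naming convention of the examples:

char log_file[64], mark_file[64];
int  i;

for (i = 1; ; i++)
{
    MCO_RET rc;

    sprintf (log_file, "/tmp/mylog%d", i);
    sprintf (mark_file, "/tmp/mymark%d", i);

    wait_while_no_file (mark_file);  /* writer has closed this log file */
    rc = mco_translog_iterate (log_file, 0, iteration_proc, user_ctx,
                               mydatabase_get_dictionary (), mem_buff,
                               TEMP_BUFFER_SIZE);
    if (rc != MCO_S_OK)
        break;   /* callback error or internal error */
    unlink (mark_file);
}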
Using a pipe to transport data
When pipe mode is used as a transport for exporting transaction data to an external database, a pipe buffer is placed in shared or conventional memory (depending on the memory settings used in the main eXtremeDB database being used as the data source). If the reader process is processing data faster than the writer process is placing it into the pipe, then this mode is clearly the fastest. However, if the reader is slower than the writer process, the pipe buffer will overflow and the writer process will be blocked inside the transaction commit until the reader releases enough memory to store the transaction.

It is possible to avoid this blocking by using an optional overflow file. This file is specified in the file_path argument of functions mco_translog_start() and mco_translog_iterate(). If a file is specified, it is used when pipe buffer overflow occurs and, therefore, the writer process is never blocked.

Naturally, the writer and reader processing with this overflow file will be much slower. This is a consideration when determining the pipe buffer size. Another consideration is that, in any case, pipe mode does not generate log files that can be used for recovery of a database.
The reader process must open a database handle (by calling mco_db_connect()) of the database for which some or all transactional data is to be exported. This database handle, in turn, must be passed to the function mco_translog_iterate(). This allows access to the pipe from the reader.

The pipe buffer is an instance of type mco_device_t and must be specified when the database is created. It is given the assignment MCO_MEMORY_ASSIGN_PIPE_BUF and type MCO_MEMORY_NAMED or MCO_MEMORY_CONV, which must be the same memory type as that used by the main database device.

It is also possible to specify more than one pipe buffering device. In this case transactions will automatically be broadcast to all pipe instances as separate pipe buffers. This implies that the count of readers (processes or threads calling function mco_translog_iterate() or mco_translog_play()) must be equal to the count of pipe devices. It is useful to check the field pipe_readers_connected of the mco_TL_current_info structure to verify that all readers are connected. As all of the pipe devices work concurrently, it is recommended to specify the same size for all of the pipes, and likewise for all temporary buffers passed into mco_translog_iterate(). (See sample tlogitermultipipe for an example implementation of multiple pipes.)

The temporary buffer passed into mco_translog_iterate() should be allocated by malloc() if the main database device is of type conventional memory. But if shared (named) memory is used by the main database device, then the pointer to a temporary buffer passed to mco_translog_iterate() must be 0. In this case, mco_translog_iterate() will create a temporary buffer in shared memory automatically.
Note that, when using shared memory, it is important to distinguish between the 'offset' and 'direct pointer' ('dptr') builds of the eXtremeDB core. If the 'dptr' core is used, then all memory devices (objects of structure mco_device_t) must specify a unique 'hint' field, which is actually the beginning address of the requested shared memory block. Regarding functions mco_translog_play() and mco_translog_iterate(), the argument void *mem_ptr (or the corresponding field of structure mco_TL_play_params_h) relates to an internally used memory device which temporarily holds database objects read from the log stream. Thus the argument mem_ptr here is the same 'hint' for that memory device: it should be zero in the case of the 'offset' core, or a valid address value in the case of the 'dptr' core.
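A sketch of the distinction (the EXTREMEDB_DPTR_CORE macro and the addresses are hypothetical; cast the values as required by the mco_device_t definition in your release):

#define DPTR_DEVICE_HINT 0x20000000  /* hypothetical fixed mapping address for the device */
#define DPTR_TEMP_HINT   0x30000000  /* hypothetical hint for the TL temporary device */

#ifdef EXTREMEDB_DPTR_CORE
    dev[x].dev.named.hint = DPTR_DEVICE_HINT;
    mem_ptr               = (void *)DPTR_TEMP_HINT;
#else  /* 'offset' core */
    dev[x].dev.named.hint = 0;
    mem_ptr               = 0;
#endif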
Pre-buffering

When initializing the pipe, the application can specify the MCO_TRANSLOG_SYNC_INSTANTLY flag in the start data flags. If specified, each transaction is written into the pipe immediately. Otherwise the Transaction Logging runtime buffers transactions in an internal intermediate buffer (the default size is 64K); when the buffer becomes full, it is written into the pipe all at once. The flag values MCO_TRANSLOG_SYNC_COUNT and MCO_TRANSLOG_SYNC_TIMER are not allowed when using multiple pipes or dual out mode (see below).
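For instance, a sketch of starting a pipe-based log with immediate (unbuffered) writes, combining flags already shown in this section:

log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE |
                  MCO_TRANSLOG_SYNC_INSTANTLY;
log_parms.disk_page_size = LOG_PAGE_SIZE;

mco_translog_start (db, 0, &log_parms);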
The user-defined callback iteration_proc may report an error by returning a value different from MCO_S_OK. In this case mco_translog_iterate() returns immediately with the same return code that the iteration_proc has just returned. On the writer side the transaction commit (mco_trans_commit() or mco_trans_commit_phase2()) will return error code MCO_E_TL_PIPE_TERM. This informs the writer process that the reader has broken its loop. In this case, the transaction log should be stopped by calling mco_translog_stop() before any further actions.
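A sketch of the corresponding writer-side handling (transaction body elided):

rc = mco_trans_commit (t);
if (rc == MCO_E_TL_PIPE_TERM)
{
    /* The reader has broken its loop and the pipe is terminated:
       stop the transaction log before any further actions */
    mco_translog_stop (db);
}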
Option Mark_Last_Object

The runtime flag MCO_RT_OPTION_MARK_LAST_OBJ can be set (by function mco_runtime_setoption()) to specify that notification of “end-of-transaction” is required. Transactions in a log file or pipe stream are output in their original order. By default, while reading and iterating the log or pipe, there is no simple way to determine whether the object passed into the user-defined iteration callback is the last one in the current transaction. For this purpose the runtime option MCO_RT_OPTION_MARK_LAST_OBJ enables the runtime to notify the user-defined callback about the end of each transaction: if it is set, the user-defined iteration callback will be called one additional time with a zeroed object handle to indicate that the current transaction has finished. Further implementation details are demonstrated in samples tlogiterpipe, tlogitermultipipe and tlogiterdualout.
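A sketch of a callback that recognizes the end-of-transaction marker (assuming a non-zero value enables the option, as with the other runtime options shown later in this section):

mco_runtime_setoption (MCO_RT_OPTION_MARK_LAST_OBJ, 1);
...
MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid,
                        int obj_state, void* user_ctx)
{
    if (obj == 0)
    {
        /* Zeroed object handle: the current transaction has finished */
        return MCO_S_OK;
    }
    /* normal per-object processing ... */
    return MCO_S_OK;
}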
Opening a pipe-based log
#define LOG_PAGE_SIZE 128
#define PIPE_BUFFER_SIZE (16*1024*1024)

mco_device_t dev[];
mco_TL_start_data_t log_parms;
...
/* Assign the pipe buffer as an mco device;
   it will be automatically passed to a reader */
dev[x].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
dev[x].size = PIPE_BUFFER_SIZE;
dev[x].type = MCO_MEMORY_NAMED;
sprintf( dev[x].dev.named.name, "%s-pipe", databaseName_ );
dev[x].dev.named.flags = 0;
dev[x].dev.named.hint = 0;
...
mco_db_open_dev (DATABASE_NAME, mydatabase_get_dictionary(), dev, x, &db_params);
mco_db_connect (DATABASE_NAME, &db);

log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE;
log_parms.disk_page_size = LOG_PAGE_SIZE;

mco_translog_start (db, 0, &log_parms);

Reading and iteration of a pipe-based log
#define TEMP_BUFFER_SIZE (32*1024*1024)
#define PIPE_BUFFER_SIZE (16*1024*1024)

MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid,
                        int obj_state, void* user_ctx)
{
    mco_trans_counter_t trans_no;
    mco_trans_no (trans, &trans_no);
    if (all_ok)
        return MCO_S_OK;
    else
        return MCO_ERR_LAST + 1;
}
...
MCO_RET rc;
void *mem_buff;

mco_db_connect (mydatabase_name, &db);

/* If the memory management library used is different from mcomconv */
#ifdef USE_SHARED_MEMORY
    mem_buff = 0;
#else
    mem_buff = malloc (TEMP_BUFFER_SIZE);
#endif

mco_translog_iterate (0, db, iteration_proc, user_ctx,
                      mydatabase_get_dictionary (), mem_buff, TEMP_BUFFER_SIZE);

Opening a pipe-based log with overflow file
#define LOG_PAGE_SIZE 128
#define PIPE_BUFFER_SIZE (16*1024*1024)
#define PIPE_OVERFLOW_FILE "/tmp/overflow_file"

mco_device_t dev[];
mco_TL_start_data_t log_parms;
...
/* Assign the pipe buffer as an mco device;
   it will be automatically passed to a reader */
dev[x].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
dev[x].size = PIPE_BUFFER_SIZE;
dev[x].type = MCO_MEMORY_NAMED;
sprintf( dev[x].dev.named.name, "%s-pipe", databaseName_ );
dev[x].dev.named.flags = 0;
dev[x].dev.named.hint = 0;
...
mco_db_open_dev (DATABASE_NAME, mydatabase_get_dictionary(), dev, x, &db_params);
mco_db_connect (DATABASE_NAME, &db);

log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE;
log_parms.disk_page_size = LOG_PAGE_SIZE;

mco_translog_start (db, PIPE_OVERFLOW_FILE, &log_parms);

Reading and iteration of a pipe-based log with overflow file
#define TEMP_BUFFER_SIZE (32*1024*1024)
#define PIPE_BUFFER_SIZE (16*1024*1024)
#define PIPE_OVERFLOW_FILE "/tmp/overflow_file"

MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid,
                        int obj_state, void* user_ctx)
{
    mco_trans_counter_t trans_no;
    mco_trans_no (trans, &trans_no);
    ...
    if (all_ok)
        return MCO_S_OK;
    else
        return MCO_ERR_LAST + 1;
}
...
MCO_RET rc;
void *mem_buff;

mco_db_connect (mydatabase_name, &db);

/* If the memory management library used is different from mcomconv */
#ifdef USE_SHARED_MEMORY
    mem_buff = 0;
#else
    mem_buff = malloc (TEMP_BUFFER_SIZE);
#endif

mco_translog_iterate (PIPE_OVERFLOW_FILE, db, iteration_proc, user_ctx,
                      mydatabase_get_dictionary (), mem_buff, TEMP_BUFFER_SIZE);
Endian conversion
It is possible to mix little-endian and big-endian architectures using Transaction Logging. In other words, applications can create a transaction log on a little-endian system and replay the log on a big-endian system, or vice-versa. There is proper auto-detection and auto-conversion which does not require further configuration or user intervention. However, note that mixing different word-size platforms, i.e. x32 vs. x64, is not allowed. Also, note that the same transaction manager (MURSIW or MVCC) must be used in both the writer (log creator) and reader applications.

Dual out logging mode
eXtremeDB Transaction Logging allows writing a copy of the transaction log to a file while the pipe mode is used as a transport for exporting transaction data. This file may be used, for instance, to restore a database, or to store it in log form on persistent media. The stored log file may then be processed by the API function mco_translog_apply(), or it can be iterated by functions mco_translog_iterate() and mco_translog_play(). Dual out logging is enabled by specifying flag MCO_TRANSLOG_DUALOUT in addition to flag MCO_TRANSLOG_PIPE. Field dual_log_path of structure mco_TL_start_data specifies the destination log file. (Sample tlogiterdualout demonstrates this functionality.)
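A sketch of enabling dual out mode with the flag and field named above (assuming dual_log_path accepts a path string; check the mco_TL_start_data definition in your release):

#define DUAL_LOG_FILE "/tmp/myduallog"

log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE |
                  MCO_TRANSLOG_DUALOUT;
log_parms.disk_page_size = LOG_PAGE_SIZE;
log_parms.dual_log_path = DUAL_LOG_FILE;  /* assumed to accept a path string */

mco_translog_start (db, 0, &log_parms);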
Dynamic Pipes

This feature (in eXtremeDB Transaction Logging version 6.5 and later) allows applications to create and remove pipes, as well as connect to and disconnect from them, at runtime. Database events are logged to the pipe, and the connected applications are able to process these logs as necessary; they may also stop processing them and remove the associated resources (pipes and connections) at will. The following application scenario is supported through dynamic pipes:
1. The primary process creates a shared memory database and starts the transaction logging.
2. Secondary processes connect to the database at any given time, create a pipe using this connection and then process the log that the primary application writes into the pipe.
In reality the secondary processes are asynchronously listening to the database events written into the log, and also have access to the content of the modified database objects that triggered those events. The secondary processes are free to disconnect from the database and destroy their pipe at any time.
Naturally, if the primary process stops the transaction logging via mco_translog_stop(), all connected readers get the appropriate return code. The primary application does not receive or process any notifications from the secondary applications, but merely commits its transactions as usual, logging events if required. Multiple pipes can be created at a time and multiple readers can be connected to those pipes.

Implementation
1. If the flag MCO_TRANSLOG_DYNAMIC_PIPE for mco_translog_start() is set, then readers can attach to and detach from pipes at any time without affecting the writers (the threads from which transactions are committed). Each reader must create its own pipe; two or more readers cannot be connected to the same pipe.

2. The API mco_translog_play_ex() is similar to mco_translog_play(), except that it takes a single structure mco_TL_play_params_t as parameter. The fields of the structure correspond to the parameters of mco_translog_play(), with one additional field:

mco_device_t *pipe_device;

The pipe_device is a descriptor of the pipe device to be used by the reader. If the value of pipe_device is NULL, then the transaction logging runtime chooses the first pipe that does not have any connected readers. The same happens when the old style mco_translog_play() and mco_translog_iterate() APIs are used.
3. The mco_translog_play_params_init() function zeroes out the mco_TL_play_params_t structure.

4. The runtime option MCO_RT_MAX_DYNAMIC_PIPES has a default value of 0. This value defines the maximum number of PIPE_BUF type devices that can be added through mco_db_extend_dev(). This limit does not include the pipe devices created via mco_db_open_dev(). In other words, if the application sets MCO_RT_MAX_DYNAMIC_PIPES to 3 and defines 5 pipe devices at the time the database is created, then it will be able to have up to 8 pipe devices at any moment in time.
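Putting items 1–4 together, a hedged sketch of a secondary (reader) process that adds its own dynamic pipe and reads from it (the mco_db_extend_dev() call pattern and the remaining mco_TL_play_params_t fields are assumptions based on the description above):

mco_device_t         pipe_dev;
mco_TL_play_params_t play_params;

/* Create this reader's own pipe device at runtime */
pipe_dev.assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
pipe_dev.size       = PIPE_BUFFER_SIZE;
pipe_dev.type       = MCO_MEMORY_NAMED;
sprintf (pipe_dev.dev.named.name, "%s-dynpipe", db_name);
pipe_dev.dev.named.flags = 0;
pipe_dev.dev.named.hint  = 0;
mco_db_extend_dev (db_name, &pipe_dev);

/* Zero the play parameters, then point them at this pipe */
mco_translog_play_params_init (&play_params);
play_params.pipe_device = &pipe_dev;
/* ... set the remaining fields (database, callbacks, buffer) as for
   mco_translog_play() ... */

mco_translog_play_ex (&play_params);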
Using event handlers during iteration of log file or pipe

If the database schema defines events, their handlers may be registered and fired during the reading of the log file or pipe, in addition to or instead of iterating the objects themselves. This technique is called a 'persistent events queue' and can be used as a method of asynchronous event handling by the current process or by a separate process.
To use this feature, the database schema must define one or more events, the application must define the handlers for these events, and a function which registers the handlers must be defined in the application. Then function mco_translog_play() is used instead of function mco_translog_iterate() to iterate the log. (Sample tlogiterevents demonstrates this functionality.)

Filtering data stored in the log by events
An events mask may be specified to limit the data written to the logging stream transferred to the log file or pipe. Note that this mode does not allow the use of the log for database recovery, due to the filtering. However, it can be useful for specific purposes to reduce the amount of data transferred to the log; for example, if event handlers are used on the reader's side, it may not be necessary to transfer any data to the log except notification of the event itself.
Flag MCO_TRANSLOG_EVENT_MASK is specified to enable this feature, and the event_mask field of structure mco_TL_start_data is used to define the desired mask. There are two predefined mask values: MCO_LOG_MASK_ALL_CHANGES stores all data in the log, which is equivalent to disabling event mask filtering; MCO_LOG_MASK_ALL_EVENTS stores only the data related to events defined in the database schema. If these predefined values are not used, the mask must be composed from the event handler identifiers generated by the database schema compiler. (Note that event handler identifiers are not zero-based, i.e. they are natural numbers starting from one, and each must be decremented by one for use in a mask.) For example, new, delete and delete all events could be specified in a mask as follows:

tl_parms.event_mask = (1 << (MCO_EVENT_newEvent - 1)) |
                      (1 << (MCO_EVENT_deleteEvent - 1)) |
                      (1 << (MCO_EVENT_deleteAllEvent - 1));

(Sample tlogiterevents demonstrates the use of filtering.)
User-defined iteration callback
Consider the following schema:
class Record_A { uint4 key; };
class Record_B { uint4 key; };
Part of the code generated by the schema compiler would be:
typedef struct Record_A_ { MCO_Hf h; } Record_A;
#define Record_A_code 1

typedef struct Record_B_ { MCO_Hf h; } Record_B;
#define Record_B_code 2

The user-defined callback iteration_proc is called by mco_translog_iterate() for each object, one by one. This iteration_proc has the following definition:
Prototype

MCO_RET iteration_proc ( mco_trans_h trans,
                         MCO_Hf* obj,
                         int cid,
                         int obj_state,
                         void* user_ctx );
Arguments

trans
    Transaction handle. (It is possible to get the transaction number by calling mco_trans_no() with this handle.) Can be 0 to indicate that function mco_db_clean() was called in the source database; see sample tlogiterpipe for details.

obj
    Object handle of the object that was created, deleted or updated inside the transaction. This handle may be used to access the fields of the object with calls like:

        Record_A_key_get((Record_A*)obj, &key);
        Record_B_key_get((Record_B*)obj, &key);

    Can be 0 if the end-of-transaction state is indicated (see section Option Mark_Last_Object for details).

cid
    The class id of this object. This will have the same value as the definitions Record_A_code or Record_B_code. Compare this code to Record_A_code or Record_B_code to determine which type of object is present before attempting to access its fields.

obj_state
    The type of operation that was performed on the object:

    MCO_TRANS_OBJ_ALL_DELETED – all objects of this class were deleted.
    MCO_TRANS_OBJ_DELETED – the object was deleted.
    MCO_TRANS_OBJ_CREATED – the object was created.
    0 – the object was updated.

user_ctx
    A pointer to the user data passed into the function mco_translog_iterate().

Note: If the obj_state value is MCO_TRANS_OBJ_ALL_DELETED, then obj is the first object of its class in the database for which the function classname_delete_all() was called inside a transaction, for example Record_A_delete_all(). If there were no such objects in the database at the moment classname_delete_all() was called, then the iteration callback will not be called at all; so it is impossible to receive an empty obj argument.

Example of user-defined callback implementation for the above schema:
MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid,
                        int obj_state, void* user_ctx)
{
    mco_trans_counter_t trans_no;

    if (trans == 0)
    {
        /* Handle mco_db_clean() called in the source DB */
        return MCO_S_OK;
    }

    mco_trans_no (trans, &trans_no);

    if (cid == Record_A_code)
    {
        uint4 key;
        if (obj_state == MCO_TRANS_OBJ_ALL_DELETED)
        {
            /* Delete all objects of class Record_A */
        }
        else if (obj_state == MCO_TRANS_OBJ_DELETED)
        {
            /* Delete current object by the key */
            Record_A_key_get ((Record_A *)obj, &key);
        }
        else if (obj_state == MCO_TRANS_OBJ_CREATED)
        {
            /* Create new object with the key */
            Record_A_key_get ((Record_A *)obj, &key);
        }
        else
        {
            /* Update object with the key */
            Record_A_key_get ((Record_A *)obj, &key);
        }
    }
    else if (cid == Record_B_code)
    {
        uint4 key;
        if (obj_state == MCO_TRANS_OBJ_ALL_DELETED)
        {
            /* Delete all objects of class Record_B */
        }
        else if (obj_state == MCO_TRANS_OBJ_DELETED)
        {
            /* Delete current object by the key */
            Record_B_key_get ((Record_B *)obj, &key);
        }
        else if (obj_state == MCO_TRANS_OBJ_CREATED)
        {
            /* Create new object with the key */
            Record_B_key_get ((Record_B *)obj, &key);
        }
        else
        {
            /* Update object with the key */
            Record_B_key_get ((Record_B *)obj, &key);
        }
    }
    return MCO_S_OK;
}

User-defined register event handlers callback
Consider the following schema:
class Record
{
    uint4 key;
    event <new> newEvent;
};

Part of the code generated by the schema compiler would be:
#define MCO_EVENT_newEvent 1

typedef MCO_RET (*mco_newEvent_handler)( mco_trans_h t,
                                         Record * obj,
                                         MCO_EVENT_TYPE et,
                                         /*INOUT*/ void *param);

MCO_RET mco_register_newEvent_handler ( mco_trans_h t,
                                        mco_newEvent_handler handler,
                                        void * param );

MCO_RET mco_unregister_newEvent_handler ( mco_trans_h t,
                                          mco_newEvent_handler handler);

The user-defined callback register_callback() is called by mco_translog_play() just before starting the iteration of objects. This register_callback() has the following definition:
Prototype

MCO_RET register_callback ( mco_trans_h trans,
                            void* user_ctx );
Arguments

trans
    Transaction handle. This handle should be passed to the event registering function.

user_ctx
    A pointer to the user data passed into argument regevent_user_ctx of the function mco_translog_play().

Example of user-defined register callback implementation for the above schema:
MCO_RET register_callback (mco_trans_h t, void *param)
{
    return mco_register_newEvent_handler (t, my_newEvent_handler, param);
}

MCO_RET my_newEvent_handler ( mco_trans_h t,
                              Record * obj,
                              MCO_EVENT_TYPE et,
                              /*INOUT*/ void *param)
{
    ev_stat_t *s = (ev_stat_t *)param;
    printf ("my_newEvent_handler %d\n", ++(s->new_cnt));
    return MCO_S_OK;
}
Precautions for a potential system failure
If Transaction Logging is employed in a multi-process scenario using pipes, and one of the processes crashes (for example the reader), then the writer process may be suspended waiting on internal locks used in a pipe. To resolve this situation, a “sniffer” task can be run alongside the Transaction Logging processes; the sniffer task will clean up locks left over from the dead connections left by the crashed processes. (Please refer to the “Database Recovery from Failed Processes” section of the eXtremeDB User Guide for details regarding the sniffer API.)

Regardless of the method of Transaction Logging (TL) being used (pipe, multi-pipe or dynamic-pipe), the following steps are recommended:

1. Run a sniffer in the context of a separate thread in the main application:
#define SNIFFER_INTERVAL 100

MCO_RET sniffer_callback (mco_db_h db, void* context, mco_trans_counter_t trans_no)
{
    SAMPLE_OS_TASK_ID pid = *(SAMPLE_OS_TASK_ID *)context;

    if ( sample_os_task_id_check( pid ) == 0 )
    {
        return MCO_S_OK;
    }
    printf ("Process %d crashed\n", pid);
    return MCO_S_DEAD_CONNECTION;
}

void sniffer_loop ( sample_task_t * descriptor )
{
    mco_db_h db;
    SAMPLE_OS_TASK_ID pid = sample_os_task_id_get ();

    /* Connect using mco_db_connect_ctx() and pass &pid as parameter */
    MCO_RET rc = mco_db_connect_ctx (db_name, &pid, &db);

    if ( MCO_S_OK == rc )
    {
        /* descriptor->stopped is set to 1 by sample_stop_task() in the main thread */
        while ( MCO_S_OK == rc && descriptor->stopped == 0 )
        {
            rc = mco_db_sniffer (db, sniffer_callback,
                                 MCO_SNIFFER_INSPECT_ACTIVE_CONNECTIONS);
            sample_sleep (SNIFFER_INTERVAL);
        }
        mco_db_disconnect (db);
    }
}
Note that the sniffer is executed in the task that contains the sniffer_loop() function. The callback itself merely makes sure that all tasks connected to the database are alive (see, for example, samples/native/core/19-recovery/sniffer).
2a. For the pipe case, the main program initializes the TL processing and watches for any errors reported by TL. If any errors are reported, the main application terminates the TL processing (which frees all locks set by the TL runtime) and, if necessary, restarts TL to allow other TL clients to continue processing the pipe, for example:

...
/* Allocate pipe memory device */
dev[1].type = MCO_MEMORY_NAMED;
dev[1].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
sprintf( dev[1].dev.named.name, "%s-pipe", db_name );
dev[1].size = DATABASE_SIZE / 2;
dev[1].dev.named.flags = 0;
dev[1].dev.named.hint = 0;
...
/* Set up and run transaction logging (pipe is created here) */
tl_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE;
tl_parms.disk_page_size = PSTORAGE_PAGE_SIZE;

rc = mco_translog_start( connection, 0, &tl_parms );

/* Wait for TL-client connection */
printf("\n\n\tWaiting for log reader ...\n" );
while (1)
{
    mco_TL_current_info_t tl_info;
    mco_translog_get_info ( connection, &tl_info );
    if (tl_info.pipe_readers_connected)
        break;
    else
        sample_sleep (100);
}
...
while ( data processing )
{
    while (1)
    {
        /* process database transaction */
        if (rc == MCO_S_OK)
        {
            ...
        }
        else if (rc == MCO_E_TL_IO_ERROR || rc == MCO_E_TL_PIPE_TERM )
        {
            /* TL processing error detected: force logging to stop */
            rc = mco_translog_terminate( connection );

            /* Wait for TL connection */
            printf("\n\n\tWaiting for log reader ...\n" );
            while (1)
            {
                mco_TL_current_info_t tl_info;
                mco_translog_get_info ( connection, &tl_info );
                if (tl_info.pipe_readers_connected)
                    break;
                else
                    sample_sleep (100);
            }

            /* Restart TL processing */
            tl_parms.flags |= MCO_TRANSLOG_RESTART;
            rc = mco_translog_start( connection, 0, &tl_parms );
            sample_rc_check( "\tRe-start logging", rc );
            break;
        }
        else
            ...
    }
}
2b. For the multi-pipe case the TL handling is similar, except that it is necessary to create several pipe devices:

...
for ( i=1; i<=PIPES_COUNT; i++ )
{
    dev[i].type = MCO_MEMORY_NAMED;
    dev[i].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
    sprintf( dev[i].dev.named.name, "%s-pipe-%u", db_name, i );
    dev[i].size = DATABASE_SIZE / 2;
    dev[i].dev.named.flags = 0;
    dev[i].dev.named.hint = 0;
}
...
rc = mco_translog_start( connection, 0, &tl_parms );
sample_rc_check( "\tStart logging", rc );

/* Wait while iterator thread is alive */
printf("\n\n\tWaiting for log reader ...\n" );
while (1)
{
    mco_TL_current_info_t tl_info;
    mco_translog_get_info ( connection, &tl_info );
    if (tl_info.pipe_readers_connected == PIPES_COUNT)
        break;
    else
        sample_sleep (100);
}
...
while ( data processing )
{
    while (1)
    {
        /* process database transaction */
        if (rc == MCO_S_OK)
        {
            ...
        }
        else if (rc == MCO_E_TL_IO_ERROR || rc == MCO_E_TL_PIPE_TERM )
        {
            /* TL processing error detected: force logging to stop */
            rc = mco_translog_terminate( connection );

            /* Wait for TL connection */
            printf("\n\n\tWaiting for log reader ...\n" );
            while (1)
            {
                mco_TL_current_info_t tl_info;
                mco_translog_get_info ( connection, &tl_info );
                if (tl_info.pipe_readers_connected == PIPES_COUNT)
                    break;
                else
                    sample_sleep (100);
            }

            /* Restart TL processing */
            tl_parms.flags |= MCO_TRANSLOG_RESTART;
            rc = mco_translog_start( connection, 0, &tl_parms );
            sample_rc_check( "\tRe-start logging", rc );
            break;
        }
        else
            ...
    }
}
2c. The dynamic-pipes case is trivial, for example:

...
/* Set maximum number of dynamic pipe segments */
mco_runtime_setoption(MCO_RT_MAX_DYNAMIC_PIPES, PIPES_COUNT);
...
/* Set default database parameters */
mco_db_params_init ( &db_params );

/* Customize the params according to the application */
db_params.mem_page_size = MEMORY_PAGE_SIZE;
db_params.disk_page_size = 0;  /* Pure in-memory database */
db_params.db_max_connections = 10 + PIPES_COUNT;
db_params.connection_context_size = sizeof(SAMPLE_OS_TASK_ID);
...
/* Set up and run transaction logging (pipe is created here) */
tl_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE |
                 MCO_TRANSLOG_DYNAMIC_PIPE | MCO_TRANSLOG_SYNC_INSTANTLY;
tl_parms.disk_page_size = PSTORAGE_PAGE_SIZE;

rc = mco_translog_start( connection, 0, &tl_parms );
sample_rc_check( "\tStart logging", rc );

/* Wait while iterator thread is alive */
printf("\n\n\tWaiting for log reader ...\n" );
while (1)
{
    mco_TL_current_info_t tl_info;
    mco_translog_get_info ( connection, &tl_info );
    if (tl_info.pipe_readers_connected)
        break;
    else
        sample_sleep (100);
}
...

Possible Data Relay Deadlock
There are some situations where data relay deadlock may happen. For instance:

- When there are two (or more) database connections, the first connection might flood the database with new records while the second connection is used to start the transaction logging reader (e.g. by calling mco_translog_play() etc.). Inside a user-defined reader callback function there might then be some data processing that results in writing back into the same database from this second connection. Under heavy load it is possible that a write transaction in the user-defined reader callback (on the second connection) cannot be completed and is locked because there is no room in the pipe to write data. But at the same time the pipe cannot be read (to free some space in it) because the reading procedure runs in the same thread where the commit operation is already blocked.

- Under heavy load without a pipe overflow file, the thread committing objects in response to reading by the iterator may fail to perform a commit because the pipe is full. It blocks waiting for the reader to free space in the pipe. But the reader thread fails to do that, being blocked itself waiting for the first thread to write to and free the queue, which is also limited and full. In fact, any of the threads (connections) that tries to commit a transaction will be blocked if the pipe is full and the reader is not able to read data, as the reader thread is itself blocked in one of the hung commits.
This is non-obvious yet natural behavior of a closed-loop system (commit - log - pipe - reader's analyzer - commit again). Trying to split the pipe reading procedure from the commit of new data derived from the read data (e.g. commit - log - pipe - reader's analyzer - some data queue - another application thread and DB connection - commit again) may still cause deadlock. Basically, if the application is committing data to a database that is itself the result of reading the transaction logging pipe, deadlock is possible under heavy load.

This can be moderated at the application level by artificially throttling the initial commits. In fact, the Data Relay feature was initially designed to transfer data from eXtremeDB to another DBMS or to some data processor, so it was not assumed that processed data would be committed back to eXtremeDB. Fortunately, pipe overflow files resolve this "data loop-back" problem. The important point is that the only reliable way to avoid deadlock is to use a pipe overflow file (which does imply a penalty to the overall performance).
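A crude application-level throttling sketch (the burst size and delay are hypothetical tuning knobs, not TL API features):

/* Yield periodically so the reader can drain the pipe before the
   writer fills it with loop-back commits */
if (++commits_since_pause >= COMMIT_BURST)
{
    commits_since_pause = 0;
    sample_sleep (THROTTLE_MS);
}
rc = mco_trans_commit (t);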
Possible Error Conditions for MCO_E_TL_PIPE_TERM
There are two cases when the error code MCO_E_TL_PIPE_TERM could be returned, both by a reader call to mco_translog_iterate() and by a writer performing a transaction commit operation. The two scenarios could be as follows:

Case A: MCO_E_TL_PIPE_TERM in reader and writer
1. Call mco_translog_start() to start logging.

2. Call mco_translog_iterate() in an iteration thread to start reading.

3. The user-defined iteration procedure returns an error code (anything but MCO_S_OK) to terminate function mco_translog_iterate().

4. When function mco_translog_iterate() returns with the code specified by the iteration procedure, the pipe gets terminated.

5. The writer tries to commit a transaction, and function mco_trans_commit() returns MCO_E_TL_PIPE_TERM because the pipe has already been terminated.

6. Start mco_translog_iterate() again without first calling function mco_translog_stop().

7. Now mco_translog_iterate() returns MCO_E_TL_PIPE_TERM because the pipe has terminated.

Case B: MCO_E_TL_PIPE_TERM in reader and writer after mco_translog_terminate()
1. Call mco_translog_start() to start logging.

2. Call mco_translog_iterate() in an iteration thread to start reading.

3. Call mco_translog_terminate() in a writer thread (or the main instance), and the pipe gets terminated.

4. Now function mco_translog_iterate() returns MCO_E_TL_PIPE_TERM.

5. Trying to commit a transaction causes function mco_trans_commit() to return MCO_E_TL_PIPE_TERM because the pipe has already terminated.
Note that if the user-defined iteration callback always returns MCO_S_OK, then scenario A is not possible; scenario B is possible if function mco_translog_terminate() is called when error code MCO_E_TL_IO_ERROR is returned by mco_trans_commit().