The eXtremeSQL Distributed SQL Engine provides limited support for database sharding in eXtremeDB Cluster installations. Why is it limited? Most full-blown distributed database engines (such as the one found in Oracle) create an execution plan based on the query tree and data distribution statistics (or other knowledge of how the data is distributed between shards), and that plan contains “map-reduce” style operations. The eXtremeSQL distributed engine merely executes the query on every node over that node's shard and consolidates the result sets received from the nodes when possible (this consolidation is referred to as a merge). Sometimes merging the result sets is simply impossible, as it is, for example, when calculating an average. More often, the engine does not have enough information to make sure that the combined result set is correct. The Distributed SQL Engine takes the most optimistic approach: it always assumes that the application has distributed the data between shards and written the SQL query so as to avoid merging problems. Yet, with an understanding of the engine's limitations, many applications will benefit from using the distributed engine and improve their overall database access performance dramatically.
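To make the "execute everywhere, then merge" idea concrete, here is a minimal Python sketch. It is not eXtremeSQL code: two in-memory SQLite databases stand in for the shards, and the helper names (`make_shard`, `run_everywhere`) and the sample rows are assumptions made for illustration only.

```python
import heapq
import sqlite3


def make_shard(rows):
    """Create an in-memory SQLite database standing in for one node's shard."""
    db = sqlite3.connect(":memory:")
    db.execute("create table T (x integer, y integer)")
    db.executemany("insert into T values (?, ?)", rows)
    return db


def run_everywhere(shards, sql):
    """Execute the same statement on every shard; return one result set per shard."""
    return [db.execute(sql).fetchall() for db in shards]


# Two hypothetical shards holding disjoint parts of table T.
shards = [
    make_shard([(1, 10), (4, 40), (5, 50)]),
    make_shard([(2, 20), (3, 30)]),
]

# Plain select: the per-node result sets are simply concatenated.
concatenated = [
    row for part in run_everywhere(shards, "select x, y from T") for row in part
]

# order by: every node returns sorted rows, so a k-way merge restores global order.
sorted_parts = run_everywhere(shards, "select x, y from T order by y")
merged_sorted = list(heapq.merge(*sorted_parts, key=lambda row: row[1]))

# sum: every node returns a partial sum; adding the partial sums gives the total.
partial_sums = run_everywhere(shards, "select sum(x) from T")
total = sum(part[0][0] for part in partial_sums)

print(concatenated)
print(merged_sorted)
print(total)
```

The sketch only works because each statement's per-node results can be combined without knowing how the rows were distributed; that is the optimistic assumption described above.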
The Distributed SQL Engine sends a query to one of the network nodes or broadcasts it to all nodes. A node is specified through a query prefix. To control the distribution of data, the application must either load data into each shard locally or specify the node ID (number) in the insert statement. For example:
10:insert into T values (…)
The application can also explicitly use the current node ID in the select condition to pick out the records that are inserted on a specific node (%@ indicates the current node ID, and %# the number of shards). For example:
insert into hist_cpvehicleid_jj select * from foreign table (path='/home/usr/shea2.csv', skip=1) as hist_cpvehicleid_jj where mod(hashcode(fstr_vechileid), %#)=%@;
If neither of those methods is used, the Distributed SQL Engine broadcasts the insert to all nodes. The following query types are supported:
select * from T;
-- no prefix: run the statement on all nodes
*:select * from T;
-- same as the above: run the statement on all nodes
N:select * from T;
-- execute the statement on node N (nodes are enumerated from 1)
?:select * from T;
-- execute the statement on any node. Currently the engine picks the node in round-robin fashion, thus implementing a simple load-balancing scheme
Once the query has been executed and the result set has been created on each node, the Distributed SQL Engine collects the result sets from all nodes. If the query contains an aggregation, a sort clause, or sequence functions (statistical functions operating on fields of type sequence; see the “Vector-based Statistical Functions for SQL” section in Chapter 3 for details), the result sets are merged. The Distributed SQL Engine currently supports the following aggregates:
- COUNT
- MIN
- MAX
- SUM (the aggregate operand is a column), with or without a 'group by' clause
The engine does not currently support:
- Merging aggregates with the DISTINCT qualifier;
- Aggregates over complex expressions (such as X+Y), except if the sort by or group by columns are included in the result set or the select statement. For instance (using Metatable as an example):
select Metatable.*,FieldNo+FieldSize as ns from Metatable order by ns;
or
select T.*,x+y as xy from T order by xy;
- Aggregates over hash (seq_hash_aggregate_*), except for two scenarios:
  - when the data that the hash table is built over belongs to a single shard (exchange in the example below):
  select seq_hash_agg_sum(price,exchange) from Quote;
  - when the sequence is converted into a horizontal representation (flattened), in which case the query can run regardless of the data distribution:
  select flattened seq_hash_agg_sum(price,exchange) from Quote;
- The AVG aggregate, unless the groups from different nodes do not overlap. For example, the following layout is supported:
Node1:
Symbol  Price
AAA     10.0
AAA     12.0
AAA     9.0
BBB     11.0
BBB     10.0
BBB     10.0
Node2:
Symbol  Price
CCC     8.0
CCC     7.0
CCC     10.0
DDD     15.0
DDD     14.0
DDD     13.0
select avg(Price) from Quote group by Symbol;
Other examples of valid and invalid statements are:
select * from T;
-- Supported; the results are concatenated
select * from T order by y;
-- Supported; the sorted results from all nodes are merged
select * from T order by x+y;
-- Complex expressions are not currently supported
select sum(x) from T;
-- Supported; the aggregated results are merged
select avg(x) from T;
-- Merge of the AVG aggregate is not currently supported
select y,sum(x) from T group by y;
-- Supported; groups and aggregates are merged
select sum(x) from T group by y;
-- Supported; note that the 'group by' columns must be included in the "from" list
select sum(x*2) from T;
-- Complex expressions are not currently supported
select ifnull(sum(x), 0) from T;
-- Supported; aggregate results are merged
select seq_sum(x) from T;
-- Supported; results are merged
select seq_hash_agg_sum(x,y) from T;
-- Merge of the hash aggregates is not currently supported
select flattened seq_hash_agg_sum(x,y) from T;
-- Supported; sorted, split into groups and aggregated
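The reason SUM and COUNT merge cleanly while AVG does not can be shown with a short Python sketch. It is an illustration only, not the engine's implementation: the function names are hypothetical, the first data set is the Node1/Node2 layout from the text, and the second (overlapping) layout is an assumption added to show the failure case.

```python
from collections import defaultdict


def node_aggregates(rows):
    """Per-node sum, count and avg of Price, grouped by Symbol."""
    sums, counts = defaultdict(float), defaultdict(int)
    for symbol, price in rows:
        sums[symbol] += price
        counts[symbol] += 1
    return {s: (sums[s], counts[s], sums[s] / counts[s]) for s in sums}


def merge_sum_count(per_node):
    """Merge SUM and COUNT by adding the partial values of matching groups."""
    sums, counts = defaultdict(float), defaultdict(int)
    for result in per_node:
        for symbol, (s, c, _avg) in result.items():
            sums[symbol] += s
            counts[symbol] += c
    return dict(sums), dict(counts)


def naive_avg_merge(per_node):
    """Average the per-node averages; correct only if each group lives on one node."""
    avgs = defaultdict(list)
    for result in per_node:
        for symbol, (_s, _c, a) in result.items():
            avgs[symbol].append(a)
    return {symbol: sum(v) / len(v) for symbol, v in avgs.items()}


# Layout from the text: groups do not overlap between nodes.
node1 = [("AAA", 10.0), ("AAA", 12.0), ("AAA", 9.0),
         ("BBB", 11.0), ("BBB", 10.0), ("BBB", 10.0)]
node2 = [("CCC", 8.0), ("CCC", 7.0), ("CCC", 10.0),
         ("DDD", 15.0), ("DDD", 14.0), ("DDD", 13.0)]
disjoint = [node_aggregates(node1), node_aggregates(node2)]
disjoint_avg = naive_avg_merge(disjoint)  # each group has exactly one per-node average

# Hypothetical layout: group AAA is split unevenly across both nodes.
overlapping = [
    node_aggregates([("AAA", 10.0), ("AAA", 12.0), ("AAA", 9.0)]),
    node_aggregates([("AAA", 20.0)]),
]
sums, counts = merge_sum_count(overlapping)
true_avg = sums["AAA"] / counts["AAA"]           # (10 + 12 + 9 + 20) / 4
naive_avg = naive_avg_merge(overlapping)["AAA"]  # mean of 31/3 and 20

print(true_avg, naive_avg)
```

With non-overlapping groups every group has a single per-node average, so nothing needs to be combined. Once a group spans nodes, per-node averages alone lose the row counts needed to weight them. Since SUM and COUNT are both in the supported list, an application could plausibly select those two and divide in its own code, though that should be verified against the actual schema and engine version.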
The Distributed SQL Engine supports the definition of a sharding condition when data is imported from a CSV file or through application code. Sharding of CSV-imported data is specified through the following SQL statement:
select from foreign table (path='csv-file', skip=n) as PatternTable where distribution-condition
As mentioned above, the Distributed SQL Engine also supports the special %@ and %# pseudo-parameters. The first corresponds to the node number (zero based); the second specifies the total number of nodes. For example:
echo "insert into table_name select * from foreign table (path='table_name_file.csv', skip=1) as table_name where mod(instrument_sid/$chunk_size,%#)=%@;" > loadrisk.sql
./xsql.sh loadrisk.sql
The xsql.sh script invokes the Distributed SQL Engine as follows:
xsql @node1 @node2 ... @nodeN $@
When the application reads the input data from a stream (a socket in the example below) or any other source, the data is inserted into the database through a code fragment similar to the following:
table_name tb;
socket_read(s, &tb);
engine.execute("insert into table_name(starttime, endtime, book1, book2, instrument_sid) values (%l,%l,%s,%s,%l)",
    tb.starttime, tb.endtime, tb.book1, tb.book2, tb.instrument_sid);
The following fragment adds a sharding condition to the application's code:
char sql[MAX_SQL_STMT_LEN];
table_name tb;
socket_read(s, &tb);
/* Prefix the statement with the target node; %% leaves the engine's own placeholders intact */
snprintf(sql, sizeof(sql), "%d:insert into table_name (starttime,endtime,book1,book2,instrument_sid) values (%%l,%%l,%%s,%%s,%%l)",
    (int)(tb.instrument_sid%n_nodes));
engine.execute(sql, tb.starttime, tb.endtime, tb.book1, tb.book2, tb.instrument_sid);
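The distribution condition used in the CSV import, mod(instrument_sid/$chunk_size,%#)=%@, can be checked with a small Python sketch. The function names and the key values are hypothetical, and integer division is an assumption about how the SQL expression is evaluated. Here `node_id` plays the role of %@ (zero based) and `n_nodes` the role of %#.

```python
def accepts(node_id, n_nodes, instrument_sid, chunk_size=1):
    """Evaluate mod(instrument_sid / chunk_size, %#) = %@ for one node.

    Integer division is assumed; node_id is zero based, as stated for %@.
    """
    return (instrument_sid // chunk_size) % n_nodes == node_id


def distribute(keys, n_nodes, chunk_size=1):
    """Return, for each node, the keys whose rows that node would keep."""
    return [
        [k for k in keys if accepts(node, n_nodes, k, chunk_size)]
        for node in range(n_nodes)
    ]


keys = list(range(12))  # hypothetical instrument_sid values
placement = distribute(keys, n_nodes=3, chunk_size=2)
for node, kept in enumerate(placement):
    print(node, kept)
```

Because every node evaluates the same condition with its own node number, each row is kept by exactly one node, and `chunk_size` controls how many consecutive key values land on the same shard.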