Shard-Query blog

The only open source MPP database engine for MySQL

If you downloaded Shard-Query 2.5, please redownload to remove a PHP warning

There was some code in Shard-Query 2.5 that was not protected by an if() clause, which caused queries without a GROUP BY to generate a warning.  The warnings didn’t cause the test suite to fail, so I missed them.  I have updated the Shard-Query 2.5 binary, so please redownload it if you get a warning about GROUP BY in a query that does not use GROUP BY.

Shard-Query 2.5 is now released

Shard-Query 2.5 has been a long time coming, but the release is finally officially out the door.

There are numerous changes from the last major release including:

  •  Improved parser – fully handles complex expressions
  •  LOAD DATA INFILE support and S3 support
  •  Semi-join materialization for IN and NOT IN subqueries
  •  Improved support for subqueries in the FROM clause
  •  INSERT .. SELECT and CREATE TABLE .. SELECT support
  •  Ability to do range lookups on the shard key (IN/BETWEEN/etc)
  •  Improved proxy – supports SHOW commands too
  •  Support for the full MySQL SELECT dialect, including WITH ROLLUP
  •  Custom aggregate function support
  •  Asynchronous query support
  •  Numerous bug fixes

You can find it here

Shard-Query loader gets a facelift and now Amazon S3 support too

Shard-Query (source) now supports the MySQL “LOAD DATA INFILE” command.

When you use LOAD DATA LOCAL INFILE a single threaded load from the current process will be performed.  You can specify a path to a file anywhere readable by the PHP script.  This allows loading without using the Gearman workers and without using a shared filesystem.

If you do not specify LOCAL, then the Gearman based loader is used.  You must not specify a path to the file when you omit the LOCAL keyword, because the configured shared path will be pre-pended to the filename automatically.  The shared path must be on a shared or network filesystem (NFS, CIFS, etc.) and the files to be loaded must be placed on that shared filesystem for the Gearman based loader to work.  This is because workers may run on multiple nodes, and all workers have to be able to read the files being loaded.
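
To make the two forms concrete, here is a minimal sketch (file, path and table names are hypothetical):

-- Single-threaded load performed by the local process; any path readable by
-- the PHP script works:
LOAD DATA LOCAL INFILE '/data/staging/lineorder.tbl' INTO TABLE lineorder;

-- Gearman based parallel load; note there is no path, because the configured
-- shared path is pre-pended automatically, so the file must already sit on
-- the shared filesystem visible to every worker:
LOAD DATA INFILE 'lineorder.tbl' INTO TABLE lineorder;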

S3 is supported as a source of data

Instead of using a shared filesystem, S3 is now supported too.  You must specify an AWS access key and secret key when setting up Shard-Query.  After those are set up, simply use LOAD DATA INFILE 's3://bucket/filename' to load from an S3 bucket using Gearman workers.  The file will be split up into smaller chunks efficiently and automatically, and each 16MB chunk will be loaded individually.

If you use LOAD DATA LOCAL INFILE 's3://bucket/filename' then Gearman will not be used and the file will be loaded from the local process instead.

Important: When the Gearman loader is used (recommended) the S3 load will be split over the workers, each worker loading a 16MB chunk of the file in parallel.

Shard-Query supports background jobs, query parallelism, and all SELECT syntax

SkySQL just blogged about a tool that schedules long running MySQL jobs, prevents too many queries from running simultaneously, and stores the results in tables.  It even uses Gearman.  You will note that the article says it uses PAQU, which in turn uses Shard-Query.

I think PAQU was created for two reasons: A) Shard-Query lacked support for fast aggregation of STDDEV and VARIANCE (this has been fixed), and B) their data set requires “cross-shard queries”.  From what I can see, though, their type of cross-shard query can be solved with subqueries in the FROM clause in stock Shard-Query, instead of with a customized (forked) version of Shard-Query.  This is unfortunate, because my recent improvements to Shard-Query now have to be ported into PAQU by the PAQU authors.

I’d like to encourage you to look at Shard-Query if you need to run complex jobs in the background and get the results later.  As a bonus, you get support for parallel query using partitioning, and you get shared-nothing scale-out for sharding too.  You get the latest and greatest improvements to Shard-Query, and if you have a query that you aren’t sure can be executed by Shard-Query, run the problem by me first so that I can suggest solutions before you fork the product.

Also:
Since scientific data sets rarely need to be updated (if ever), a column store like Infobright is ideal for such data.  This can reduce raw multi-TB data sets into multi-hundred GB data sets that can be processed much faster.  Combine this with the ability to save on IO by reading only the necessary columns, and extremely fast data processing is possible.

To run background jobs in Shard-Query:

$SQ = new ShardQuery();
$SQ->async = true;
$job_id = $SQ->query("select count(*) from 1TB_table");
echo "Check the shard_query.jobs table for the completion status of $job_id\n";

To poll for completed jobs:

function wait_for_jobid($SQ, $job_id) {
  // pass in the ShardQuery instance; $SQ is not otherwise visible in function scope
  $sql = "select * from jobs where (completion_percent = 100.00 or completion = 'error') and job_id = " . (!is_numeric($job_id) ? 'NULL' : $job_id);
  while(1) {
    $SQ->state->mapper->sweep_jobs();
    $stmt = $SQ->state->mapper->conn->my_query($sql);
    $cnt = 0;
    while($row = $SQ->DAL->my_fetch_assoc($stmt)) {
      $cnt++;
      break;
    }
    if($cnt > 0) break;
    sleep(1);
  }
  return 1;
}

To get the result after it is finished:

$stmt = $SQ->get_async_result($job_id);
while($row = $SQ->DAL->my_fetch_assoc($stmt)) {
  print_r($row);
}

Note: SimpleDAL – Shard-Query data access layer
You will notice the above code uses the ->DAL member.  This is the SimpleDAL interface which ships with Shard-Query.  It allows Shard-Query to talk to different database providers (possibly even NoSQL providers) through a unified interface.  The class operates similarly to the mysqli class, except the functions are prefixed with my_ instead of mysqli_.  Internally it uses PDO to talk to MySQL.

Shard-Query is now much faster for some aggregate functions

I checked in some improvements to Shard-Query.

Now STD, STD_SAMP, VAR and VAR_SAMP can be orders of magnitude faster for large data sets. This is because they are now distributed like COUNT, AVG and other fully distributable MySQL aggregate functions.   Prior to this change, my test query would create a 22GB (530M row) temporary table.  It now creates a one row temporary table.  This reduces network traffic, temporary storage space and increases performance.
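
The underlying idea is that variance-style aggregates can be computed from partial sums, since VAR_POP(x) = SUM(x*x)/COUNT(*) - (SUM(x)/COUNT(*))^2.  The sketch below illustrates the principle with hypothetical table names; it is not necessarily the exact rewrite that Shard-Query emits:

-- Pushed down to each shard (hypothetical fact table):
SELECT SUM(x) AS s, SUM(x*x) AS ss, COUNT(*) AS n FROM fact;

-- Combined on the coordinator, with one row of partials per shard:
SELECT SUM(ss)/SUM(n) - POW(SUM(s)/SUM(n), 2) AS var_pop
  FROM partials;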

Shard-Query better reports initialization errors. This mostly means that if you don’t have gearmand running you will no longer get a cryptic PHP output, but a nice error message.

You can change the storage engine for the repo by changing only one line in shard_query.sql. This lets you more easily install on Infobright, which needs MyISAM tables, not InnoDB tables in the repo.

New Shard-Query features checked into SVN

I checked some updates to Shard-Query into SVN.

Partitioning support has been extended for MySQL 5.6+ to ALL partitioning types.

This includes all previously unsupported types, including RANGE/LIST COLUMNS tables partitioned over more than one column, and the HASH/KEY/LINEAR variants as well. In MySQL 5.6, Shard-Query now exclusively uses the PARTITION hint for partition elimination instead of WHERE clauses. For 5.5 and previous versions, support remains limited to LIST, RANGE, and LIST/RANGE COLUMNS over a single column.
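
For reference, this is what the MySQL 5.6 PARTITION hint looks like.  The table and partition names below are hypothetical, and the statement only illustrates the hint, not the exact SQL that Shard-Query generates:

CREATE TABLE fact (
  order_date_key int not null,
  revenue decimal(20,2)
)
PARTITION BY RANGE (order_date_key) (
  PARTITION p1997 VALUES LESS THAN (19980101),
  PARTITION p1998 VALUES LESS THAN (19990101),
  PARTITION pmax  VALUES LESS THAN MAXVALUE
);

-- Partition elimination via the hint instead of a WHERE range:
SELECT SUM(revenue) FROM fact PARTITION (p1997);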

The old mysql interface DAL has been replaced completely by the PDO DAL.

There is no major difference for end users, except that you have to check that the return of the query() method is an object with is_object() instead of checking that it is a resource with is_resource(). I updated bin/run_query, which is the example application.

I made a few bug fixes to the PDO DAL as well.  Thanks again to Alex Hurd for contributing the PDO DAL.

Comparing Amazon Redshift and Shard-Query features and performance

What is Amazon Redshift and how does it compare to Shard-Query?

Amazon Redshift is the petabyte scale data warehousing system built by Amazon.  At the time of this writing it is a beta-quality data warehouse as a service platform hosted in the AWS cloud.  It has been built from ParAccel technology; Amazon is an investor in ParAccel.

Amazon Redshift works similarly to Shard-Query.  Both systems allow you to spread data over many machines and treat them as one logical machine.  This architecture is called “shared nothing” and has come to be known by the short name “sharding”.

Both systems essentially provide a “virtual database” composed of smaller databases.  It is like a RAID array of databases; in fact, you could think of it as a “redundant array of independent databases”.

Sharding

Both Redshift and Shard-Query shard your data internally, that is, they transparently split the database up into smaller physical databases that represent one larger logical database.

Amazon Redshift is built on top of PostgreSQL 8 but has significant underlying changes in operation.

Similarly, Shard-Query is built to access MySQL 5.1+ with some limitations.  The limitations vary, since Shard-Query can use different versions and variants of MySQL, each with its own incompatibilities with the core version.  Thus the available Shard-Query (and MySQL) features are always somewhat dependent on the storage engine being used.

In short, Redshift is the petabyte scale version of PostgreSQL, as built by Amazon and ParAccel.  Shard-Query is the petabyte scale version of MySQL,  as built by me.

Data distribution in Redshift

When a table is created in Redshift, the user can optionally choose a “distkey” for that table.   If no distkey is chosen, then the table is evenly distributed over all nodes.  In Redshift, joins between tables that do not share a distkey are possible but an expensive query plan may be generated which may require the transfer of a lot of data between nodes.  This is a problem common in most shared-nothing database systems.

With Redshift it is generally necessary to pick good sortkeys and distkeys to get good performance.  You must be careful to avoid joining large tables together if they are not both distributed on the same distkey.  The per-table “sortkey” indicates the physical order in which Redshift should store tuples in a table.

In many cases sort/merge joins can be made significantly less expensive by the careful selection of sortkey and distkey, because Redshift can then eliminate the sort phase of the sort/merge join when performing the join.  However, the schema and the proposed queries must be constructed carefully to ensure that the distkey/sortkey combinations result in well performing queries.

Data distribution in Shard-Query

Shard-Query operates a bit differently.  In Shard-Query the user must specify a “shard key”.  This is essentially a database-wide “distkey”.

When a table contains the shard key, the table is distributed over the nodes and is considered a sharded table.  Tables that do not contain the shard key are considered unsharded.  Unsharded tables are completely duplicated (ie, replicated) on each node.  For example, in this benchmark the shard key is LO_OrderDateKey, which is a column contained only in the fact table and guaranteed to result in even data distribution of the largest table.  The dimension tables (lookup tables) are duplicated on each shard.

This means that joins between sharded and unsharded tables are fast, because data movement between nodes is not necessary to complete the query.  The downside is that Shard-Query doesn’t have a native understanding of how to join tables that have different shard keys.  In fact, the system is constructed so that you can’t do that even if you want to; this is enforced by the limitation that you may have only one shard key per virtual database.  I suggest that you investigate Hadoop if you really need to join very large tables which must be sharded on different columns, or of course you can always try Redshift and see if it performs well enough for you if Shard-Query doesn’t fit your use case.

Shard-Query is geared mostly toward high performance star schemas or monolithic tables.  These are common in data marts, data warehouses and in big data processing systems such as call detail records, web logs and other data with a long tail.  It also works well when a “master/detail” relationship, like blog posts and blog comments, can be established (both are sharded by the shard key “blog_id”), as sketched below.  The second case of course works well for Redshift too, as the joins between the tables will be local if the tables share the same distkey (blog_id in this case).
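
As a rough sketch of that master/detail case (with hypothetical columns), both tables simply carry the shard key, so a post and its comments always land on the same shard and the join stays local:

CREATE TABLE blog_posts (
  blog_id int not null,      -- shard key
  post_id bigint not null,
  title   varchar(255),
  body    text
);

CREATE TABLE blog_comments (
  blog_id    int not null,   -- same shard key keeps the join local
  post_id    bigint not null,
  comment_id bigint not null,
  body       text
);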

Database storage

Amazon Redshift is a columnar database.  It does not support indexes and it enforces no constraints, but it can use constraint definitions to create better informed plans.  In Redshift you must be careful, as improperly defined constraints can lead to wrong results during query execution, especially during subquery evaluation.  Redshift has an automatic compression mechanism that can be used when COPY is used to load data from S3 or DynamoDB sources.

Shard-Query is MySQL storage engine agnostic.  It works with InnoDB, MyISAM, TokuDB, Infobright and probably more.  Thus you can have a compressing row store (TokuDB or XtraDB), a compressing column store (ICE or IEE), or some hybrid of both: you could keep recent data in the row store for frequent modification, then move data into ICE over time, for example.  Compression is automatic as long as the underlying storage engine compresses data automatically.
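
As a minimal sketch of that hybrid, the same logical table can simply be created twice on different engines.  The table names are hypothetical, and the ICE engine name is assumed to be BRIGHTHOUSE here:

CREATE TABLE events_recent (
  event_id bigint,
  event_dt datetime,
  payload  varchar(255)
) ENGINE=TokuDB;        -- compressing row store for recent, frequently modified data

CREATE TABLE events_archive (
  event_id bigint,
  event_dt datetime,
  payload  varchar(255)
) ENGINE=BRIGHTHOUSE;   -- append-only, highly compressed column store for history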

Query execution

Each Amazon Redshift cluster is assigned a “leader node”.  Some operations can be single threaded on the leader node, such as inserting into tables or doing certain types of loads.

A Shard-Query grid has no leader node.  Any node can operate as a leader for any operation and all nodes can lead independently at the same time.  Coordination and a FIFO of work is handled through a message queue called Gearman.  As long as the storage engine supports concurrency, there are no limits on concurrent loading and querying, for example, as long as there are resources to support the concurrency.  If the underlying storage engine can not support concurrent inserts (MyISAM, ICE) then loading performance is very limited right now.  This will be resolved in the next beta of Shard-Query.

Both systems support INSERT/UPDATE/DELETE/TRUNCATE/etc.  Shard-Query can only perform operations on a table that are allowed by the underlying storage engine.  For example, Shard-Query does not allow DML on ICE based tables, because ICE based tables can not be modified via DML.  You will simply get an error from the storage engine that the operation is unsupported if you try to modify ICE data with DML.

MPP (massively parallel processing)

Both Shard-Query and Redshift access shards in parallel.  Redshift defines “slices” on each shard so that the CPU and memory resources on each node are divided evenly for query processing.   Shard-Query supports the same by placing multiple “shards” on the same physical machine, or by using table partitioning for engines which support it, or both.

Because both systems are based on a shared-nothing architecture, when you double the nodes you double the speed.

Analytics and custom functions

Redshift has SQL:2003 window functions (PARTITION … OVER …), common table expressions and other SQL:2003 features that are part of PostgreSQL but not part of MySQL.  Thus Redshift is capable of executing advanced SQL:2003 queries that Shard-Query is not (yet) capable of.  Window functions are on the Shard-Query roadmap for the next major release after the current beta.

Shard-Query on the other hand can be extended with custom functions, but Redshift can not.  Shard-Query ships with a PERCENTILE example function which is similar to the NTILE SQL:2003 window function (I named it differently to avoid ambiguity.)  The extensible PHP nature of Shard-Query makes adding custom functions easy.

In the next major release of Shard-Query this pluggable function interface will be used to implement all of the SQL:2003 windowing functions, but additional parser support is needed to make that completely possible.  This will bring Redshift and Shard-Query into SQL level parity, with Shard-Query still able to support custom functions if you want to add them.

Comparing performance of Redshift versus Shard-Query on SSB scale factor 200 (120GB raw data)

Not everything is equal in the world

Instance type differences

Amazon doesn’t appear to provide a way to spin up the AMI type used for Redshift as a normal EC2 instance.  That is, I could find no AWS EC2 instance which maps 1:1 to a Redshift XL or 8XL node.  For the purposes of testing Shard-Query I decided to use the older m1.large instance type.  This instance has less CPU, half the RAM and half the storage of the Redshift XL nodes, so please take that into consideration when comparing the results.

Execution strategy differences

Redshift can perform joins very effectively if the sortkey and distkey are carefully selected.  I have configured the Redshift schema (attached later) with optimal sortkeys and distkeys for this benchmark.  This allows Redshift to perform the most common and largest join more effectively than Shard-Query/MySQL can.  Of course this means the apple cart is already tilted in Redshift’s favor for some queries.  That being said, I feel it is only fair to compare both systems when each is configured in the most optimal way available.

On the other hand, by distributing unsharded tables to all nodes, Shard-Query trades space for time by distributing data once instead of transmitting data for each query.

Context is very important

This benchmark is somewhat of an apples to oranges comparison.  Shard-Query has different capabilities than Redshift and is given fewer overall resources.  I’m not complaining about this.  It is just important to understand all of the benchmark numbers in this context.

You should already understand that Shard-Query will likely not perform exactly the same as Redshift with the same data set and the same queries, for the reasons above.  It may be worse, or it may be better.  Understanding why it is different is the key.

Tested Configuration

    • Both tests used 16 nodes.  Shard-Query used 16 m1.large nodes (no leader node as one is not required).
    • Redshift was provisioned with 16 XL nodes and one leader node (not configurable).

Shard-Query was used with the newest version of Infobright Community Edition.  Each Shard-Query node ran two copies of the database software, each on a different port for isolation.  Thus there are two “slices” per node.  Redshift also places two slices per XL node.

Aggregated grid resources

    • At 16 nodes Shard-Query has 120GB of aggregate memory available.
      • Redshift has 240GB of aggregate memory.
    • At 16 nodes Shard-Query has 64 ECU.
      • Redshift has 70.4 (plus the leader node).
    • At 16 nodes Shard-Query has 13340GB of locally available database storage.
      • Redshift has 32000GB.

Price/performance ratio

    • In the tested configuration Shard-Query costs $3.84/hour to run 16 nodes.  Redshift costs $13.60/hour.
    • Redshift exceeds Shard-Query performance by at most 3x, and most queries are close in performance at significantly lower cost.
    • The price/performance argument for Shard-Query is very compelling.
    • With Shard-Query you can choose any instance size, from micro (not a good idea) all the way up to high IO instances, the latter of which have 16 cores, 60GB of RAM and 2TB of SSD storage.
    • Only two node sizes are available in Redshift, which limits flexibility.

Response times (each measure is an average over three query executions)

[Chart: response times, query flight 1]

Here in this first set of tests you can see that the distkey/sortkey combination between dim_date and the fact table (lineorder) is very effective in increasing join performance in Redshift.

The date dimension has a sortkey of (D_Year, D_DateKey) and a distkey of (D_DateKey).

The fact table (lineorder) is being joined to dim_date using D_DateKey = LO_OrderDateKey, and lineorder has a distkey of LO_OrderDateKey and ALSO a sortkey of LO_OrderDateKey.

There is a predicate on D_Year that allows the filter of the dimension to access both the dimension and fact tables in sorted order, thus the join happens on the same node and the sort phase of the sort/merge join can be bypassed.

Shard-Query does a hash join because that is what ICE does and that is just going to be slower than the fancy pre-sorted sort/merge optimization employed by Redshift.

I must say that the way Redshift does it is quite neat :)
[Chart: response times, query flight 2]

Here Shard-Query bests Redshift when the amount of data to be examined decreases and performs very similarly when the data to be examined is large.  It looks like execution in the ICE engine may be slower when lots of data has to be uncompressed but this will need more investigation.

[Chart: response times, query flight 3]

[Chart: response times, query flight 4]

These two flights show that Shard-Query performs pretty closely to Redshift, but Redshift still performs better in a lot of cases.  That being said, it is important to understand price/performance measurement.

Conclusion

In many cases Redshift does better in this benchmark, sometimes quite a bit better (around 3x better in some cases).

Keep in mind that Shard-Query is running with fewer resources at 1/3 the cost!

And remember that we can triple the size of the Shard-Query cluster to likely get the same or better performance compared to Redshift for the same amount of money.

Overall I am very happy with Shard-Query performance.  Let me know how it works for you.

Redshift Schema

CREATE TABLE customer
(
 C_CustomerKey int primary key,
 C_Name varchar(25),
 C_Address varchar(25),
 C_City varchar(10),
 C_Nation varchar(15),
 C_Region varchar(12),
 C_Phone varchar(15),
 C_MktSegment varchar(10)
)
 distkey(C_CustomerKey)
 sortkey(C_Region, C_CustomerKey)
;
CREATE TABLE part
(
 P_PartKey int primary key,
 P_Name varchar(25),
 P_MFGR varchar(10),
 P_Category varchar(10),
 P_Brand varchar(15),
 P_Colour varchar(15),
 P_Type varchar(25),
 P_Size smallint,
 P_Container char(10)
) 
 distkey(P_PartKey)
 sortkey(P_MFGR, P_PartKey)
;
CREATE TABLE supplier
(
 S_SuppKey int primary key,
 S_Name char(25),
 S_Address varchar(25),
 S_City char(10),
 S_Nation char(15),
 S_Region char(12),
 S_Phone char(15)
) 
 distkey ( S_SuppKey )
 sortkey ( S_Region, S_SuppKey )
;
CREATE TABLE dim_date
(
 D_DateKey int primary key,
 D_Date char(18),
 D_DayOfWeek char(9),
 D_Month char(9),
 D_Year smallint,
 D_YearMonthNum int,
 D_YearMonth char(7),
 D_DayNumInWeek smallint,
 D_DayNumInMonth smallint,
 D_DayNumInYear smallint,
 D_MonthNumInYear smallint,
 D_WeekNumInYear smallint,
 D_SellingSeason char(12),
 D_LastDayInWeekFl smallint,
 D_LastDayInMonthFl smallint,
 D_HolidayFl smallint,
 D_WeekDayFl smallint
) 
 distkey ( D_DateKey )
 sortkey ( D_Year,D_DateKey )
;
CREATE TABLE lineorder
(
 LO_OrderKey bigint not null,
 LO_LineNumber smallint not null,
 LO_CustKey int not null references customer(C_CustomerKey),
 LO_PartKey int not null references part(P_PartKey),
 LO_SuppKey int not null references supplier(S_SuppKey),
 LO_OrderDateKey int not null references dim_date(D_DateKey),
 LO_OrderPriority varchar(15),
 LO_ShipPriority char(1),
 LO_Quantity smallint,
 LO_ExtendedPrice decimal,
 LO_OrdTotalPrice decimal,
 LO_Discount decimal,
 LO_Revenue decimal,
 LO_SupplyCost decimal,
 LO_Tax smallint,
 LO_CommitDateKey int not null references dim_date(D_DateKey),
 LO_ShipMode varchar(10)
)
 distkey( LO_OrderDateKey )
 sortkey( LO_OrderDateKey )
;

ICE Schema

CREATE TABLE IF NOT EXISTS customer
(
 C_CustomerKey int ,
 C_Name varchar(25),
 C_Address varchar(25),
 C_City varchar(10),
 C_Nation varchar(15),
 C_Region varchar(12),
 C_Phone varchar(15),
 C_MktSegment varchar(10)
);

CREATE TABLE IF NOT EXISTS part
(
 P_PartKey int ,
 P_Name varchar(25),
 P_MFGR varchar(10),
 P_Category varchar(10),
 P_Brand varchar(15),
 P_Colour varchar(15),
 P_Type varchar(25),
 P_Size tinyint,
 P_Container char(10)
);

CREATE TABLE supplier
(
 S_SuppKey int ,
 S_Name char(25),
 S_Address varchar(25),
 S_City char(10),
 S_Nation char(15),
 S_Region char(12),
 S_Phone char(15)
);

CREATE TABLE IF NOT EXISTS dim_date
(
 D_DateKey int ,
 D_Date char(18),
 D_DayOfWeek char(9),
 D_Month char(9),
 D_Year smallint,
 D_YearMonthNum int,
 D_YearMonth char(7),
 D_DayNumInWeek tinyint,
 D_DayNumInMonth tinyint,
 D_DayNumInYear smallint,
 D_MonthNumInYear tinyint,
 D_WeekNumInYear tinyint,
 D_SellingSeason char(12),
 D_LastDayInWeekFl tinyint,
 D_LastDayInMonthFl tinyint,
 D_HolidayFl tinyint,
 D_WeekDayFl tinyint
);

CREATE TABLE IF NOT EXISTS lineorder
(
 LO_OrderKey bigint not null,
 LO_LineNumber tinyint not null,
 LO_CustKey int not null,
 LO_PartKey int not null,
 LO_SuppKey int not null,
 LO_OrderDateKey int not null,
 LO_OrderPriority varchar(15),
 LO_ShipPriority char(1),
 LO_Quantity tinyint,
 LO_ExtendedPrice decimal,
 LO_OrdTotalPrice decimal,
 LO_Discount decimal,
 LO_Revenue decimal,
 LO_SupplyCost decimal,
 LO_Tax tinyint,
 LO_CommitDateKey int not null,
 LO_ShipMode varchar(10)
);

Queries:

-- Q1.1
 select sum(lo_extendedprice*lo_discount) as
 revenue
 from lineorder join dim_date on lo_orderdatekey = d_datekey
 where
 d_year = 1993
 and lo_discount between 1 and 3
 and lo_quantity < 25;
-- Q1.2
 select sum(lo_extendedprice*lo_discount) as revenue
 from lineorder
 join dim_date on lo_orderdatekey = d_datekey
 where d_yearmonth = 'Jan1994' and lo_discount
 between 4 and 6 and lo_quantity between 26 and 35;
-- Q1.3
 select sum(lo_extendedprice*lo_discount) as revenue
 from lineorder
 join dim_date on lo_orderdatekey = d_datekey
 where d_weeknuminyear = 6
 and d_year = 1994
 and lo_discount between 5 and 7 and lo_quantity between 26 and 35;
-- Q2.1
 select sum(lo_revenue), d_year, p_brand
 from lineorder
 join dim_date
 on lo_orderdatekey = d_datekey
 join part
 on lo_partkey = p_partkey join supplier
 on lo_suppkey = s_suppkey
 where p_category = 'MFGR#12'
 and s_region = 'AMERICA'
 group by d_year, p_brand
 order by d_year, p_brand;
-- Q2.2
 select sum(lo_revenue), d_year, p_brand
 from lineorder
 join dim_date
 on lo_orderdatekey = d_datekey
 join part
 on lo_partkey = p_partkey
 join supplier
 on lo_suppkey = s_suppkey
 where p_brand between 'MFGR#2221' and 'MFGR#2228'
 and s_region = 'ASIA'
 group by d_year, p_brand
 order by d_year, p_brand;
-- Q2.3
 select sum(lo_revenue), d_year, p_brand
 from lineorder
 join dim_date
 on lo_orderdatekey = d_datekey
 join part
 on lo_partkey = p_partkey
 join supplier
 on lo_suppkey = s_suppkey
 where p_brand= 'MFGR#2239'
 and s_region = 'EUROPE'
 group by d_year, p_brand
 order by d_year, p_brand;
-- Q3.1
 select c_nation, s_nation, d_year, sum(lo_revenue) as revenue
 from customer
 join lineorder
 on lo_custkey = c_customerkey
 join supplier
 on lo_suppkey = s_suppkey
 join dim_date on lo_orderdatekey = d_datekey
 where c_region = 'ASIA'
 and s_region = 'ASIA'
 and d_year >= 1992 and d_year <= 1997
 group by c_nation, s_nation, d_year
 order by d_year asc, revenue desc;
-- Q3.2
 select c_city, s_city, d_year, sum(lo_revenue) as revenue
 from customer
 join lineorder
 on lo_custkey = c_customerkey
 join supplier
 on lo_suppkey = s_suppkey
 join dim_date
 on lo_orderdatekey = d_datekey
 where c_nation = 'UNITED STATES'
 and s_nation = 'UNITED STATES'
 and d_year >= 1992
 and d_year <= 1997
 group by c_city, s_city, d_year
 order by d_year asc, revenue desc;
-- Q3.3
 select c_city, s_city, d_year, sum(lo_revenue) as revenue
 from customer
 join lineorder
 on lo_custkey = c_customerkey
 join supplier on lo_suppkey = s_suppkey
 join dim_date on lo_orderdatekey = d_datekey
 where (c_city='UNITED KI1' or c_city='UNITED KI5')
 and (s_city='UNITED KI1' or s_city='UNITED KI5')
 and d_year >= 1992
 and d_year <= 1997
 group by c_city, s_city, d_year
 order by d_year asc, revenue desc;
-- Q3.4
 select c_city, s_city, d_year, sum(lo_revenue)
 as revenue
 from customer
 join lineorder
 on lo_custkey = c_customerkey
 join supplier
 on lo_suppkey = s_suppkey
 join dim_date
 on lo_orderdatekey = d_datekey
 where
 (c_city='UNITED KI1' or c_city='UNITED KI5')
 and (s_city='UNITED KI1' or s_city='UNITED KI5')
 and d_yearmonth = 'Dec1997'
 group by c_city, s_city, d_year
 order by d_year asc, revenue desc;
-- Q4.1
 select d_year, c_nation,
 sum(lo_revenue - lo_supplycost) as profit
 from lineorder
 join dim_date
 on lo_orderdatekey = d_datekey
 join customer
 on lo_custkey = c_customerkey
 join supplier
 on lo_suppkey = s_suppkey
 join part
 on lo_partkey = p_partkey
 where
 c_region = 'AMERICA'
 and s_region = 'AMERICA'
 and (p_mfgr = 'MFGR#1'
 or p_mfgr = 'MFGR#2')
 group by d_year, c_nation
 order by d_year, c_nation;
-- Q4.2
 select d_year, s_nation, p_category,
 sum(lo_revenue - lo_supplycost) as profit
 from lineorder
 join dim_date
 on lo_orderdatekey = d_datekey
 join customer
 on lo_custkey = c_customerkey
 join supplier
 on lo_suppkey = s_suppkey
 join part
 on lo_partkey = p_partkey
 where
 c_region = 'AMERICA'
 and s_region = 'AMERICA'
 and (d_year = 1997 or d_year = 1998)
 and (p_mfgr = 'MFGR#1'
 or p_mfgr = 'MFGR#2')
 group by d_year, s_nation, p_category
 order by d_year, s_nation, p_category;
-- Q4.3
 select d_year, s_city, p_brand,
 sum(lo_revenue - lo_supplycost) as profit
 from lineorder
 join dim_date
 on lo_orderdatekey = d_datekey
 join customer
 on lo_custkey = c_customerkey
 join supplier
 on lo_suppkey = s_suppkey
 join part
 on lo_partkey = p_partkey
 where
 s_nation = 'UNITED STATES'
 and (d_year = 1997 or d_year = 1998)
 and p_category = 'MFGR#14'
 group by d_year, s_city, p_brand
 order by d_year, s_city, p_brand;

Tips for working with append-only databases using sharding and log structured tables

This post is structured like a series of questions and answers in a conversation.  I recently had a number of conversations that all pretty much went this same way.  If you, like others, have many basic questions about how to proceed when faced with an append-only store for the first time, well then hopefully this post will help provide some answers for you.  The post focuses on column stores, the most common append-only store, but there are others.

Why do I want to use a column store?

Column stores are optimal for OLAP analysis

Column stores offer substantial performance increases for OLAP compared to row stores.  Row stores are optimized for OLTP workloads.  While a row store can be used for OLAP, it may not perform well because a row store has to retrieve every column for a row (unless there is a covering index).  This is one of the reasons that I’ve said a covering index allows you to approximate the performance of a column store on a row store, when the table has a large number of columns.
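
As a quick illustration of the covering index point (hypothetical table), the index below contains every column the query touches, so the row store can answer the query from the index alone and skip reading the wide rows:

CREATE TABLE sales (
  sale_date date,
  region    varchar(12),
  amount    decimal(10,2),
  notes     varchar(255),    -- plus many other wide columns in practice
  KEY cover_ix (sale_date, region, amount)
);

SELECT region, SUM(amount)
  FROM sales
 WHERE sale_date BETWEEN '2013-01-01' AND '2013-01-31'
 GROUP BY region;            -- satisfied entirely from cover_ix ("Using index")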

You can find an introduction to column store technology here, including basic performance comparisons:

http://www.slideshare.net/MySQLGeek/intro-to-column-stores

… but they have their downside

Many open source column stores are append only

When I start talking about column stores I start with the upsides, such as IO reduction and high compression, and then bring up this bombshell pretty late in the conversation.

This is what people look like when I tell them that ICE and Fastbit (two open source column stores with good on-disk performance) are append-only:

[Image: Simon Cowell giving a thumbs down]

Please don’t let this turn you off.  Change your perspective.

I understand that if you have worked with a traditional RDBMS for a long time, then being told that you can only insert data into a table is a pretty big shock.  There are, however, many good reasons to use a column store, and workarounds exist which allow you to take advantage of the upsides while minimizing the downsides.  The append-only nature is just a trade-off.  Just as a column store arranges data completely differently on disk, you may re-arrange your thinking slightly about using an append-only database once you understand the workarounds.

Above all, be pragmatic.  Understand that there is no perfect technology for every situation.  Column stores heavily optimize for read-only access at the cost of the ability to perform updates.  Also keep in mind that these column stores are not the only read-only data stores.  Map/Reduce with HDFS can’t really update data either: if you want to update a file, you drop the file and upload a new one in its place.

One of the first questions is usually “How do we delete old data?”

Delete with the DROP command

Like HDFS, the only way to delete data in an append-only store is to DROP an object.  That could be as small as a partition (if the column store supports partitioning data), or a table or even an entire database.

To facilitate DROPing data use multiple databases and/or multiple tables

If you need to periodically prune data you can split your tables up into multiple databases.   Drop databases which contain data that you no longer wish to retain.

For example, you can segment your data into three schemata by year: year_2011, year_2012, year_2013

When you want to get rid of the year 2011 data:

DROP DATABASE year_2011;

Q: “That is great, but won’t splitting data up like that increase query complexity for OLAP queries?”

It may. When accessing a single year, queries are easy to write, but when accessing more than one year it can be more difficult.

When you think about it, however, placing the data into different databases is sharding.

This is where Shard-Query comes in.  It does the complex SQL work for you, because it understands how to aggregate queries over multiple shards.  In addition, because Shard-Query is a massively parallel processing engine, it can access each shard (a year in this case) in parallel.  For increased parallelism you could create one database per month.  Of course, since each schema is a shard, you could place each month or year on different machines to achieve as much parallelism as desired.

You can control data placement for optimal performance

Aim for a degree of parallelism such that queries on a small amount of data (a day or a month) are fast enough without parallelism, and queries on any larger quantity achieve parallelism because the data is spread evenly over all the machines.  Queries that examine consecutive months should not examine data on the same machine.

Q: “What about updates?  How do I correct a mistake if I make one?”

You really can’t update an append-only table.  You have to think out of the box.

Method 1 – Get rid of “in-place” updates the easy way (copy/modify/swap)

Instead of updating the data in place, create a copy of the table (using a different storage method like MyISAM) and then modify the copy.  When you are finished, swap the old table with the new version of the table.  This is often the best course of action for smaller tables such as lookup tables and dimension tables in a star schema.  You probably can’t use this method for big tables such as fact tables.  You can technically do it, but you must orchestrate a complex ETL process to do so.
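
Here is a minimal sketch of copy/modify/swap for a small dimension table (the names are hypothetical; the working copy is MyISAM so it can be modified):

CREATE TABLE dim_region_new ENGINE=MyISAM AS SELECT * FROM dim_region;

UPDATE dim_region_new SET region_name = 'EMEA' WHERE region_name = 'EUROPE';

-- Atomically swap the corrected copy into place, keeping the old table around:
RENAME TABLE dim_region     TO dim_region_old,
             dim_region_new TO dim_region;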

Keep in mind that you probably won’t have a lot of small tables if you have big data.  The Infobright (ICE and IEE) engines really prefer one big table for your data instead of a normalized schema with lookup tables.  Because these databases don’t support indexes, it is necessary to perform hash joins or sort/merge joins in order to join tables together.  Fastbit doesn’t support joins at all; you must implement hash joins in your application.  If you do need to perform joins, keep the dimension tables short so that at least one side of the join fits in memory.

There is some added complexity with this method and ICE.  Infobright Community Edition (ICE) doesn’t support converting tables to its engine via ALTER TABLE.  Instead, with ICE, export the contents of the new table with SELECT INTO OUTFILE, and load a new version of the table with LOAD DATA INFILE.
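
For ICE, the swap step becomes an export and reload.  A hedged sketch, again with hypothetical names and a hypothetical path on the shared filesystem:

-- Export the corrected working copy:
SELECT * FROM dim_region_new
  INTO OUTFILE '/shared/dim_region.csv'
  FIELDS TERMINATED BY ',' ENCLOSED BY '"';

-- dim_region_ice has the same columns and was created on the ICE engine:
LOAD DATA INFILE '/shared/dim_region.csv' INTO TABLE dim_region_ice
  FIELDS TERMINATED BY ',' ENCLOSED BY '"';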

Method 2 – Use Log structured tables and track changes via insertions

You can’t rebuild a 50 billion row table just because someone misplaced a zero in a transaction at the bank and you need to update a single row.  If you think about what I just said, however, you just might have a long-accepted answer to the problem at hand.

Your bank carries a ledger against your account, which is just a log when you think about it

When the bank records an error, they don’t go back and update the original transaction to correct it, instead they issue an “opposite” transaction to undo the error.  For example, if the clerk accidentally deposits 1000 dollars instead of 100 dollars, the following series of transactions may happen:

      1. 1000 counter deposit
      2. 1000 counter debit
      3. 100 counter deposit

Generalizing the concept

The log/ledger concept can be generalized to cover any sort of table if you add some extra information to the table.  Instead of  “debit” and “deposit” think of  “insertion” and “delete mark”.   In addition to recording the operation type, you also need to include information necessary for establishing the sequence for the operations on the table.

Log Table

[Image: example log structured table]

In general, you must include

      1. A transaction number (which always goes up)
      2. A sequence number (this identifies the order of operations in the transaction)
      3. The operation type.  I usually call this column dml_type.

I could leave it up to you to decide why those items are important, but here are some hints:

      1. The transaction number is useful for keeping summary tables synced up at the exact same point for multiple tables.
      2. The sequence number is necessary to find the newest version of a row, and to remove deleted rows from resultsets when the newest row is “delete marked”
      3. Rows with a “delete mark” will have to be handled specially in aggregates too.
      4. An update can be recorded as a “delete mark” followed by an insertion with new values.

Here is an example table without encoding:

The table records a simple set of metrics from a set of sensor devices.  These sensors are very simple and only record two metrics, but many devices can report hundreds of metrics (like a MySQL server).  This table doesn’t currently have a column to record the operation type.

CREATE TABLE metrics (
  device_id bigint,
  probe_dtime datetime,
  power_level tinyint,
  temperature decimal(4,2)
);

The problem:

If “something bad” happens and wrong values or duplicate rows are inserted into this table, you would have to rebuild the whole table to get the bad values out if you were using an append-only data store.  This is not unrealistic.  Rows could easily end up duplicated if there is some sort of issue and a file is loaded twice, for example.  This can happen because these open source column stores do not support primary keys.

Use a log structured table:

CREATE TABLE metrics (
  trx_id bigint,
  trx_seq int,
  dml_type tinyint,
  device_id bigint,
  probe_dtime datetime,
  power_level tinyint,
  temperature decimal(4,2)
);
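
With this structure, an update is recorded as a delete mark followed by a fresh insertion.  A minimal sketch, assuming dml_type is 1 for an insertion and -1 for a delete mark (which is what makes SUM(dml_type) a logical row count in the queries below):

-- Correct a bad temperature reading for device 247.
-- (With ICE these rows would arrive via LOAD DATA rather than INSERT.)
INSERT INTO metrics VALUES
  (1001, 1, -1, 247, '2013-04-30 12:34:56', 3, 99.99),  -- delete-mark the bad row
  (1001, 2,  1, 247, '2013-04-30 12:34:56', 3, 21.50);  -- re-insert corrected values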

Handling deleted and updated rows

Assume that (device_id,probe_dtime) can be considered the PRIMARY KEY on the table.  This is logically the case, even though physically there may be rows with duplicated PK attributes due to multiple versions of rows being stored in the table simultaneously.  This happens when one or more versions are “delete marked”.

Showing only visible rows

SELECT m.device_id, 
       m.probe_dtime 
  FROM metrics m 
 WHERE m.probe_dtime = '2013-04-30 12:34:56' 
   AND m.trx_seq = (select max(i.trx_seq) 
                      from metrics i 
                     where i.device_id = m.device_id 
                       and i.probe_dtime = m.probe_dtime 
                    having sum(i.dml_type) > 0)
 ORDER BY m.device_id desc, 
          m.trx_id desc, 
          m.trx_seq desc;

Counting the rows in the table (or any group)

SELECT SUM(dml_type) as logical_count, 
       count(*) as physical_log_entry_count
  FROM metrics;
SELECT probe_dtime, 
       SUM(dml_type) as logical_count, 
       count(*) as physical_log_entry_count
  FROM metrics
 WHERE probe_dtime between '2012-12-01' and '2012-12-31'
   AND device_id = 247
 GROUP BY probe_dtime;

Handling aggregation when resultsets contain “delete marked” rows.  AVG() is more complicated:

SELECT max(probe_dtime), 
       max(dml_type * power_level), 
       min(dml_type * power_level), 
       sum(dml_type * power_level)/sum(dml_type * (power_level is not null)) as avg_power_level
  FROM metrics
 WHERE device_id = 247
   AND probe_dtime between '2012-12-01' and '2012-12-31'
HAVING sum(dml_type) > 0;

Slides from Percona MySQL University Portland – Conquering “Big Data”: An introduction to Shard-Query

I posted the slides to my talk on SlideShare.  This talk includes high level information about Shard-Query, why it is needed, and the kind of schema it works well with.
http://www.slideshare.net/MySQLGeek/introduction-to-shard-query-m

You can also find more technical information about Shard-Query and how it works here:

Bulk insert into tables in sorted order to avoid deadlocks

Shard-Query inserts data into a “coordinator” table when answering queries.   When there is a GROUP BY on the original query, the coordinator table contains a UNIQUE KEY over the GROUP BY attributes.   Shard-Query uses INSERT .. ON DUPLICATE KEY UPDATE in combination with bulk inserts (insert into … values (),(),()) when inserting into the table, as sketched below.
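
A hedged sketch of that coordinator upsert pattern, using a hypothetical GROUP BY over (d_year, p_brand) with a SUM aggregate (not the exact SQL Shard-Query generates):

CREATE TABLE coord (
  d_year  smallint,
  p_brand varchar(15),
  rev     decimal(20,2),
  UNIQUE KEY (d_year, p_brand)
);

-- Each worker bulk-inserts its partial results; collisions on the unique key
-- are merged instead of duplicated:
INSERT INTO coord (d_year, p_brand, rev)
VALUES (1997, 'MFGR#2239', 1000.00),
       (1998, 'MFGR#2239', 2000.00)
ON DUPLICATE KEY UPDATE rev = rev + VALUES(rev);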

For efficiency’s sake, Shard-Query normally sends queries to the shards using ORDER BY NULL, which disables the filesort operation.  Of course, this often results in the rows being sent back from the shards in random order.

Because the results are in random order, the bulk insertion that a worker does into the coordinator table can deadlock with other worker threads when InnoDB or TokuDB is used for the coordinator table.  Right now I’ve just been using MyISAM for the coordinator table, which serializes queries at the bulk insert stage.  Having to insert the rows more than once due to deadlocks is not efficient.

I am going to test removing ORDER BY NULL, which will distribute the sort.  The deadlocks should go away and concurrency should go up.  I’ll have to test it out, and if it improves performance it will be included in beta 2 of Shard-Query 2.0.
