Shard-Query blog

The only open source MPP database engine for MySQL

Tag Archives: sharding

Shard-Query loader gets a facelift and now Amazon S3 support too

Shard-Query (source) now supports the MySQL “LOAD DATA INFILE” command.

When you use LOAD DATA LOCAL INFILE, a single-threaded load is performed from the current process.  You can specify a path to a file anywhere readable by the PHP script.  This allows loading without using the Gearman workers and without using a shared filesystem.

If you do not specify LOCAL, then the Gearman-based loader is used.  You must not specify a path to the file when you omit the LOCAL keyword, because the shared path is prepended to the filename automatically.  The shared path must be a shared or network filesystem (NFS, CIFS, etc.) and the files to be loaded must be placed on it for the Gearman-based loader to work, because workers may run on multiple nodes and every worker has to be able to read the files to be loaded.
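The path rule for the two loaders can be summarized in a few lines of code. This is only a sketch of the rule as described here, not Shard-Query's actual code: the function name, its parameters and the example shared path are all hypothetical, and S3 URLs are deliberately left out of scope.

```python
import posixpath


def resolve_load_path(filename: str, local: bool, shared_path: str) -> str:
    """Return the path a loader would read (hypothetical helper).

    Only filesystem paths are handled; S3 URLs are out of scope here.
    """
    if local:
        # LOCAL: the path is used as given and read by the current process.
        return filename
    if "/" in filename:
        # Without LOCAL the statement must name a bare file, not a path.
        raise ValueError("omit the path when LOCAL is not specified")
    # Without LOCAL: the configured shared path is prepended automatically.
    return posixpath.join(shared_path, filename)
```

With a shared path of /mnt/shared (an assumed value), a non-LOCAL load of lineitem.tbl would read /mnt/shared/lineitem.tbl on whichever node the worker runs.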

S3 is supported as a source of data

Instead of using a shared filesystem, S3 is now supported too.  You must specify an AWS access key and secret key when setting up Shard-Query.  After those are set up, simply use LOAD DATA INFILE ‘s3://bucket/filename’ to load from an S3 bucket using Gearman workers.  The file will be split up into smaller chunks efficiently and automatically, and each 16MB chunk will be loaded individually.

If you use LOAD DATA LOCAL INFILE ‘s3://bucket/filename’ then Gearman will not be used and the file will be loaded from the local process instead.

Important: When the Gearman loader is used (recommended) the S3 load will be split over the workers, each worker loading a 16MB chunk of the file in parallel.
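To make the chunking idea concrete, here is a minimal sketch of splitting a delimited file into chunks of roughly 16MB. It assumes that chunks are extended to the next newline so that no row is cut in half; the post does not say how Shard-Query actually aligns its chunks, and the function name is hypothetical. It works on bytes already in memory, whereas a real loader would read ranges from the file or the S3 object.

```python
from typing import Iterator, Tuple

CHUNK_SIZE = 16 * 1024 * 1024  # 16MB, the chunk size mentioned above


def chunk_ranges(data: bytes, chunk_size: int = CHUNK_SIZE) -> Iterator[Tuple[int, int]]:
    """Yield (start, end) byte offsets of chunks that end on a line boundary."""
    start = 0
    size = len(data)
    while start < size:
        end = min(start + chunk_size, size)
        if end < size:
            # Extend the chunk to the next newline so no row is split
            # (assumption: rows are newline-terminated).
            newline = data.find(b"\n", end - 1)
            end = size if newline == -1 else newline + 1
        yield start, end
        start = end
```

Each (start, end) pair could then be handed to a separate worker, which is what lets the chunks load in parallel.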

New Shard-Query features checked into SVN

I checked some updates to Shard-Query into SVN.

Partitioning support has been extended for MySQL 5.6+ to ALL partitioning types.

This includes all previously unsupported types, including RANGE/LIST COLUMNS tables that are partitioned over more than one column, and the HASH/KEY/LINEAR variants as well. Shard-Query now exclusively uses the PARTITION hint for partition elimination instead of WHERE clauses in MySQL 5.6. For 5.5 and previous versions, support remains limited to LIST, RANGE, and LIST/RANGE COLUMNS over a single column.
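MySQL 5.6 lets a query name the partitions it should read with a PARTITION (...) clause after the table name. The sketch below shows what generating one such query per partition might look like. It is an illustration only: the helper, the table name and the partition names are hypothetical, and the post does not show how Shard-Query itself rewrites queries.

```python
from typing import List, Optional


def partition_queries(table: str, partitions: List[str],
                      select_list: str = "*",
                      where: Optional[str] = None) -> List[str]:
    """Build one SELECT per partition using explicit partition selection."""
    queries = []
    for name in partitions:
        # The PARTITION clause restricts the scan to the named partition.
        sql = f"SELECT {select_list} FROM {table} PARTITION ({name})"
        if where:
            sql += f" WHERE {where}"
        queries.append(sql)
    return queries


# Hypothetical table and partition names.
queries = partition_queries("metrics", ["p2012", "p2013"],
                            select_list="COUNT(*)", where="device_id = 247")
```

Because the partition is named explicitly, the same approach applies to HASH and KEY partitioned tables, where a WHERE clause on its own is harder to map to a partition.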

The old mysql interface DAL has been replaced completely by the PDO DAL.

There is no major difference for end users except that you have to check that the return of the query() method is an object with the is_object() function instead of checking that it is a resource with the is_resource() function. I updated bin/run_query, which is the example application.

I made a few bug fixes to the PDO DAL as well.  Thanks again to Alex Hurd for contributing the PDO DAL.

Tips for working with append-only databases using sharding and log structured tables

This post is structured like a series of questions and answers in a conversation.  I recently had a number of conversations that all pretty much went this same way.  If you, like others, have many basic questions about how to proceed when faced with an append-only store for the first time, well then hopefully this post will help provide some answers for you.  The post focuses on column stores, the most common append-only store, but there are others.

Why do I want to use a column store?

Column stores are optimal for OLAP analysis

Column stores offer substantial performance increases for OLAP compared to row stores.  Row stores are optimized for OLTP workloads.  While a row store can be used for OLAP, it may not perform well because a row store has to retrieve every column for a row (unless there is a covering index).  This is one of the reasons that I’ve said that a covering index allows you to approximate the performance of a column store on a row store when the table has a large number of columns.

You can find an introduction to column store technology here, including basic performance comparisons:

… but they have their downside

Many open source column stores are append only

When I start talking about column stores I start with the upsides, the IO reduction and the high compression, and then bring up this bombshell pretty late in the conversation.

This is what people look like when I tell them that ICE and Fastbit (two open source column stores with good on-disk performance) are append-only:

[Image: Simon Cowell with a thumbs down gesture]

Please don’t let this turn you off.  Change your perspective.

I understand that if you have worked with a traditional RDBMS for a long time then being told that you can only insert data into a table is a pretty big shock.  There are, however, many good reasons to use a column store, and workarounds exist which allow you to take advantage of the upsides while also minimizing the downsides.  The append-only nature is just a trade-off.  Just like a column store arranges data completely differently on disk, you may re-arrange your thinking slightly about using an append-only database once you understand the workarounds.

Above all, be pragmatic.  Understand that there is no perfect technology for every situation.  Column stores highly optimize for read-only access at the cost of the ability to perform updates.  Also keep in mind that these column stores are not the only read-only data stores.  Map/Reduce with HDFS can’t really update data either.  If you want to update a file, you drop the file and upload a new one in its place.

One of the first questions is usually “How do we delete old data?”

Delete with the DROP command

Like HDFS, the only way to delete data in an append-only store is to DROP an object.  That could be as small as a partition (if the column store supports partitioning data), or a table or even an entire database.

To facilitate DROPping data, use multiple databases and/or multiple tables

If you need to periodically prune data you can split your tables up into multiple databases.   Drop databases which contain data that you no longer wish to retain.

For example, you can segment your data into three schemata by year: year_2011, year_2012, year_2013

When you want to get rid of the year 2011 data:

DROP DATABASE year_2011;

Q: “That is great, but won’t splitting data up like that increase query complexity for OLAP queries?”

It may. When accessing a single year, queries are easy to write, but when accessing more than one year it can be more difficult.

When you think about it, however, placing the data into different databases is sharding.

This is where Shard-Query comes in.  It does the complex SQL work for you because it understands how to aggregate queries over multiple shards.  In addition, because Shard-Query is a massively parallel processing (MPP) engine, it can access each shard (a year in this case) in parallel.  For increased parallelism you could create one database per month.  Of course, since each schema is a shard, you could place each month or year on a different machine to achieve as much parallelism as is desired.
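The general idea of aggregating over shards can be sketched as a scatter-gather: each shard computes partial aggregates in parallel, and a coordinator combines them. The code below is a simplified illustration with made-up in-memory data, not Shard-Query's implementation; the shard contents and function names are assumptions.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical shards: one list of temperature readings per year schema.
SHARDS = {
    "year_2011": [20.0, 22.0],
    "year_2012": [18.0],
    "year_2013": [25.0, 21.0, 23.0],
}


def partial_aggregate(rows):
    """Runs on one shard: return the pieces needed to rebuild the aggregates."""
    return {"count": len(rows), "sum": sum(rows), "max": max(rows)}


def sharded_aggregate(shards):
    """Query every shard in parallel, then combine the partial results."""
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        partials = list(pool.map(partial_aggregate, shards.values()))
    total_count = sum(p["count"] for p in partials)
    total_sum = sum(p["sum"] for p in partials)
    return {
        "count": total_count,
        "max": max(p["max"] for p in partials),
        # AVG cannot be averaged across shards; rebuild it from SUM and COUNT.
        "avg": total_sum / total_count,
    }


result = sharded_aggregate(SHARDS)
```

The point to notice is that COUNT, SUM and MAX combine directly, while AVG has to be decomposed into SUM and COUNT before the per-shard results can be merged.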

You can control data placement for optimal performance

Aim for a degree of parallelism where queries on a small amount of data (a day or a month) are fast enough without parallelism, and queries on any larger quantity run in parallel because the data is spread evenly over all the machines.  Queries that examine consecutive months should not find those months on the same machine.
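One simple placement rule that satisfies this is round-robin assignment of months to machines. The sketch below assumes one shard per month and at least two machines; the function name and the zero-based machine numbering are hypothetical.

```python
def machine_for_month(year: int, month: int, machine_count: int) -> int:
    """Round-robin placement: consecutive months map to different machines."""
    # Number the months consecutively across years, then wrap around.
    month_index = year * 12 + (month - 1)
    return month_index % machine_count


# Placement of the twelve months of 2013 over four machines.
placement = [machine_for_month(2013, m, 4) for m in range(1, 13)]
```

With four machines, each machine holds three months of every year, and a query spanning any four consecutive months touches all four machines.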

Q: “What about updates?  How do I correct a mistake if I make one?”

You really can’t update an append-only table.  You have to think outside the box.

Method 1 – Get rid of “in-place” updates the easy way (copy/modify/swap)

Instead of updating the data in place, create a copy of the table (using a different storage method like MyISAM) and then modify the copy.  When you are finished, swap the old table with the new version of the table.  This is often the best course of action for smaller tables such as lookup tables and dimension tables in a star schema.  You probably can’t use this method for big tables such as fact tables.  You can technically do it, but you must orchestrate a complex ETL process to do so.
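The copy/modify/swap steps can be sketched as follows. SQLite (through Python's sqlite3 module) stands in for the real database here so the example runs on its own, and the dim_device table and its contents are made up for illustration. In MySQL the swap itself can be done with a single RENAME TABLE statement that renames both tables at once.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_device (device_id INTEGER, location TEXT);
    INSERT INTO dim_device VALUES (1, 'roof'), (2, 'basment');
""")

# 1. Copy the table into a scratch table that does allow updates.
conn.execute("CREATE TABLE dim_device_new AS SELECT * FROM dim_device")
# 2. Modify the copy instead of the original.
conn.execute("UPDATE dim_device_new SET location = 'basement' WHERE device_id = 2")
# 3. Swap: retire the old table and put the corrected copy in its place.
conn.execute("ALTER TABLE dim_device RENAME TO dim_device_old")
conn.execute("ALTER TABLE dim_device_new RENAME TO dim_device")
conn.execute("DROP TABLE dim_device_old")

rows = conn.execute(
    "SELECT device_id, location FROM dim_device ORDER BY device_id").fetchall()
```

Queries keep using the name dim_device throughout; only the table behind the name changes.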

Keep in mind that you probably won’t have a lot of small tables if you have big data.  Infobright (ICE and IEE) engines really prefer one big table for your data instead of a normalized schema with lookup tables.  Because these databases don’t support indexes, it is necessary to perform hash joins or sort/merge joins in order to join tables together.  Fastbit doesn’t support joins at all – you must implement hash joins in your application.  If you do need to perform joins, keep the dimension tables short so that at least one side of the join fits in memory.

There is some added complexity with this method and ICE.  Infobright Community Edition (ICE) doesn’t support converting tables to their engine via ALTER TABLE.  Instead, with ICE, export the contents of the new table with SELECT INTO OUTFILE, and load a new version of the table with LOAD DATA INFILE.

Method 2 – Use Log structured tables and track changes via insertions

You can’t rebuild a 50 billion row table just because you want to update a single row after someone misplaced a zero in a transaction at the bank.  If you think about what I just said, however, you just might have a long-accepted answer to the problem at hand.

Your bank carries a ledger against your account, which is just a log when you think about it

When the bank records an error, they don’t go back and update the original transaction to correct it, instead they issue an “opposite” transaction to undo the error.  For example, if the clerk accidentally deposits 1000 dollars instead of 100 dollars, the following series of transactions may happen:

      1. 1000 counter deposit
      2. 1000 counter debit
      3. 100 counter deposit
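The same three entries can be written as a tiny append-only ledger. This is only an illustration of the idea; the record() helper and the tuple layout are made up for the example.

```python
ledger = []  # append-only: entries are never updated or removed


def record(kind: str, amount: int) -> None:
    """Append one entry; deposits add to the balance, debits subtract."""
    signed = amount if kind == "deposit" else -amount
    ledger.append((kind, signed))


record("deposit", 1000)  # the clerk's mistake
record("debit", 1000)    # the "opposite" transaction that undoes it
record("deposit", 100)   # the intended deposit

balance = sum(signed for _, signed in ledger)
```

The balance comes out right even though the mistaken entry is still in the log, and the full history of what happened is preserved.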

Generalizing the concept

The log/ledger concept can be generalized to cover any sort of table if you add some extra information to the table.  Instead of  “debit” and “deposit” think of  “insertion” and “delete mark”.   In addition to recording the operation type, you also need to include information necessary for establishing the sequence for the operations on the table.

[Image: log table]

In general, you must include

      1. A transaction number (which always goes up)
      2. A sequence number (this identifies the order of operations in the transaction)
      3. The operation type (this indicates the type of operation).  I usually call this column dml_type.

I could leave it up to you to decide why those items are important, but here are some hints:

      1. The transaction number is useful for keeping summary tables synced up at the exact same point for multiple tables.
      2. The sequence number is necessary to find the newest version of a row, and to remove deleted rows from resultsets when the newest row is “delete marked”
      3. Rows with a “delete mark” will have to be handled specially in aggregates too.
      4. An update can be recorded as a “delete mark” followed by an insertion with new values.
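Here is a minimal sketch of how writes to such a log might be recorded. It assumes dml_type is encoded as 1 for an insertion and -1 for a delete mark (an encoding chosen for this example), and that a delete mark carries the values of the row it cancels. The helper names and the in-memory list standing in for the table are hypothetical.

```python
from itertools import count

INSERT, DELETE_MARK = 1, -1  # assumed dml_type encoding
log = []                     # the append-only table
_trx_ids = count(1)          # transaction numbers only ever go up


def write_transaction(operations):
    """Append (dml_type, key, value) operations as one transaction."""
    trx_id = next(_trx_ids)
    # The sequence number records the order of operations in the transaction.
    for trx_seq, (dml_type, key, value) in enumerate(operations, start=1):
        log.append({"trx_id": trx_id, "trx_seq": trx_seq,
                    "dml_type": dml_type, "key": key, "value": value})
    return trx_id


def update(key, old_value, new_value):
    """An update is a delete mark for the old row plus an insertion."""
    return write_transaction([(DELETE_MARK, key, old_value),
                              (INSERT, key, new_value)])


write_transaction([(INSERT, "device-247", 50)])
update("device-247", 50, 55)
```

After these two transactions the log holds three physical entries for one logical row, and summing dml_type over the log gives 1.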

Here is an example table without encoding:

The table records a simple set of metrics from a set of sensor devices.  These sensors are very simple and only record two metrics, but many devices can report hundreds of metrics (like a MySQL server).  This table doesn’t currently have a column to record the operation type.

CREATE TABLE metrics (
  device_id bigint,
  probe_dtime datetime,
  power_level tinyint,
  temperature decimal(4,2)
);

The problem:

If “something bad” happens and wrong values or duplicate rows are inserted into this table, you would have to rebuild the whole table to get the bad values out were you using an append-only data store.  This is not unrealistic.  Rows could easily end up duplicated if there is some sort of issue and a file is loaded twice, for example.  This can happen because these open source column stores do not support primary keys.

Use a log structured table:

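CREATE TABLE metrics (
  trx_id bigint,       -- transaction number, always increasing
  trx_seq int,         -- order of the operation within the transaction
  dml_type tinyint,    -- operation type; the queries below assume 1 = insertion, -1 = delete mark
  device_id bigint,
  probe_dtime datetime,
  power_level tinyint,
  temperature decimal(4,2)
);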

Handling deleted and updated rows

Assume that (device_id,probe_dtime) can be considered the PRIMARY KEY on the table.  This is logically the case, even though physically there may be rows with duplicated PK attributes due to multiple versions of rows being stored in the table simultaneously.  This happens when one or more versions are “delete marked”.

Showing only visible rows

SELECT m.device_id, m.power_level, m.temperature
  FROM metrics m
 WHERE m.probe_dtime = '2013-04-30 12:34:56' 
   AND m.trx_seq = (select max(m2.trx_seq) 
                      from metrics m2 
                     where m2.device_id = m.device_id 
                       and m2.probe_dtime = m.probe_dtime 
                    having sum(m2.dml_type) > 0)
 ORDER BY m.device_id desc, 
          m.trx_id desc, 
          m.trx_seq desc;
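To see the visibility rule in action, here is a small runnable sketch using SQLite through Python's sqlite3 module as a stand-in. It assumes dml_type is 1 for an insertion and -1 for a delete mark, and it orders versions by (trx_id, trx_seq) rather than by trx_seq alone, which is a different formulation from the query above: a row is visible when it is an insertion and no newer log entry exists for the same key. The sample data is made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE metrics (
    trx_id INTEGER, trx_seq INTEGER, dml_type INTEGER,
    device_id INTEGER, probe_dtime TEXT, power_level INTEGER)""")
T = "2013-04-30 12:34:56"
conn.executemany("INSERT INTO metrics VALUES (?, ?, ?, ?, ?, ?)", [
    (1, 1, 1, 1, T, 50),   # insert device 1
    (1, 2, 1, 2, T, 60),   # insert device 2
    (1, 3, 1, 3, T, 70),   # insert device 3
    (2, 1, -1, 1, T, 50),  # update device 1: delete mark ...
    (2, 2, 1, 1, T, 55),   # ... followed by the new version
    (3, 1, -1, 2, T, 60),  # delete device 2
])

# Keep an insertion only if no newer log entry exists for the same key.
visible = conn.execute("""
    SELECT m.device_id, m.power_level
      FROM metrics AS m
     WHERE m.probe_dtime = ?
       AND m.dml_type = 1
       AND NOT EXISTS (
           SELECT 1 FROM metrics AS newer
            WHERE newer.device_id = m.device_id
              AND newer.probe_dtime = m.probe_dtime
              AND (newer.trx_id > m.trx_id
                   OR (newer.trx_id = m.trx_id AND newer.trx_seq > m.trx_seq)))
     ORDER BY m.device_id""", (T,)).fetchall()
```

Device 1 shows its updated value, device 2 disappears because its newest entry is a delete mark, and device 3 is returned unchanged.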

Counting the rows in the table (or any group)

SELECT SUM(dml_type) as logical_count, 
       count(*) as physical_log_entry_count
  FROM metrics;

SELECT probe_dtime, 
       SUM(dml_type) as logical_count, 
       count(*) as physical_log_entry_count
  FROM metrics
 WHERE probe_dtime between '2012-12-01' and '2012-12-31'
   AND device_id = 247
 GROUP BY probe_dtime;
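The difference between the logical and the physical count is easy to check on a small data set. The sketch below uses SQLite through Python's sqlite3 module as a stand-in and assumes dml_type is 1 for an insertion and -1 for a delete mark; the sample rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE metrics (
    trx_id INTEGER, trx_seq INTEGER, dml_type INTEGER,
    device_id INTEGER, probe_dtime TEXT, power_level INTEGER)""")
T = "2012-12-15 00:00:00"
conn.executemany("INSERT INTO metrics VALUES (?, ?, ?, ?, ?, ?)", [
    (1, 1, 1, 1, T, 50),   # insert device 1
    (1, 2, 1, 2, T, 60),   # insert device 2
    (1, 3, 1, 3, T, 70),   # insert device 3
    (2, 1, -1, 1, T, 50),  # update device 1: delete mark ...
    (2, 2, 1, 1, T, 55),   # ... followed by the new version
    (3, 1, -1, 2, T, 60),  # delete device 2
])

# Whole table: logical rows versus physical log entries.
totals = conn.execute(
    "SELECT SUM(dml_type), COUNT(*) FROM metrics").fetchone()

# Per device, hiding groups whose rows have all been delete marked.
per_device = conn.execute("""
    SELECT device_id, SUM(dml_type), COUNT(*)
      FROM metrics
     GROUP BY device_id
    HAVING SUM(dml_type) > 0
     ORDER BY device_id""").fetchall()
```

Six log entries describe two logical rows: each delete mark cancels one insertion in the sum, while COUNT(*) still sees every entry.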

Aggregating metrics over resultsets that contain “delete marked” rows.  AVG() is more complicated.

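-- Note: MAX/MIN over dml_type * value see delete-marked rows as negative values,
-- so they are only reliable when no delete marks fall in the range.
SELECT max(probe_dtime), 
       max(dml_type * power_level), 
       min(dml_type * power_level), 
       -- divide by the number of logical non-NULL values, not by the number of log entries
       sum(dml_type * power_level)/sum(dml_type * (power_level is not null)) as avg_power_level
  FROM metrics
 WHERE device_id = 247
   AND probe_dtime between '2012-12-01' and '2012-12-31'
HAVING sum(dml_type) > 0;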
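The average over logical rows can be checked on a small example. This sketch uses SQLite through Python's sqlite3 module as a stand-in and assumes that dml_type is 1 for an insertion and -1 for a delete mark, and that a delete mark repeats the values of the row it cancels. The data is made up. The 1.0 * factor is only there because SQLite would otherwise perform integer division.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE metrics (
    trx_id INTEGER, trx_seq INTEGER, dml_type INTEGER,
    device_id INTEGER, probe_dtime TEXT, power_level INTEGER)""")
conn.executemany("INSERT INTO metrics VALUES (?, ?, ?, ?, ?, ?)", [
    (1, 1, 1, 247, "2012-12-01 00:00:00", 50),
    (1, 2, 1, 247, "2012-12-02 00:00:00", 60),
    (1, 3, 1, 247, "2012-12-03 00:00:00", None),  # no reading recorded
    (2, 1, -1, 247, "2012-12-02 00:00:00", 60),   # update 60 -> 90: delete mark
    (2, 2, 1, 247, "2012-12-02 00:00:00", 90),    # ... and the new version
])

logical_avg, naive_avg = conn.execute("""
    SELECT 1.0 * SUM(dml_type * power_level)
               / SUM(dml_type * (power_level IS NOT NULL)),
           AVG(power_level)
      FROM metrics
     WHERE device_id = 247""").fetchone()
```

The logical rows are 50, 90 and NULL, so the logical average is 70, while a plain AVG() over the log entries also counts the superseded value 60 twice and reports 65.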