Shard-Query blog

The only open source MPP database engine for MySQL

Tips for working with append-only databases using sharding and log structured tables

This post is structured as a series of questions and answers in a conversation.  I recently had a number of conversations that all went pretty much the same way.  If you, like others, have many basic questions about how to proceed when faced with an append-only store for the first time, hopefully this post will provide some answers.  The post focuses on column stores, the most common append-only store, but there are others.

Why do I want to use a column store?

Column stores are optimal for OLAP analysis

Column stores offer substantial performance increases for OLAP compared to row stores.  Row stores are optimized for OLTP workloads.  While a row store can be used for OLAP, it may not perform well because a row store has to retrieve every column for a row (unless there is a covering index).  This is one of the reasons I’ve said that a covering index allows you to approximate the performance of a column store on a row store when the table has a large number of columns.

You can find an introduction to column store technology here, including basic performance comparisons:

… but they have their downside

Many open source column stores are append only

When I start talking about column stores, I begin with the upsides, such as IO reduction and high compression, and then drop this bombshell pretty late in the conversation.

This is what people look like when I tell them that ICE and Fastbit (two open source column stores with good on-disk performance) are append-only:

[Image: Simon Cowell with a thumbs down gesture]

Please don’t let this turn you off.  Change your perspective.

I understand that if you have worked with a traditional RDBMS for a long time, then being told that you can only insert data into a table is a pretty big shock.  There are, however, many good reasons to use a column store, and workarounds exist which allow you to take advantage of the upsides while minimizing the downsides.  The append-only nature is just a trade-off.  Just as a column store arranges data completely differently on disk, you may re-arrange your thinking about using an append-only database once you understand the workarounds.

Above all, be pragmatic.  Understand that there is no perfect technology for every situation.  Column stores are highly optimized for read-only access, at the cost of losing the ability to perform updates.  Also keep in mind that these column stores are not the only read-only data stores.  Map/Reduce with HDFS can’t really update data either.  If you want to update a file, you drop the file and upload a new one in its place.

One of the first questions is usually “How do we delete old data?”

Delete with the DROP command

Like HDFS, the only way to delete data in an append-only store is to DROP an object.  That could be as small as a partition (if the column store supports partitioning data), or a table or even an entire database.

To facilitate DROPping data, use multiple databases and/or multiple tables

If you need to periodically prune data you can split your tables up into multiple databases.   Drop databases which contain data that you no longer wish to retain.

For example, you can segment your data into three schemata by year: year_2011, year_2012, year_2013

When you want to get rid of the year 2011 data:

DROP DATABASE year_2011;
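A pruning job can work out which yearly schemata have aged out and generate the DROP statements for them. The following is a minimal sketch, assuming the `year_YYYY` naming used above; the function name `schemas_to_drop` and the two-year retention window are hypothetical choices for illustration.

```python
def schemas_to_drop(existing_schemas, current_year, years_to_keep):
    """Return the yearly schemata that fall outside the retention window."""
    oldest_kept = current_year - years_to_keep + 1
    expired = []
    for name in existing_schemas:
        year = int(name.rsplit("_", 1)[1])  # "year_2011" -> 2011
        if year < oldest_kept:
            expired.append(name)
    return sorted(expired)


# Keep the two most recent years (2012 and 2013) and drop anything older.
statements = [
    f"DROP DATABASE {name};"
    for name in schemas_to_drop(["year_2011", "year_2012", "year_2013"], 2013, 2)
]
```

The generated statements would then be sent to the server by whatever client you already use.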

Q: “That is great, but won’t splitting data up like that increase query complexity for OLAP queries?”

It may. When accessing a single year, queries are easy to write, but when accessing more than one year it can be more difficult.

When you think about it, however, placing the data into different databases is sharding.

This is where Shard-Query comes in.  It does the complex SQL work for you because it understands how to aggregate queries over multiple shards.  In addition, because Shard-Query is a massively parallel processing (MPP) engine, it can access each shard (a year in this case) in parallel.  For increased parallelism you could create one database per month.  Of course, since each schema is a shard, you could place each month or year on a different machine to achieve as much parallelism as is desired.
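To make the idea of aggregating over shards concrete, here is a minimal sketch of the general technique: run the same partial aggregate on every shard in parallel, then combine the partial results. It is not Shard-Query's actual implementation. The in-memory SQLite databases, the `sales` table and the sample amounts are assumptions made so the example is self-contained.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sample data: one in-memory SQLite database per yearly shard.
SHARD_ROWS = {
    "year_2011": [10, 20],
    "year_2012": [30],
    "year_2013": [40, 50, 60],
}


def query_shard(name):
    """Run the same partial aggregate on one shard and return (count, sum)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE sales (amount INTEGER)")
    conn.executemany("INSERT INTO sales VALUES (?)",
                     [(amount,) for amount in SHARD_ROWS[name]])
    row = conn.execute("SELECT COUNT(*), SUM(amount) FROM sales").fetchone()
    conn.close()
    return row


def average_over_shards(names):
    """Query every shard in parallel, then combine the partial results."""
    with ThreadPoolExecutor(max_workers=len(names)) as pool:
        partials = list(pool.map(query_shard, names))
    total_count = sum(count for count, _ in partials)
    total_sum = sum(subtotal for _, subtotal in partials)
    # AVG cannot be averaged across shards; it is rebuilt from SUM and COUNT.
    return total_sum / total_count


overall_avg = average_over_shards(list(SHARD_ROWS))
```

Note that the average is rebuilt from per-shard sums and counts; averaging the per-shard averages would give the wrong answer when shards hold different numbers of rows.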

You can control data placement for optimal performance

Aim for a degree of parallelism at which queries on a small amount of data (a day or a month) are fast enough without parallelism, while queries on any larger quantity run in parallel because the data is spread evenly over all the machines.  Queries that examine consecutive months should not have to read those months from the same machine.
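One simple way to satisfy the "consecutive months on different machines" rule is round-robin placement. The sketch below is an assumption about how you might assign monthly shards, not something Shard-Query prescribes; `machine_for_month` and the machine count of 4 are hypothetical.

```python
from datetime import date


def machine_for_month(month_start: date, machine_count: int) -> int:
    """Map a month to a machine index round-robin.

    With more than one machine, two consecutive months never share a machine.
    """
    months_since_year_zero = month_start.year * 12 + (month_start.month - 1)
    return months_since_year_zero % machine_count


# Placement of the first six months of 2013 across four machines.
placement = {month: machine_for_month(date(2013, month, 1), 4)
             for month in range(1, 7)}
```

A query over any four consecutive months then touches all four machines, while a query over a single month touches only one.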

Q: “What about updates?  How do I correct a mistake if I make one?”

You really can’t update an append-only table.  You have to think outside the box.

Method 1 – Get rid of “in-place” updates the easy way (copy/modify/swap)

Instead of updating the data in place, create a copy of the table (using a different storage engine such as MyISAM) and then modify the copy.  When you are finished, swap the old table with the new version of the table.  This is often the best course of action for smaller tables such as lookup tables and dimension tables in a star schema.  You probably can’t use this method for big tables such as fact tables.  You can technically do it, but you must orchestrate a complex ETL process to do so.

Keep in mind that you probably won’t have a lot of small tables if you have big data.  Infobright (ICE and IEE) engines really prefer one big table for your data instead of a normalized schema with lookup tables.  Because these databases don’t support indexes, it is necessary to perform hash joins or sort/merge joins in order to join tables together.  Fastbit doesn’t support joins at all; you must implement hash joins in your application.  If you do need to perform joins, keep the dimension tables short so that at least one side of the join fits in memory.

There is some added complexity with this method and ICE.  Infobright Community Edition (ICE) doesn’t support converting tables to its engine via ALTER TABLE.  Instead, with ICE, export the contents of the new table with SELECT ... INTO OUTFILE, and load a new version of the table with LOAD DATA INFILE.
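The copy/modify/swap steps of Method 1 can be sketched as follows. SQLite (through Python's `sqlite3` module) stands in for the real server so the example runs on its own; the `dim_device` table and its contents are hypothetical, and the ICE export/load step described above is not shown.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_device (device_id INTEGER, location TEXT)")
conn.executemany("INSERT INTO dim_device VALUES (?, ?)",
                 [(1, "roof"), (2, "basment")])  # note the typo in row 2

# 1. Copy: build a working copy that can be modified freely.
conn.execute("CREATE TABLE dim_device_new AS SELECT * FROM dim_device")

# 2. Modify: fix the mistake in the copy, not in the original.
conn.execute(
    "UPDATE dim_device_new SET location = 'basement' WHERE device_id = 2")

# 3. Swap: move the old table out of the way and the new one into place.
conn.execute("ALTER TABLE dim_device RENAME TO dim_device_old")
conn.execute("ALTER TABLE dim_device_new RENAME TO dim_device")
conn.execute("DROP TABLE dim_device_old")
conn.commit()

locations = dict(conn.execute("SELECT device_id, location FROM dim_device"))
```

On MySQL, RENAME TABLE can rename both tables in a single statement, which avoids a window in which the table name does not exist; check whether your column store engine supports it before relying on it.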

Method 2 – Use Log structured tables and track changes via insertions

You can’t rebuild a 50 billion row table just to update a single row because someone misplaced a zero in a transaction at the bank.  If you think about that example, however, you may already have a long-accepted answer to the problem at hand.

Your bank carries a ledger against your account, which is just a log when you think about it

When the bank records an error, they don’t go back and update the original transaction to correct it.  Instead, they issue an “opposite” transaction to undo the error.  For example, if the clerk accidentally deposits 1000 dollars instead of 100 dollars, the following series of transactions may happen:

      1. 1000 counter deposit
      2. 1000 counter debit
      3. 100 counter deposit
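The three ledger entries above can be replayed to show that the balance comes out right without ever modifying an earlier entry. This is a small illustrative sketch; representing a debit as a negative amount is an assumption of the example.

```python
# Ledger entries from the example: (description, signed amount).
ledger = [
    ("counter deposit", 1000),   # the clerk's mistake
    ("counter debit", -1000),    # the "opposite" transaction that undoes it
    ("counter deposit", 100),    # the deposit that was intended
]


def balance(entries):
    """The balance is simply the sum of every entry ever appended."""
    return sum(amount for _, amount in entries)


current_balance = balance(ledger)
```

All three entries stay in the log, so the mistake and its correction remain auditable.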

Generalizing the concept

The log/ledger concept can be generalized to cover any sort of table if you add some extra information to the table.  Instead of  “debit” and “deposit” think of  “insertion” and “delete mark”.   In addition to recording the operation type, you also need to include information necessary for establishing the sequence for the operations on the table.

[Image: log table]

In general, you must include:

      1. A transaction number (which always goes up)
      2. A sequence number (this identifies the order of operations in the transaction)
      3. The operation type (this indicates the type of operation).  I usually call this column dml_type.

I could leave it up to you to decide why those items are important, but here are some hints:

      1. The transaction number is useful for keeping summary tables synced up at the exact same point for multiple tables.
      2. The sequence number is necessary to find the newest version of a row, and to remove deleted rows from resultsets when the newest row is “delete marked”.
      3. Rows with a “delete mark” will have to be handled specially in aggregates too.
      4. An update can be recorded as a “delete mark” followed by an insertion with new values.
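The bookkeeping described in the list above can be sketched as a small helper that appends log entries and numbers them within a transaction. SQLite stands in for the column store; the `Transaction` class, the `metrics_log` table and the encoding of the operation type as 1 for an insertion and -1 for a "delete mark" are assumptions made for illustration.

```python
import sqlite3

INSERT, DELETE_MARK = 1, -1  # assumed encoding for dml_type

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE metrics_log (
    trx_id INTEGER, trx_seq INTEGER, dml_type INTEGER,
    device_id INTEGER, probe_dtime TEXT, power_level INTEGER)""")


class Transaction:
    """Appends log entries that share one trx_id, numbering them in order."""

    def __init__(self, conn, trx_id):
        self.conn, self.trx_id, self.seq = conn, trx_id, 0

    def _append(self, dml_type, row):
        self.seq += 1
        self.conn.execute(
            "INSERT INTO metrics_log VALUES (?, ?, ?, ?, ?, ?)",
            (self.trx_id, self.seq, dml_type, *row))

    def insert(self, row):
        self._append(INSERT, row)

    def delete(self, row):
        self._append(DELETE_MARK, row)

    def update(self, old_row, new_row):
        # An update is a delete mark for the old values plus an insertion.
        self.delete(old_row)
        self.insert(new_row)


t1 = Transaction(conn, 1)
t1.insert((247, "2013-04-30 12:34:56", 90))
t2 = Transaction(conn, 2)
t2.update((247, "2013-04-30 12:34:56", 90), (247, "2013-04-30 12:34:56", 9))

log = conn.execute(
    "SELECT trx_id, trx_seq, dml_type, power_level "
    "FROM metrics_log ORDER BY trx_id, trx_seq").fetchall()
logical_count = conn.execute(
    "SELECT SUM(dml_type) FROM metrics_log").fetchone()[0]
```

The log ends up with three physical entries but only one logical row, because the delete mark cancels the original insertion.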

Here is an example table without encoding:

The table records a simple set of metrics from a set of sensor devices.  These sensors are very simple and only record two metrics, but many devices can report hundreds of metrics (like a MySQL server).  This table doesn’t currently have a column to record the operation type.

CREATE TABLE metrics (
  device_id bigint,
  probe_dtime datetime,
  power_level tinyint,
  temperature decimal(4,2)
);

The problem:

If “something bad” happens and wrong values or duplicate rows are inserted into this table, you would have to rebuild the whole table to get the bad values out if you were using an append-only data store.  This is not unrealistic.  Rows could easily end up duplicated if, for example, there is some sort of issue and a file is loaded twice.  This can happen because these open source column stores do not support primary keys.

Use a log structured table:

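CREATE TABLE metrics (
  trx_id bigint,        -- transaction number (always goes up)
  trx_seq int,          -- order of the operation within the transaction
  dml_type tinyint,     -- operation type: 1 = insertion, -1 = "delete mark" (the encoding that SUM(dml_type) relies on)
  device_id bigint,
  probe_dtime datetime,
  power_level tinyint,
  temperature decimal(4,2)
);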

Handling deleted and updated rows

Assume that (device_id,probe_dtime) can be considered the PRIMARY KEY on the table.  This is logically the case, even though physically there may be rows with duplicated PK attributes due to multiple versions of rows being stored in the table simultaneously.  This happens when one or more versions are “delete marked”.

Showing only visible rows

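SELECT m.device_id,
       m.power_level,
       m.temperature
  FROM metrics m
 WHERE m.probe_dtime = '2013-04-30 12:34:56'
   AND m.dml_type = 1   -- assumes 1 = insertion, -1 = delete mark
   AND m.trx_id = (select max(v.trx_id)
                     from metrics v
                    where v.device_id = m.device_id
                      and v.probe_dtime = m.probe_dtime
                   having sum(v.dml_type) > 0)
   AND m.trx_seq = (select max(s.trx_seq)
                      from metrics s
                     where s.trx_id = m.trx_id
                       and s.device_id = m.device_id
                       and s.probe_dtime = m.probe_dtime)
 ORDER BY m.device_id desc,
          m.trx_id desc,
          m.trx_seq desc;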
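The visibility rule can be checked on a handful of sample rows. The sketch below runs against SQLite rather than a column store, so treat it as an illustration of the logic only. The sample data and the 1 / -1 encoding of dml_type are assumptions, and a GROUP BY is added to the subquery because some SQLite versions reject HAVING without it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE metrics (trx_id INTEGER, trx_seq INTEGER, "
             "dml_type INTEGER, device_id INTEGER, probe_dtime TEXT, "
             "power_level INTEGER)")
T = "2013-04-30 12:34:56"
conn.executemany("INSERT INTO metrics VALUES (?, ?, ?, ?, ?, ?)", [
    (1, 1,  1, 1, T, 50),   # device 1: inserted, never changed
    (1, 2,  1, 2, T, 60),   # device 2: inserted ...
    (1, 3,  1, 3, T, 70),   # device 3: inserted ...
    (2, 1, -1, 2, T, 60),   # device 2: old version delete marked ...
    (2, 2,  1, 2, T, 65),   # ... and new version inserted (an update)
    (3, 1, -1, 3, T, 70),   # device 3: deleted
])

VISIBLE_ROWS = """
SELECT m.device_id, m.power_level
  FROM metrics m
 WHERE m.probe_dtime = ?
   AND m.dml_type = 1
   AND m.trx_id = (SELECT MAX(v.trx_id)
                     FROM metrics v
                    WHERE v.device_id = m.device_id
                      AND v.probe_dtime = m.probe_dtime
                    GROUP BY v.device_id, v.probe_dtime
                   HAVING SUM(v.dml_type) > 0)
   AND m.trx_seq = (SELECT MAX(s.trx_seq)
                      FROM metrics s
                     WHERE s.trx_id = m.trx_id
                       AND s.device_id = m.device_id
                       AND s.probe_dtime = m.probe_dtime)
 ORDER BY m.device_id
"""

visible = conn.execute(VISIBLE_ROWS, (T,)).fetchall()
```

Device 1 shows its original row, device 2 shows only the updated version, and device 3 disappears because its insertion and delete mark sum to zero.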

Counting the rows in the table (or any group)

SELECT SUM(dml_type) as logical_count,
       count(*) as physical_log_entry_count
  FROM metrics;

SELECT probe_dtime,
       SUM(dml_type) as logical_count,
       count(*) as physical_log_entry_count
  FROM metrics
 WHERE probe_dtime >= '2012-12-01' and probe_dtime < '2013-01-01'
   AND device_id = 247
 GROUP BY probe_dtime;

Aggregating metrics when the resultset contains “delete marked” rows.  SUM() can be weighted by dml_type, but AVG() is more complicated because NULL values have to be left out of the divisor.

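SELECT max(probe_dtime),
       -- MAX()/MIN() cannot be sign-weighted, so restrict them to insertions.
       -- A value that was later delete marked can still be returned.
       max(case when dml_type = 1 then power_level end) as max_power_level,
       min(case when dml_type = 1 then power_level end) as min_power_level,
       sum(dml_type * power_level)
         / sum(dml_type * (power_level is not null)) as avg_power_level
  FROM metrics
 WHERE device_id = 247
   AND probe_dtime >= '2012-12-01' and probe_dtime < '2013-01-01'
HAVING sum(dml_type) > 0;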
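The difference between a sign-weighted average and a plain AVG() over the log is easy to see on sample data. This is a minimal sketch on SQLite with made-up readings, assuming dml_type is 1 for an insertion and -1 for a delete mark; the `* 1.0` is only there to avoid SQLite's integer division.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE metrics (trx_id INTEGER, trx_seq INTEGER, "
             "dml_type INTEGER, device_id INTEGER, power_level INTEGER)")
conn.executemany("INSERT INTO metrics VALUES (?, ?, ?, ?, ?)", [
    (1, 1,  1, 247, 10),
    (1, 2,  1, 247, 20),
    (1, 3,  1, 247, 300),   # wrong reading
    (1, 4,  1, 247, None),  # probe reported no power level
    (2, 1, -1, 247, 300),   # delete mark cancels the wrong reading
    (2, 2,  1, 247, 30),    # corrected reading
])

logical_count, total, avg_power, naive_avg = conn.execute("""
    SELECT SUM(dml_type),
           SUM(dml_type * power_level),
           SUM(dml_type * power_level) * 1.0
             / SUM(dml_type * (power_level IS NOT NULL)),
           AVG(power_level)
      FROM metrics
     WHERE device_id = 247
""").fetchone()
```

The weighted average only sees the three logical non-NULL readings (10, 20 and 30), while the plain AVG() counts the wrong reading twice, once for the insertion and once for its delete mark.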
