Shard-Query blog

The only open source MPP database engine for MySQL

Category Archives: Hints and tips

SQL injection in the MySQL server (of the proxy kind!)

As work on WarpSQL (Shard-Query 3) progresses, it has outgrown MySQL proxy.  MySQL proxy is a very useful tool, but it requires Lua scripting, and it is an external daemon that needs to be maintained.  The MySQL proxy module for Shard-Query works well, but to make WarpSQL into a real distributed transaction coordinator, moving the proxy logic inside of the server makes more sense.

The main benefit of MySQL proxy is that it allows a script to “inject” queries between the client and server, intercepting the results and possibly sending back new results to the client.  I would like similar functionality, but inside of the server.

For example, I would like to implement new SHOW commands, and these commands do not need to be implemented as actual MySQL SHOW commands under the covers.

For this blog post I made a new example command called “SHOW PASSWORD”.

Example “injection” which adds SHOW PASSWORD functionality to the server
mysql> select user();
+----------------+
| user()         |
+----------------+
| root@localhost |
+----------------+
1 row in set (0.00 sec)

-- THIS COMMAND DOES NOT EXIST
mysql> show password;
+-------------------------------------------+
| password_hash                             |
+-------------------------------------------+
| *00A51F3F48415C7D4E8908980D443C29C69B60C9 |
+-------------------------------------------+
1 row in set (0.00 sec)

Important – This isn’t a MySQL proxy plugin.  There is C++ code in the SERVER to answer that query, but it isn’t the normal SHOW command code.  This “plugin” (I put it in quotes because my plan is for a pluggable interface but it isn’t added to the server yet) doesn’t access the mysql.user table using normal internal access methods.  It runs actual SQL inside of the server, on the same THD as the client connection, in the same transaction as the client connection, to get the answer!

Problem #1 – Running SQL in the server

The MySQL C client API doesn’t have any methods for connecting to the server from inside of the server, except to connect to the normally available socket interfaces, authenticate, and then issue queries like a normal client.  While it is perfectly possible to connect to the server as a client in this manner, it is sub-optimal for a number of reasons.  First, it requires a second connection to the server.  Second, it requires that you authenticate again (which means you need the user’s password).  Lastly, any work done in the second connection does not participate in the transactional changes of the first, and vice versa.

The problem is the communication layer between the client and server, which uses a mechanism called VIO.  Work was done long ago, for external stored procedures, that would have alleviated this problem by implementing an in-server VIO layer and making the parser re-entrant, but it never made it into the main server, and it targeted MySQL 5.1 anyway.

It is possible to run queries without using VIO, though.  You simply can’t get results back, except to know whether the query succeeded or not.  That makes it perfectly acceptable for any command that doesn’t need a resultset, which is basically anything other than SELECT.  There is a loophole, however: any changes made to the THD persist on that THD.  Thus, if the executed SQL sets any user variables, those variables remain visible after query execution.
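For example (a minimal illustration, not code from Shard-Query), a statement that needs no resultset can still leave its answer behind in a user variable on the same THD:

SET @row_count := (SELECT COUNT(*) FROM mysql.user);

-- a later statement on the same connection (and in the same transaction) can read it back
SELECT @row_count;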

Solution  – encapsulate arbitrary SQL resultsets through a user variable

Since user variables are visible after query execution, the goal is to get the complete results of a query into a user variable, so that the resultset can be accessed from inside the server.  To accomplish this, a method to get the results into the variable must be established first, and then a data format amenable to that method has to be chosen so that the resultset can be accessed conveniently.

With a little elbow grease MySQL can convert any SELECT statement into a CSV resultset.  To do so, the following are used:

  1. SELECT … INTO @user_variable
  2. A subquery in the FROM clause (for the original query)
  3. CONCAT, REPLACE, IFNULL, GROUP_CONCAT (to encode the resultset data)
Here is the SQL that the SHOW PASSWORD command uses to get the correct password:
select authentication_string as pw,
       user 
  from mysql.user 
 where concat(user,'@',host) = USER() 
    or user = USER() 
LIMIT 1
Here is the “injected” SQL that the database generates to encapsulate the SQL resultset as CSV:
select 
  group_concat( 
    concat('"',
           IFNULL(REPLACE(REPLACE(`pw`,'"','\\"'),"\n","\\n"),"\N"),
           '"|"',
           IFNULL(REPLACE(REPLACE(`user`,'"','\\"'),"\n","\\n"),"\N"),
           '"'
    ) 
  separator "\n"
  ) 
from 
  ( select authentication_string as pw,
           user 
      from mysql.user 
      where concat(user,'@',host) = USER() 
        OR user = USER() 
    LIMIT 1
  ) the_query 
into @sql_resultset ;
Query OK, 1 row affected (0.00 sec)
Here is the actual encapsulated resultset.  If there were more than one row, they would be newline separated.
mysql> select @sql_resultset;
+----------------+
| @sql_resultset |
+----------------+
| ""|"root"      |
+----------------+
1 row in set (0.00 sec)

Injecting SQL in the server

With the ability to encapsulate resultsets into CSV in user variables, it is possible to create a cursor over the resultset data and access it in the server.  The MySQL 5.7 pre-parse rewrite plugins, however, still run inside the parser.  The THD is not “clean” with respect to being able to run a second query, because the parser is not re-entrant.  Since I want to run (perhaps many) queries between the time a user enters a query and the time the server actually answers it (perhaps with a different query than the user entered!), the MySQL 5.7 pre-parse rewrite plugin infrastructure doesn’t work for me.

Instead, I modified the server so that there is a hook in do_command() for query injections.  I conveniently named it query_injection_point(), and the goal is to make it a new plugin type, but I haven’t written that code yet.  Here is the current signature for query_injection_point():

bool query_injection_point(
  THD* thd, COM_DATA *com_data, enum enum_server_command command,
  COM_DATA* new_com_data, enum enum_server_command* new_command );

It has essentially the same signature as dispatch_command(), but it provides the ability to replace the command, or keep it as is.  It returns true when the command has been replaced.

Because it is not yet pluggable, here is the code that I placed in the injection point:

/* TODO: make this pluggable */
bool query_injection_point(THD* thd, COM_DATA *com_data, enum enum_server_command command,
  COM_DATA* new_com_data, enum enum_server_command* new_command)
{
  /* example rewrite rule for SHOW PASSWORD */
  if(command != COM_QUERY)
  { return false; }

  /* convert the query to upper case for comparison */
  std::locale loc;
  std::string old_query(com_data->com_query.query, com_data->com_query.length);
  for(unsigned int i = 0; i < com_data->com_query.length; ++i) {
    old_query[i] = std::toupper(old_query[i], loc);
  }

  if(old_query == "SHOW PASSWORD")
  {
    std::string new_query;
    SQLClient conn(thd);
    SQLCursor* stmt = NULL;
    SQLRow* row;

    /* run SQL inside the server, on the same THD, to fetch the hash */
    if(conn.query("pw,user",
      "select authentication_string as pw,user from mysql.user " \
      "where concat(user,'@',host) = USER() or user = USER() LIMIT 1", &stmt))
    {
      if(stmt != NULL)
      {
        if((row = stmt->next()))
        {
          new_query = "SELECT '" + row->at(0) + "' as password_hash";
        }
      }
      else
      {
        return false;
      }
    }
    else
    {
      return false;
    }

    /* replace the command sent to the server */
    if(new_query != "")
    {
      Protocol_classic *protocol= thd->get_protocol_classic();
      protocol->create_command(
        new_com_data, COM_QUERY,
        (uchar *) strdup(new_query.c_str()),
        new_query.length()
      );
      *new_command = COM_QUERY;
    }
    else
    {
      if(stmt) delete stmt;
      return false;
    }
    if(stmt) delete stmt;
    return true;
  }

  /* don't replace the command */
  return false;
}

SQLClient

You will notice that the code accesses the mysql.user table using SQL, via the SQLClient, SQLCursor, and SQLRow objects.  These objects wrap the work of encapsulating the SQL into a CSV resultset and then actually accessing that resultset.  The interface is very simple, as you can see from the example: you create a SQLClient for a THD (one that is NOT already running a query!) and then you simply run queries and access the results.

The SQLClient uses a stored procedure to methodically encapsulate the SQL into CSV and then provides objects to access and iterate over the data that is buffered in the user variable.  Because MySQL 5.7 comes with the sys schema, I placed the stored procedure into it, as there is no other available default database that allows the creation of stored procedures.  I called it sys.sql_client().
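To make the mechanism concrete, here is a rough sketch of the shape such a procedure could take.  This is not the actual sys.sql_client() source; the procedure name, the parameters, and the assumption that the caller passes in the already-escaped column expression (built as shown earlier) are all mine:

DELIMITER //
CREATE PROCEDURE sys.sql_client_sketch(IN escaped_select TEXT, IN inner_sql TEXT)
BEGIN
  /* large resultsets need a bigger GROUP_CONCAT limit */
  SET SESSION group_concat_max_len = 1024 * 1024;

  /* wrap the caller's query so the whole resultset lands in @sql_resultset */
  SET @wrapped = CONCAT(
    'select group_concat(', escaped_select, ' separator "\\n") ',
    'from ( ', inner_sql, ' ) the_query into @sql_resultset'
  );
  PREPARE stmt FROM @wrapped;
  EXECUTE stmt;
  DEALLOCATE PREPARE stmt;
END//
DELIMITER ;

The SQLCursor and SQLRow objects on the C++ side then only have to read @sql_resultset back and split it on the newline row separator and the quoted “|” column separator shown earlier.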

Because the resultset is stored as text data, the SQLRow object returns all column values as std::string.

What’s next?

I need to add a proper plugin type for “SQL injection plugins”.  Then I need to work on a plugin for parallel queries.  Most of the work for that is already done, actually, at least to get it into an alpha quality state.  There is still quite a bit of work to be done though.

You can find the code in the internal_client branch of my fork of MySQL 5.7:

http://github.com/greenlion/warpsql-server

 

Access Shard-Query with the MySQL client without using MySQL proxy

One of the great features of Shard-Query is the ability to use MySQL proxy to access resultsets transparently.  While this is a great tool, many people have expressed reservations about using MySQL Proxy, an alpha component, in their production environments.

I recognize that this is a valid concern, and have implemented an alternate method of retrieving resultsets directly in the MySQL client, without using a proxy. This means that any node can easily act as the “head” node without any extra daemon, instead of having to run many proxies.

The sq_helper() routine has been checked into the git repository and is available now.

The function takes a few parameters:

  • sql to run
  • shard-query schema name (empty string or null for default schema)
  • schema to store temp table in
  • temp table name (where results are sent to)
  • return result (boolean, 1 returns result to client, 0 does not return the result)
  • drop table (boolean, 1 drops the table at the end of the procedure, 0 does not)
mysql> call shard_query.sq_helper("select * from dim_date limit 2", "", 'test','testtab',1,1)\G
*************************** 1. row ***************************
         D_DateKey: 19911231
            D_Date: December 31, 1991
       D_DayOfWeek: Wednesday
           D_Month: December
            D_Year: 1991
    D_YearMonthNum: 199112
       D_YearMonth: Dec1991
    D_DayNumInWeek: 4
   D_DayNumInMonth: 31
    D_DayNumInYear: 365
  D_MonthNumInYear: 12
   D_WeekNumInYear: 53
   D_SellingSeason: Christmas
 D_LastDayInWeekFl: 0
D_LastDayInMonthFl: 0
       D_HolidayFl: 0
       D_WeekDayFl: 1
*************************** 2. row ***************************
         D_DateKey: 19920101
            D_Date: January 1, 1992
       D_DayOfWeek: Thursday
           D_Month: January
            D_Year: 1992
    D_YearMonthNum: 199201
       D_YearMonth: Jan1992
    D_DayNumInWeek: 5
   D_DayNumInMonth: 1
    D_DayNumInYear: 1
  D_MonthNumInYear: 1
   D_WeekNumInYear: 1
   D_SellingSeason: Winter
 D_LastDayInWeekFl: 0
D_LastDayInMonthFl: 1
       D_HolidayFl: 1
       D_WeekDayFl: 1
2 rows in set (0.07 sec)

Important:
The only requirement is the Gearman UDF (which is also required by the proxy). Don’t forget to call gman_server_set(…) with your gearman server, or this function won’t work.

Shard-Query loader gets a facelift and now Amazon S3 support too

Shard-Query (source) now supports the MySQL “LOAD DATA INFILE” command.

When you use LOAD DATA LOCAL INFILE a single threaded load from the current process will be performed.  You can specify a path to a file anywhere readable by the PHP script.  This allows loading without using the Gearman workers and without using a shared filesystem.

If you do not specify LOCAL, then the Gearman based loader is used.  You must not specify a path to the file when you omit the LOCAL keyword, because the shared path will be pre-pended to the filename automatically.  The shared path must be a shared or network filesystem (NFS, CIFS, etc.), and the files to be loaded must be placed on it for the Gearman based loader to work, since workers may run on multiple nodes and all of them have to be able to read the files being loaded.
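For example (the table name, field clause, and file paths here are hypothetical; the statement follows the normal MySQL LOAD DATA syntax):

-- single-threaded load, file read directly by the PHP process
LOAD DATA LOCAL INFILE '/data/staging/lineorder.tbl'
  INTO TABLE lineorder
  FIELDS TERMINATED BY '|';

-- Gearman based load: no path, the configured shared path is pre-pended automatically
LOAD DATA INFILE 'lineorder.tbl'
  INTO TABLE lineorder
  FIELDS TERMINATED BY '|';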

S3 is supported as a source of data

Instead of using a shared filesystem, S3 is now supported too.  You must specify an AWS access key and secret key when setting up Shard-Query.  After those are set up, simply use LOAD DATA INFILE 's3://bucket/filename' to load from an S3 bucket using Gearman workers.  The file will be split up into smaller chunks efficiently and automatically, and each 16MB chunk will be loaded individually.

If you use LOAD DATA LOCAL INFILE 's3://bucket/filename' then Gearman will not be used and the file will be loaded by the local process instead.

Important: When the Gearman loader is used (recommended) the S3 load will be split over the workers, each worker loading a 16MB chunk of the file in parallel.

Shard-Query supports background jobs, query parallelism, and all SELECT syntax

SkySQL just blogged about a tool that schedules long-running MySQL jobs, prevents too many queries from running simultaneously, and stores the results in tables.  It even uses Gearman.  You will note that the article says that it uses PAQU, which uses Shard-Query.

I think PAQU was created for two reasons: A) Shard-Query lacked support for fast aggregation of STDDEV and VARIANCE (this has been fixed), and B) their data set requires “cross-shard queries”.  From what I can see, though, their type of cross-shard query can be handled with subqueries in the FROM clause in stock Shard-Query, instead of a customized (forked) version of Shard-Query.  It is unfortunate, because my recent improvements to Shard-Query have to be ported into PAQU by the PAQU authors.
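For example, a query of this general shape (the tables and columns are hypothetical) lets Shard-Query evaluate the inner aggregation across all of the shards and then run the outer query over the combined result:

SELECT agg.object_id, agg.total_flux
  FROM ( SELECT object_id, SUM(flux) AS total_flux
           FROM observations
          GROUP BY object_id ) agg
 WHERE agg.total_flux > 1000;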

I’d like to encourage you to look at Shard-Query if you need to run complex jobs in the background and get the results later.  As a bonus, you get support for parallel query using partitioning, and you get shared-nothing scale-out for sharding too.  You get the latest and greatest improvements to Shard-Query, and if you have a query that you aren’t sure can be executed by Shard-Query, run the problem by me first so that I can suggest solutions before you fork the product.

Also:
Since scientific data sets rarely need to be updated (if ever), a column store like Infobright is ideal for such data.  This can reduce raw multi-TB data sets into multi-hundred-GB data sets that can be processed much faster.  Combine this with the ability to save on IO by reading only the necessary columns, and extremely fast data processing is possible.

To run background jobs in Shard-Query:

$SQ = new ShardQuery();
$SQ->async = true;
$job_id = $SQ->query("select count(*) from 1TB_table");
echo "Check the shard_query.jobs table for the completion status of $job_id\n";

To poll for completed jobs:

/* pass in the ShardQuery object created earlier */
function wait_for_jobid($SQ, $job_id) {
  /* poll shard_query.jobs until this job finishes (100%) or errors out */
  $sql = "select * from jobs where (completion_percent = 100.00 or completion = 'error') and job_id = " . (!is_numeric($job_id) ? 'NULL' : $job_id);
  while(1) {
    $SQ->state->mapper->sweep_jobs();
    $stmt = $SQ->state->mapper->conn->my_query($sql);
    $cnt = 0;
    while($row = $SQ->DAL->my_fetch_assoc($stmt)) {
      $cnt++;
      break;
    }
    if($cnt > 0) break;
    sleep(1);
  }
  return 1;
}

To get the result after it is finished:

$stmt = $SQ->get_async_result($job_id);
while($row = $SQ->DAL->my_fetch_assoc($stmt)) {
  print_r($row);
}

Note: SimpleDAL – Shard-Query data access layer
You will notice the above code uses the ->DAL member.  This is the SimpleDAL interface which ships with Shard-Query.  It allows Shard-Query to talk to different database providers (even possibly NoSQL providers) through a unified interface.  The class operates similarly to the mysqli class, except the functions are prefixed with my_ instead of mysqli_.  Internally it uses PDO to talk to MySQL.

Tips for working with append-only databases using sharding and log structured tables

This post is structured like a series of questions and answers in a conversation.  I recently had a number of conversations that all pretty much went this same way.  If you, like others, have many basic questions about how to proceed when faced with an append-only store for the first time, well then hopefully this post will help provide some answers for you.  The post focuses on column stores, the most common append-only store, but there are others.

Why do I want to use a column store?

Column stores are optimal for OLAP analysis

Column stores offer substantial performance increases for OLAP compared to row stores.  Row stores are optimized for OLTP workloads.  While a row store can be used for OLAP, it may not perform well because a row store has to retrieve every column for a row (unless there is a covering index).  This is one of the reasons I’ve said that a covering index allows you to approximate the performance of a column store on a row store when the table has a large number of columns.
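As a quick illustration (the table and columns are hypothetical), a covering index lets the optimizer answer an OLAP-style query from the index alone, touching only the referenced columns much as a column store would:

ALTER TABLE fact_sales
  ADD INDEX cov_store_date_amt (store_id, sale_date, amount);

-- satisfied entirely from the index; no full rows are fetched
SELECT store_id, SUM(amount)
  FROM fact_sales
 WHERE sale_date BETWEEN '2013-01-01' AND '2013-01-31'
 GROUP BY store_id;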

You can find an introduction to column store technology here, including basic performance comparisons:

… but they have their downside

Many open source column stores are append only

When I start talking about column stores I start with the upsides, the IO reduction and the high compression, and then bring up this bombshell pretty late in the conversation.

This is what people look like when I tell them that ICE and Fastbit (two open source column stores with good on-disk performance) are append-only:

[Image: Simon Cowell giving a thumbs-down]

Please don’t let this turn you off.  Change your perspective.

I understand that if you have worked with a traditional RDBMS for a long time, then being told that you can only insert data into a table is a pretty big shock.  There are, however, many good reasons to use a column store, and workarounds exist which allow you to take advantage of the upsides while minimizing the downsides.  The append-only nature is just a trade-off.  Just as a column store arranges data completely differently on disk, you may re-arrange your thinking slightly about using an append-only database once you understand the workarounds.

Above all, be pragmatic.  Understand that there is no perfect technology for every situation.  Column stores are highly optimized for read-only access, at the cost of the ability to perform updates.  Also keep in mind that these column stores are not the only read-only data stores.  Map/Reduce with HDFS can’t really update data either: if you want to update a file, you drop the file and upload a new one in its place.

One of the first questions is usually “How do we delete old data?”

Delete with the DROP command

Like HDFS, the only way to delete data in an append-only store is to DROP an object.  That could be as small as a partition (if the column store supports partitioning data), or a table or even an entire database.

To facilitate DROPing data use multiple databases and/or multiple tables

If you need to periodically prune data you can split your tables up into multiple databases.   Drop databases which contain data that you no longer wish to retain.

For example, you can segment your data into three schemata by year: year_2011, year_2012, year_2013

When you want to get rid of the year 2011 data:

DROP DATABASE year_2011;
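A minimal sketch of that layout (the table name is hypothetical): each year gets its own schema containing identically structured tables, and retiring a year is a single cheap metadata operation like the DROP above.

CREATE DATABASE year_2013;
CREATE TABLE year_2013.facts LIKE year_2012.facts;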

Q: “That is great, but won’t splitting data up like that increase query complexity for OLAP queries?”

It may. When accessing a single year, queries are easy to write, but when accessing more than one year it can be more difficult.

When you think about it, however, placing the data into different databases is sharding.

This is where Shard-Query comes in.  It does the complex SQL work for you because it understands how to aggregate queries over multiple shards.  In addition, because Shard-Query is a massively parallel processing (MPP) engine, it can access each shard (a year in this case) in parallel.  For increased parallelism you could create one database per month.  Of course, since each schema is a shard, you could place each month or year on a different machine to achieve as much parallelism as is desired.

You can control data placement for optimal performance

Aim for a degree of parallelism such that queries on a small amount of data (a day or a month) are fast enough without parallelism, and queries over any larger range gain parallelism because the data is spread evenly over all the machines.  Queries that examine consecutive months should not have to read their data from the same machine.

Q: “What about updates?  How do I correct a mistake if I make one?”

You really can’t update an append-only table.  You have to think out of the box.

Method 1 – Get rid of “in-place” updates the easy way (copy/modify/swap)

Instead of updating the data in place, create a copy of the table (using a different storage method like MyISAM) and then modify the copy.  When you are finished, swap the old table with the new version of the table.  This is often the best course of action for smaller tables such as lookup tables and dimension tables in a star schema.  You probably can’t use this method for big tables such as fact tables.  You can technically do it, but you must orchestrate a complex ETL process to do so.
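A minimal sketch of the copy/modify/swap approach (the table and column names are hypothetical):

-- 1. copy the dimension table into an updatable engine
CREATE TABLE dim_customer_new ENGINE=MyISAM
  AS SELECT * FROM dim_customer;

-- 2. fix the mistake in the writable copy
UPDATE dim_customer_new SET region = 'EMEA' WHERE customer_id = 42;

-- 3. swap the corrected copy into place and discard the old version
RENAME TABLE dim_customer TO dim_customer_old,
             dim_customer_new TO dim_customer;
DROP TABLE dim_customer_old;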

Keep in mind that you probably won’t have a lot of small tables if you have big data.  The Infobright (ICE and IEE) engines really prefer one big table for your data instead of a normalized schema with lookup tables.  Because these databases don’t support indexes, it is necessary to perform hash joins or sort/merge joins in order to join tables together.  Fastbit doesn’t support joins at all – you must implement hash joins in your application.  If you do need to perform joins, keep the dimension tables small so that at least one side of the join fits in memory.

There is some added complexity with this method and ICE.  Infobright Community Edition (ICE) doesn’t support converting tables to its engine via ALTER TABLE.  Instead, with ICE, export the contents of the new table with SELECT INTO OUTFILE, and load a new version of the table with LOAD DATA INFILE.
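With ICE the swap step becomes an export and reload; a rough sketch (paths and names again hypothetical):

-- export the corrected copy
SELECT * FROM dim_customer_new
  INTO OUTFILE '/tmp/dim_customer.csv'
  FIELDS TERMINATED BY ',' ENCLOSED BY '"';

-- recreate the ICE table and load the corrected data into it
DROP TABLE dim_customer;
CREATE TABLE dim_customer (
  customer_id bigint,
  name varchar(100),
  region varchar(20)
) ENGINE=BRIGHTHOUSE;

LOAD DATA INFILE '/tmp/dim_customer.csv'
  INTO TABLE dim_customer
  FIELDS TERMINATED BY ',' ENCLOSED BY '"';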

Method 2 – Use Log structured tables and track changes via insertions

You can’t rebuild a 50 billion row table just because someone misplaced a zero in a bank transaction and a single row needs correcting.  If you think about what I just said, however, you just might already have a long-accepted answer to the problem at hand.

Your bank carries a ledger against your account, which is just a log when you think about it

When the bank records an error, they don’t go back and update the original transaction to correct it, instead they issue an “opposite” transaction to undo the error.  For example, if the clerk accidentally deposits 1000 dollars instead of 100 dollars, the following series of transactions may happen:

      1. 1000 counter deposit
      2. 1000 counter debit
      3. 100 counter deposit

Generalizing the concept

The log/ledger concept can be generalized to cover any sort of table if you add some extra information to the table.  Instead of  “debit” and “deposit” think of  “insertion” and “delete mark”.   In addition to recording the operation type, you also need to include information necessary for establishing the sequence for the operations on the table.

Log Table

In general, you must include

      1. A transaction number (which always goes up)
      2. A sequence number (this identifies the order of operations in the transaction)
      3. The operation type (insertion or delete mark).  I usually call this column dml_type.

I could leave it up to you to decide why those items are important, but here are some hints:

      1. The transaction number is useful for keeping summary tables synced up at the exact same point for multiple tables.
      2. The sequence number is necessary to find the newest version of a row, and to remove deleted rows from resultsets when the newest row is “delete marked”
      3. Rows with a “delete mark” will have to be handled specially in aggregates too.
      4. An update can be recorded as a “delete mark” followed by an insertion with new values.

Here is an example table, before adding the log columns:

The table records a simple set of metrics from a set of sensor devices.  These sensors are very simple and only record two metrics, but many devices can report hundreds of metrics (like a MySQL server).  This table doesn’t currently have a column to record the operation type.

CREATE TABLE metrics (
  device_id bigint,
  probe_dtime datetime,
  power_level tinyint,
  temperature decimal(4,2)
);

The problem:

If “something bad” happens and wrong values or duplicate rows are inserted into this table, you would have to rebuild the whole table to get the bad values out if you were using an append-only data store.  This is not unrealistic: rows can easily end up duplicated if, for example, a file is accidentally loaded twice, and this can happen because these open source column stores do not support primary keys.

Use a log structured table:

CREATE TABLE metrics (
  trx_id bigint,
  trx_seq int,
  dml_type tinyint, 
  device_id bigint,
  probe_dtime datetime,
  power_level tinyint,
  temperature decimal(4,2)
);

Handling deleted and updated rows

Assume that (device_id,probe_dtime) can be considered the PRIMARY KEY on the table.  This is logically the case, even though physically there may be rows with duplicated PK attributes due to multiple versions of rows being stored in the table simultaneously.  This happens when one or more versions are “delete marked”.
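For example, assuming dml_type is 1 for an insertion and -1 for a delete mark (the convention the counting queries below rely on), correcting the temperature of a single reading is recorded as two new log rows rather than an in-place update (the values are made up):

INSERT INTO metrics
  (trx_id, trx_seq, dml_type, device_id, probe_dtime, power_level, temperature)
VALUES
  (1001, 1, -1, 42, '2013-04-30 12:34:56', 3, 21.50),  -- delete mark for the old version
  (1001, 2,  1, 42, '2013-04-30 12:34:56', 3, 20.50);  -- new version of the row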

Showing only visible rows

SELECT m.device_id, 
       m.probe_dtime 
  FROM metrics m
 WHERE m.probe_dtime = '2013-04-30 12:34:56' 
   AND m.trx_seq = (select max(m2.trx_seq) 
                      from metrics m2
                     where m2.device_id = m.device_id 
                       and m2.probe_dtime = m.probe_dtime 
                    having sum(m2.dml_type) > 0)
 ORDER BY m.device_id desc, 
          m.trx_id desc, 
          m.trx_seq desc;

Counting the rows in the table (or any group)

SELECT SUM(dml_type) as logical_count, 
       count(*) as physical_log_entry_count
  FROM metrics;

SELECT probe_dtime, 
       SUM(dml_type) as logical_count, 
       count(*) as physical_log_entry_count
  FROM metrics
 WHERE probe_dtime between '2012-12-01' and '2012-12-31'
   AND device_id = 247
 GROUP BY probe_dtime;

Handling aggregates over resultsets containing “delete marked” rows.  AVG() is more complicated:

SELECT max(probe_dtime), 
       max(dml_type * power_level), 
       min(dml_type * power_level), 
       sum(dml_type * power_level)/sum(dml_type * (power_level is not null)) as avg_power_level
  FROM metrics
 WHERE device_id = 247
   AND probe_dtime between '2012-12-01' and '2012-12-31'
HAVING sum(dml_type) > 0;