Big SQL v4.2 included several new goodies to help performance and scalability, so much so that I felt the urge to update the Performance Tips blog from the previous version. So once again I’ve pulled together the 6 most frequent tips I pass along to clients. Rather than priority order, they’re listed in the order I think you’ll need them.
1. Resource sharing:
Determine what percentage of compute resource should be assigned to Big SQL.
A Hadoop cluster with Big SQL installed has its resources split between Big SQL and all other Hadoop components. The Big SQL percent usage (bigsql_pct_usage) install property specifies how much of the cluster’s resources (CPU and memory) are assigned to Big SQL. During install, set the bigsql_pct_usage property to the percentage of cluster resources you would like to allocate to Big SQL.
The default is 25%, but I find the vast majority of deployments have increased this to at least 50%. It can be tricky to get right at install time since it’s difficult to determine the optimal split of resources between Big SQL and other Hadoop components until your workload is up and running. Fortunately, this property is relatively easy to change post install.
The exact resources you need to dedicate to Big SQL vs other Hadoop components depend upon the performance requirements of the Big SQL queries and Hadoop jobs within your workload. For example, you may have accepted the default of 25% at install time but now wish to dedicate more of the cluster resources to Big SQL – say, a 50:50 split. In this case you could re-assign cluster resources, dedicating more to Big SQL and less to Map-Reduce jobs.
To change the percentage of resources dedicated to Big SQL post install, use the AUTOCONFIGURE command. For example, to increase Big SQL’s allocation from 25% to 50%, connect to the bigsql database and use:
CALL SYSHADOOP.BIG_SQL_SERVICE_MODE('ON');
AUTOCONFIGURE USING MEM_PERCENT 50 WORKLOAD_TYPE COMPLEX IS_POPULATED NO APPLY DB AND DBM;
Corresponding adjustments will need to be made to the following YARN properties to decrease its allocation (these are not done automatically by the AUTOCONFIGURE command). Both these properties are found under the “YARN – Configs – Settings” panel on the Ambari interface:
- yarn.nodemanager.resource.memory-mb – used to control the total amount of memory allocated to YARN
- yarn.scheduler.maximum-allocation-mb – used to control the maximum allocation for a container request
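As a rough aid for sizing the YARN side after raising Big SQL’s share, here is a minimal Python sketch. It rests on assumptions that are not from this article: a hypothetical worker node with 128 GB of RAM, and the simplification that everything not given to Big SQL is available to YARN. Real nodes also need headroom for the OS and other services, so treat the result as a ceiling, not a recommended value.

```python
def yarn_memory_ceiling_mb(node_memory_mb: int, bigsql_pct: int) -> int:
    """Memory left on one node once Big SQL's share is set aside.

    Assumption: the remainder is entirely available to YARN. This ignores
    the OS and other Hadoop services, so the result is only an upper bound.
    """
    if not 0 <= bigsql_pct <= 100:
        raise ValueError("bigsql_pct must be between 0 and 100")
    return node_memory_mb * (100 - bigsql_pct) // 100


NODE_MEMORY_MB = 128 * 1024  # hypothetical worker node with 128 GB of RAM

for pct in (25, 50):
    ceiling = yarn_memory_ceiling_mb(NODE_MEMORY_MB, pct)
    print(f"Big SQL at {pct}%: yarn.nodemanager.resource.memory-mb <= {ceiling}")
```

Whatever value you settle on, yarn.scheduler.maximum-allocation-mb should not exceed it, since a single container cannot be larger than the memory a NodeManager offers.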
For more information on this topic please read further on Increasing Big SQL Memory.
2. Choose an optimized storage format
Big SQL 4.2 is optimized for Parquet storage format.
The storage format used to store your Big SQL data is a critical contributor to the performance and efficiency of your cluster. Not only does the storage format have a direct influence on query times, it also impacts the volume of data stored (due to compression), CPU consumed to read and process that data, and data ingest times.
Although Big SQL 4.2 supports many of the popular Hadoop storage formats, for analytical workloads the Parquet storage format is recommended. Big SQL is highly optimized to process data in Parquet files, and internal tests have shown Parquet to have the most efficient compression ratio, and lowest query elapsed times.
Regardless of the storage format chosen, it is important to understand that HDFS is optimized for storing and processing large files. If your data ingest techniques create thousands of small files, then this can severely impact the performance and scalability of your Hadoop cluster. This is a general recommendation across all Hadoop distros and is not just limited to Big SQL or IOP.
3. PARTITION your data
Partition your data to improve query response times and improve cluster efficiency
Partitioning is a common practice in SQL over Hadoop solutions today. The aim of partitioning a table is to reduce the amount of data a query has to process in order to arrive at the final answer. By reducing the data processed, the query completes faster, and consumes fewer resources.
Both Big SQL and Hive use a similar partitioning scheme – specified by the “PARTITIONED BY” clause on the “CREATE HADOOP TABLE” statement. Like Hive, Big SQL stores different data partitions as separate files in HDFS and only scans the partitions/files required by the query. However, Big SQL v4.2 introduces several new and unique techniques to better exploit partitioned data sets – which expands the use-cases for partitioning in Big SQL above and beyond those offered by other SQL over Hadoop solutions.
Let’s first consider an example which exploits standard Hive partitioning. The following table holds details of items sold over a 5-year period:
CREATE HADOOP TABLE LINEITEM (
  L_ORDERKEY BIGINT NOT NULL,
  L_PARTKEY INTEGER NOT NULL,
  L_LINENUMBER INTEGER NOT NULL,
  L_QUANTITY FLOAT NOT NULL,
  L_DISCOUNT FLOAT NOT NULL,
  L_TAX FLOAT NOT NULL,
  L_SHIPINSTRUCT VARCHAR(25) NOT NULL,
  L_SHIPMODE VARCHAR(10) NOT NULL,
  L_COMMENT VARCHAR(44) NOT NULL)
PARTITIONED BY (L_SHIPDATE VARCHAR(10))
STORED AS PARQUETFILE;
The table is partitioned on the date the item was shipped – so data for every day over the 5-year period is stored in separate HDFS files (over 5 years, that’s approx. 1825 files). Now, if a query has a predicate based on the partitioning column then Big SQL can eliminate all other partitions/files from the query without having to read them. For example, the following predicate would result in Big SQL reading and processing just 10 partitions (storing the 10 days of data of interest) to complete the query – instead of all 1825 – resulting in a large performance improvement:
l_shipdate >= date('2015-01-01') and l_shipdate <= date('2015-01-10')
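To make the 10-out-of-1825 figure concrete, here is a small Python sketch that models the daily partitions as a list of date strings and applies the same range filter. The start date of the 5-year window is hypothetical (the article does not give one), and the code only illustrates the counting, not how Big SQL performs elimination internally.

```python
from datetime import date, timedelta

# Hypothetical start of the 5-year window; the article does not state one.
FIRST_SHIP_DATE = date(2011, 1, 1)
NUM_DAILY_PARTITIONS = 5 * 365  # the article's approximation: 1825

# One partition per ship date, keyed as a 'YYYY-MM-DD' string in the same
# way the L_SHIPDATE VARCHAR(10) partitioning column would be.
partitions = [
    (FIRST_SHIP_DATE + timedelta(days=i)).isoformat()
    for i in range(NUM_DAILY_PARTITIONS)
]


def partitions_to_scan(all_partitions, low, high):
    """Keep only the partitions whose key lies in the inclusive range."""
    # ISO date strings sort in date order, so string comparison is enough.
    return [p for p in all_partitions if low <= p <= high]


selected = partitions_to_scan(partitions, "2015-01-01", "2015-01-10")
print(len(selected), "of", len(partitions), "partitions scanned")
```

Everything outside the selected list is never read, which is where the saving comes from.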
Up until Big SQL v4.2, to get the maximum benefit from partitioning one had to choose a partitioning key that:
- was commonly referenced in range-delimiting or equality predicates of the queries, and
- had a few hundred to a few thousand unique values.
In Big SQL v4.2, both of these criteria have been relaxed by introducing deferred partition elimination and partitioning on column expression. Let me explain further.
Many of today’s Data Lake implementations on Hadoop employ a star or snowflake schema – primarily a hangover from the Relational Data Warehouses from which the Data Lakes originate. Such schemas are normalized, with dimension attributes (such as dates, item descriptions, customer details etc.) being held in small dimension tables. These attributes are linked to the large fact tables via a foreign key relationship. Queries against these schemas would not normally have an explicit predicate on a candidate partitioning column belonging to the fact table. For example, the following query finds the min, max and average sales for each day in a given month:
SELECT MIN(s.s_price), MAX(s.s_price), AVG(s.s_price)
FROM store_sales s, date_dim d
WHERE s.s_date_sk = d.d_date_sk
  AND d.d_date between '2002-08-01' AND '2002-08-01' + 1 MONTH
GROUP BY d.d_date
Partitioning on the s_date_sk column of the store_sales (fact) table may seem like the way to go. But there is no explicit predicate on s_date_sk: it has a join predicate to the date_dim (dimension) table, while the explicit predicate is on date_dim. Previously, that meant no partition elimination. This has all changed in Big SQL v4.2 with the introduction of deferred partition elimination, which enables Big SQL to eliminate partitions based on join keys. In the above example, the fact table can now be partitioned on s_date_sk, and Big SQL is smart enough to use the join between store_sales and date_dim, along with the explicit predicate on date_dim, to eliminate those partitions in the fact table which are not in the one-month range. For a more in-depth look at deferred partition elimination see the following article.
Partitioning on column expression is immensely powerful because it allows users to partition on high cardinality columns, which previously was not practical. As a rule of thumb, prior to v4.2 I would always recommend clients not to exceed 10,000 partitions per table. Anything beyond this creates too many directories and too many small files in HDFS, and it’s well known that HDFS does not efficiently handle large numbers of small files. Big SQL v4.2 introduces support for expressions in the PARTITIONED BY clause of the CREATE HADOOP TABLE statement, thus allowing significantly more flexibility in how the data can be partitioned. Let’s look at an example with a TRANSACTION table which contains a unique transaction id column and a transaction timestamp:
CREATE HADOOP TABLE TRANSACTION (
  trans_id int,           -- transaction id, unique
  product varchar(50),
  trans_ts varchar(20)    -- transaction timestamp
)
PARTITIONED BY (
  INT(trans_id/10000) AS trans_part,
  YEAR(trans_ts) AS year_part
)
Most of the queries against the transaction table have predicates involving one, or both of trans_id and trans_ts. If the table holds details of 10M transactions across a 10 year period, then partitioning on trans_id and trans_ts would create 10M*10=100M partitions – which shatters our rule of thumb of 10K partitions per table. By partitioning on the column expression shown in the example, the table will have (10M/10000)*10=10K partitions. So by introducing partitioning on column expression in Big SQL v4.2, users are now able to deploy partitioning in many more situations than previously possible – which can lead to large improvements in performance as the volume of data read and processed can be greatly reduced.
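The arithmetic behind those two partition counts can be checked with a few lines of Python. The numbers are the ones quoted above; the variable names are mine and purely illustrative.

```python
TRANSACTIONS = 10_000_000  # 10M unique trans_id values
YEARS = 10                 # distinct YEAR(trans_ts) values over 10 years
BUCKET_SIZE = 10_000       # the divisor in INT(trans_id/10000)
RULE_OF_THUMB = 10_000     # suggested maximum partitions per table

# Partitioning directly on trans_id and the year: one partition per pair.
raw_partitions = TRANSACTIONS * YEARS

# Partitioning on the expressions: ids are grouped into buckets of 10,000.
expr_partitions = (TRANSACTIONS // BUCKET_SIZE) * YEARS

print(f"raw columns : {raw_partitions:,} partitions")
print(f"expressions : {expr_partitions:,} partitions")
print("within rule of thumb:", expr_partitions <= RULE_OF_THUMB)
```

Changing BUCKET_SIZE is the lever here: a larger divisor gives fewer, bigger partitions, while a smaller one gives more partitions and finer elimination.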
In the above example, each of the following queries would benefit from partitioning on the column expression:
SELECT ...... FROM transaction WHERE trans_id = 10 ;
SELECT ...... FROM transaction WHERE trans_ts > '2015-06-14' ;
SELECT ...... FROM transaction WHERE trans_id = 20 AND trans_ts > '2015-06-15' ;
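To see why each of these queries benefits, the Python sketch below maps the predicate values onto the two partitioning expressions and counts the partitions that could still hold matching rows. It is only a model: the 2006 to 2015 year range and ids starting at 0 are assumptions of mine, and the article does not describe how Big SQL derives the partition predicates internally.

```python
from itertools import product

BUCKET_SIZE = 10_000
YEARS = range(2006, 2016)                       # hypothetical 10-year span
TRANS_PARTS = range(10_000_000 // BUCKET_SIZE)  # assumes ids 0..9,999,999


def trans_part(trans_id: int) -> int:
    """Mirror INT(trans_id/10000) for non-negative ids."""
    return trans_id // BUCKET_SIZE


def year_part(trans_ts: str) -> int:
    """Mirror YEAR(trans_ts) for timestamps that start with 'YYYY-'."""
    return int(trans_ts[:4])


# Every (trans_part, year_part) combination is one partition.
all_partitions = list(product(TRANS_PARTS, YEARS))

# trans_id = 10: only bucket 0 can match, in any year.
q1 = [p for p in all_partitions if p[0] == trans_part(10)]

# trans_ts > '2015-06-14': only years from 2015 onwards can match.
q2 = [p for p in all_partitions if p[1] >= year_part("2015-06-14")]

# Both predicates together narrow the scan to a single partition.
q3 = [
    p for p in all_partitions
    if p[0] == trans_part(20) and p[1] >= year_part("2015-06-15")
]

for name, hits in (("query 1", q1), ("query 2", q2), ("query 3", q3)):
    print(f"{name}: {len(hits)} of {len(all_partitions)} partitions")
```

Note that a surviving partition still has to be scanned in full: the expression narrows where to look, and the original predicate then filters the rows.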
For more information on partitioning on column expression see Overview of partition on column expression feature, or if you want to see it in action play the following video.
Partitioning has many additional benefits including the ability to dynamically add and remove partitions to/from a table – making it very easy to roll-in new data and roll-out old data. In addition, when the ANALYZE command (see below) is run against a partitioned table, it will only ANALYZE those partitions that don’t have statistics already collected on them, and not the entire table.
For more information on how to create partitioned tables, see the Big SQL Knowledge Center.
4. Use ANALYZE to gather statistics
Frequently use ANALYZE to update a table’s statistics
This is still the single most important step in ensuring your Big SQL queries are efficiently and expeditiously executed. Whenever I get asked to investigate a performance issue with a query, my first step is always to ensure that all tables and columns involved in the query have accurate and up to date statistics. This usually solves 80% of the performance issues I investigate.
The ANALYZE command gathers statistics about the data within a table. The stats are then used by the Big SQL cost-based optimizer to make informed decisions about query execution – such as join order, join types, and when and where to sort data. Prior to v4.2, ANALYZE spawned a Map-Reduce job to gather the stats. In v4.2, ANALYZE was redesigned and no longer uses Map-Reduce: statistics are gathered and computed completely within the Big SQL engine, making it simpler, faster and more efficient.
ANALYZE should be run whenever:
- A new table is populated with data,
- An existing table’s data undergoes significant changes:
- new data added,
- old data removed,
- existing data is updated
However, statistics do not need to be gathered for all columns within a table. Only those columns which are referenced by predicates (including join predicates), used in aggregation functions, or used in informational constraints (see below) need to have statistics gathered. In addition, if there are sets of columns that have an inter-relationship, then column group statistics should be gathered on these columns. For example, columns storing the STATE and COUNTRY of a customer are likely to have an inter-relationship – all customers who live in the STATE of ‘California’ will have ‘USA’ for COUNTRY. By gathering column group statistics on STATE and COUNTRY together, the Big SQL optimizer is able to infer this relationship and use the information to more efficiently plan the execution of queries. For more information on which statistics to gather, see the Big SQL v4.2: Best Practices for Collecting Statistics blog.
The development team also introduced smarts in gathering statistics by means of Auto-Analyze. Auto-Analyze automatically collects statistics for tables and columns immediately following a LOAD or HCAT_SYNC_OBJECTS statement. It will also automatically analyze a table which it thinks has out-of-date statistics. These smarts are designed to ensure stats are always accurate and current, which in turn will improve query performance.
Another big plus for ANALYZE in v4.2 is the introduction of sampling. Sampling allows only a fraction of the entire table to be processed when calculating statistics. I recommend using a 10% sampling rate for all the large tables in a schema, and no sampling (i.e. all the data) for the small tables. Performance tests have shown sampling 10% of the data can improve performance of ANALYZE by as much as 5x compared to no sampling. In lab tests, a 10% sample still maintained sufficient accuracy of the statistics and did not adversely impact query performance.
The following example collects table and column level statistics on the ORDERS table, along with column group statistics on STATE and COUNTRY by sampling 10% of the table:
ANALYZE TABLE SIMON.ORDERS COMPUTE STATISTICS
  FOR COLUMNS ORDERKEY, CUSTKEY, ORDERSTATUS, TOTALPRICE, ORDERDATE, ORDERPRIORITY, STATE, COUNTRY, (STATE, COUNTRY)
  TABLESAMPLE SYSTEM (10);
If you are wondering which tables in your schema are missing statistics, use the following query:
select substr(tabname,1,20) as table, stats_time, card from syscat.tables where tabschema='SIMON' and card=-1;
And the following query to identify the columns within a table that have no statistics:
select substr(colname,1,20) as column, colcard from syscat.columns where tabschema='SIMON' and tabname='ORDERS' and colcard=-1;
5. Define Informational Constraints
Use Informational Constraints to define relationships within your schema.
Informational Constraints are much like regular primary and foreign key constraints (available in most RDBMSs today) except the constraints are not enforced. One might ask what’s the use of a constraint that is not enforced – well, even though these constraints are not enforced, once defined they do provide useful information to the Big SQL optimizer which it can use when planning a query. As a simple example, if a query requires only distinct values from a column, then if a primary key constraint is defined on that column the Big SQL optimizer knows that the column only contains unique values and no operations are necessary in order to ensure this uniqueness. However, if a unique constraint was not defined on the column, then the optimizer would have to sort and aggregate the data during query execution in order to guarantee uniqueness.
It is recommended to create primary and foreign key informational constraints to define relationships that exist in your schema. The following example defines primary keys on the ORDERS and LINEITEM tables, and a primary-foreign key relationship between ORDERS and LINEITEM:
alter table simon.orders add primary key (O_ORDERKEY) not enforced;
alter table simon.lineitem add primary key (L_ORDERKEY,L_LINENUMBER) not enforced;
alter table simon.lineitem add foreign key (L_ORDERKEY) references SIMON.orders (O_ORDERKEY) not enforced;
Important: Because these constraints are not enforced, no error is returned if the table data violates an informational constraint, and incorrect results might occur. When using an informational constraint, it is critical to ensure that your data adheres to the constraint definition outside of Big SQL. The most common methods for enforcing the constraints outside of Big SQL are ETL logic, or relying on a source database system that has similar (but enforced) constraints.
6. Use appropriate data types for the values being stored
In Big SQL 4.2 the use of STRING data types is not recommended.
The STRING data type is commonly used in Hive to represent character data of variable (and undetermined) length. In Big SQL, the STRING type is mapped internally to VARCHAR(32K). The Big SQL engine internally works with data in units of 32K pages and works most efficiently when the definition of a table allows a row to fit within 32K of memory. Once the calculated row size exceeds 32K, performance degradation can be seen for certain queries. Therefore, the use of STRING when defining columns of a table is strongly discouraged. Instead, change STRING columns to an explicit VARCHAR(n) that most appropriately fits the data size, or use the bigsql.string.size property to lower the default size of the VARCHAR to which STRING is mapped when creating new tables.
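A quick way to see how fast STRING columns blow past the 32K mark is to add up the declared widths. The Python sketch below does that for the character columns of the TRANSACTION table from tip 3. It makes two simplifying assumptions of mine: “32K” is taken as 32 * 1024 bytes, and the row size is approximated as the plain sum of declared column widths. The engine’s actual row-size calculation and exact limits may differ.

```python
PAGE_BYTES = 32 * 1024          # assumption: "32K" taken as 32,768 bytes
STRING_WIDTH_BYTES = 32 * 1024  # assumption: width a STRING column maps to


def declared_row_bytes(column_widths: dict) -> int:
    """Approximate the row size as the sum of declared column widths.

    This ignores per-row and per-column overhead, so it is a rough
    estimate and not the engine's exact calculation.
    """
    return sum(column_widths.values())


def fits_in_page(column_widths: dict) -> bool:
    """True when the approximate row size stays within one 32K page."""
    return declared_row_bytes(column_widths) <= PAGE_BYTES


# The same two character columns, declared as STRING versus sized VARCHARs.
with_string = {"product": STRING_WIDTH_BYTES, "trans_ts": STRING_WIDTH_BYTES}
with_varchar = {"product": 50, "trans_ts": 20}

for name, cols in (("STRING", with_string), ("VARCHAR(n)", with_varchar)):
    size = declared_row_bytes(cols)
    print(f"{name:<10} row is about {size:>6} bytes, fits in a page: {fits_in_page(cols)}")
```

Two STRING columns are already enough to exceed a page under these assumptions, while the sized VARCHAR version uses a tiny fraction of it.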
In Big SQL 4.2 the use of TIMESTAMP and DATE data types with PARQUET storage format is not recommended.
The use of TIMESTAMP data types with the PARQUET storage format can cause high amounts of system CPU to be consumed. This is observed when the PARQUET file is generated using Map-Reduce (including Hive and Big SQL LOAD) with TIMESTAMP data, and is due to conversions which occur when the TIMESTAMP data is read or written. For more information, see the following article.
Since DATE types are stored internally as TIMESTAMPs in PARQUET, DATEs suffer the same issue and should also be avoided when using PARQUET.
Instead, define TIMESTAMPs and DATEs as appropriate length CHAR or VARCHAR fields.
Is there anything else I should look out for?
Well, there are other knobs within Big SQL and the larger Hadoop eco-system that can be tweaked to improve performance, but if you follow the 6 tips above then your cluster should hum along nicely. The Big SQL 4.2 installer automatically calculates the memory required for sorting, caching and various heaps based on the memory available on the cluster and the Big SQL percent usage specified. As workloads are executed, Big SQL’s Self Tuning Memory Manager (STMM) monitors the memory consumption between the consumers and automatically adjusts the allocation to tune for the workload being executed. So there’s no need to jiggle Big SQL’s memory allocation – that’s all done automatically by STMM.
Not many folks realize it, but Big SQL has a sophisticated built-in Workload Manager that can be used to prioritize work, meet SLAs and prevent runaway queries. This workload manager is inherited from the workload manager in DB2 for Linux, UNIX and Windows and is enabled by default. The Workload Manager Dispatcher can also be used to set and control CPU limits between different categories of Big SQL users.
If you plan to do a little benchmarking of Big SQL queries it’s worth noting that the default Big SQL JDBC client (JSQSH) does a fantastic job of prettying up the output from queries in a very nice tabular form. Although this looks very nice, it comes at quite a cost; when the result set is large this pretty formatting can add 7-8x overhead to the elapsed time of the query! The overhead is still present even if the output is redirected to /dev/null, as the data is still being formatted by jsqsh before it is discarded. To avoid this unnecessary overhead, use the ‘-vstyle=discard’ jsqsh option to fetch data to the client and discard it without rendering, or ‘-vstyle=vertical’ to fetch data to the client and render it using a more efficient format.
It’s a well-known fact that Hadoop does not cope well with many small files – just google “hadoop small files” and read a few of the results. As such, one should avoid creating many small files and then querying them via Big SQL (or Hive for that matter) as performance will be severely impacted.
Finally, but by no means least, if you want the maximum performance out of your Big SQL cluster (and why wouldn’t you), be sure to check that CPU Scaling is disabled.
To get started, download the IBM Open Platform with Apache Hadoop.
Visit the Knowledge Center for installation and configuration information.