The objective of this series is to explore performance for BigSQL Federation using practical examples.

This series is divided into two parts.

  • Part 1 will look at function performance with emphasis on pushdown analysis
  • Part 2 will look at performance for queries that join nicknames with local tables and/or other nicknames

You are reading Part 2.

Before you begin

We will be picking up where we left off in Federation Performance – Part 1. The setup described here builds on the existing setup from Part 1. All the code shown is run from either jsqsh or the bash command line.
SQL statements were all run in jsqsh. The jsqsh interface usually shows a prefix like "[jabs1.ibm.com][bigsql] 1>"; we will use '1>' in our code for brevity.
Commands like db2exfmt, which need to be run on the command line, will be prefixed with "[bigsql]$".

Federation Setup

For the examples in this part we will be using PostgreSQL 9.2 and Netezza 7.1 as our data sources.


1> create wrapper my_odbc library 'libdb2rcodbc.so' options (DB2_FENCED 'N', MODULE '/home/bigsql/sqllib/federation/odbc/lib/libodbc.so');
0 rows affected (total: 0.015s)
1> create server netz_1 type netezza version 7 wrapper my_odbc options (host '192.168.10.25', dbname 'system');
0 rows affected (total: 0.009s)
1> create user mapping for user server netz_1 options (remote_authid 'admin', remote_password 'password');
0 rows affected (total: 0.012s)
1> create nickname netz_low_var for netz_1.biga_low_var;
0 rows affected (total: 10.830s)
1> CALL SYSPROC.NNSTAT(NULL,'BIGSQL',NULL,NULL,NULL,2,NULL,?);
ok. (total: 13.291s)

+-------------+------------+
| Return Code | Param #2 |
+-------------+------------+
| 0 | Successful |
+-------------+------------+

We want our Netezza server to use a trusted wrapper, so we have manually created one.
Note that in the create server statement we did not specify a port. If no port is specified, the instance is assumed to be running on the default port for that server type; in Netezza's case that is port 5480.
Another noteworthy detail is the use of the special keyword 'user' in the create user mapping statement. Using this keyword creates a mapping between the current session user and the remote user given in the options. Because we are connected as bigsql, this statement is actually equivalent to the one we used in Part 1.
As part of creating our new nickname we also update its statistics. In fact, with the arguments shown, all nicknames under the BIGSQL schema will have their statistics updated.
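For reference, here is a sketch of the explicit form of that same mapping; since our session user is bigsql, the two statements are interchangeable:


1> create user mapping for bigsql server netz_1 options (remote_authid 'admin', remote_password 'password');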

Join local Hadoop data to nickname

When joining a local table to a remote table there are two possible execution patterns.

One execution pattern is for the join to be processed at the coordinator node (the head node in BigSQL). In this pattern the data is fetched from the worker nodes and joined serially, at the head node, to the data coming from the nickname.

In the other execution pattern, data is fetched from the data source and distributed among the worker nodes for local processing. When processing completes, the partial results are sent to the head node for aggregation. Generally speaking, being able to use more workers should lead to better performance.

Catalog Review

For this section we will look at joining local data with data from a table on our Netezza server. We will need to know the options for the wrapper this server is using.


1> select * from syscat.servers where servername='NETZ_1';
+--------------+------------+------------+---------------+---------+
| WRAPNAME | SERVERNAME | SERVERTYPE | SERVERVERSION | REMARKS |
+--------------+------------+------------+---------------+---------+
| MY_ODBC | NETZ_1 | NETEZZA | 7 | [NULL] |
+--------------+------------+------------+---------------+---------+
1 rows in results(first row: 0.008s; total: 0.011s)

1> select * from syscat.wrapoptions where wrapname like 'MY%';
+--------------+------------+----------------------------------------------------+
| WRAPNAME | OPTION | SETTING |
+--------------+------------+----------------------------------------------------+
| MY_ODBC | DB2_FENCED | N |
| MY_ODBC | MODULE | /home/bigsql/sqllib/federation/odbc/lib/libodbc.so |
+--------------+------------+----------------------------------------------------+
2 rows in results(first row: 0.012s; total: 0.013s)
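
Server-level options can be inspected in the same way. SYSCAT.SERVEROPTIONS holds one row per option; for our server it should list at least the host and dbname values we supplied in the create server statement:


1> select servername, option, setting from syscat.serveroptions where servername='NETZ_1';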

The tables

We will use the nickname we created, and two Hadoop tables. The only difference between the two Hadoop tables is their cardinality.
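
For anyone who wants to reproduce the setup, a minimal sketch of the two Hadoop tables follows. The column types here are assumptions based on the matching nickname columns; the difference in cardinality simply comes from loading different volumes of data:


1> create hadoop table my_small_local (id int, sint smallint, col_vch varchar(10));
1> create hadoop table my_local_table (id int, sint smallint, col_vch varchar(10));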


1> \describe netz_low_var
+-------------+-------------+-----------+-------------+----------------+-------------+
| TABLE_SCHEM | COLUMN_NAME | TYPE_NAME | COLUMN_SIZE | DECIMAL_DIGITS | IS_NULLABLE |
+-------------+-------------+-----------+-------------+----------------+-------------+
| BIGSQL | ID | INTEGER | 10 | 0 | YES |
| BIGSQL | SINT | SMALLINT | 5 | 0 | YES |
| BIGSQL | DBL | DOUBLE | 53 | [NULL] | YES |
| BIGSQL | COL_VCH | VARCHAR | 10 | [NULL] | YES |
| BIGSQL | COL_VCH2 | VARCHAR | 200 | [NULL] | YES |
+-------------+-------------+-----------+-------------+----------------+-------------+

1> select tabschema,tabname, card from sysstat.tables where tabname like 'NETZ%';
+-----------+--------------+--------+
| TABSCHEMA | TABNAME | CARD |
+-----------+--------------+--------+
| BIGSQL | NETZ_LOW_VAR | 827392 |
+-----------+--------------+--------+
1 rows in results(first row: 0.039s; total: 0.040s)

1> select tabschema, tabname, colname, colcard, high2key, low2key from sysstat.columns where tabname like 'NETZ_%' order by tabname;
+-----------+--------------+----------+---------+----------+---------+
| TABSCHEMA | TABNAME | COLNAME | COLCARD | HIGH2KEY | LOW2KEY |
+-----------+--------------+----------+---------+----------+---------+
| BIGSQL | NETZ_LOW_VAR | COL_VCH | 101 | '99' | '100' |
| BIGSQL | NETZ_LOW_VAR | COL_VCH2 | 101 | '99' | '100' |
| BIGSQL | NETZ_LOW_VAR | DBL | 101 | 1.2E2 | 2.0E1 |
| BIGSQL | NETZ_LOW_VAR | ID | 101 | 120 | 20 |
| BIGSQL | NETZ_LOW_VAR | SINT | 101 | 120 | 20 |
+-----------+--------------+----------+---------+----------+---------+
5 rows in results(first row: 0.028s; total: 0.030s)

1> select tabschema, tabname, colname, colcard, high2key, low2key from sysstat.columns where tabname='MY_SMALL_LOCAL';
+-----------+----------------+---------+---------+----------+---------+
| TABSCHEMA | TABNAME | COLNAME | COLCARD | HIGH2KEY | LOW2KEY |
+-----------+----------------+---------+---------+----------+---------+
| BIGSQL | MY_SMALL_LOCAL | COL_VCH | 100 | '9740' | '10453' |
| BIGSQL | MY_SMALL_LOCAL | ID | 100 | 32758 | 69 |
| BIGSQL | MY_SMALL_LOCAL | SINT | 64 | 98 | 0 |
+-----------+----------------+---------+---------+----------+---------+
3 rows in results(first row: 0.033s; total: 0.035s)

1> select tabschema,tabname, card from sysstat.tables where tabname='MY_SMALL_LOCAL';
+-----------+----------------+------+
| TABSCHEMA | TABNAME | CARD |
+-----------+----------------+------+
| BIGSQL | MY_SMALL_LOCAL | 100 |
+-----------+----------------+------+
1 row in results(first row: 0.040s; total: 0.041s)

1> select tabschema, tabname, colname, colcard, high2key, low2key from sysstat.columns where tabname='MY_LOCAL_TABLE';
+-----------+----------------+---------+---------+----------+---------+
| TABSCHEMA | TABNAME | COLNAME | COLCARD | HIGH2KEY | LOW2KEY |
+-----------+----------------+---------+---------+----------+---------+
| BIGSQL | MY_LOCAL_TABLE | COL_VCH | 31901 | '9999' | '0' |
| BIGSQL | MY_LOCAL_TABLE | ID | 33936 | 32767 | 0 |
| BIGSQL | MY_LOCAL_TABLE | SINT | 1000 | 999 | 0 |
+-----------+----------------+---------+---------+----------+---------+
3 rows in results(first row: 0.024s; total: 0.026s)

1> select tabschema,tabname, card from sysstat.tables where tabname='MY_LOCAL_TABLE';
+-----------+----------------+---------+
| TABSCHEMA | TABNAME | CARD |
+-----------+----------------+---------+
| BIGSQL | MY_LOCAL_TABLE | 4000000 |
+-----------+----------------+---------+
1 row in results(first row: 0.005s; total: 0.006s)

The query

We will be looking at two queries. The first joins the nickname to MY_SMALL_LOCAL, a Hadoop table with a smaller cardinality than the nickname. The second joins the nickname to MY_LOCAL_TABLE, a Hadoop table with a higher cardinality than the nickname.


1> select count(*) from netz_low_var n1 join my_small_local l1 on n1.id=l1.id;
+------+
| 1 |
+------+
| 8192 |
+------+
1 row in results(first row: 11.550s; total: 11.551s)

1> select count(*) from netz_low_var n1 join my_local_table l1 on n1.id=l1.id;
+-----------+
| 1 |
+-----------+
| 100270080 |
+-----------+
1 row in results(first row: 16.347s; total: 16.353s)

Execution analysis

One way to see whether inter-partition parallelism is enabled is to check the query's execution plan. We already know how to generate and retrieve plans from Part 1.

The section we are looking at in the plan is the "Access Plan". What we want to see is a BTQ (Broadcast Table Queue) above the SHIP operator. You may remember from Part 1 that the SHIP operator marks the portion of the plan that runs on the remote data source. A BTQ operator above it means the data fetched from the remote data source is being distributed to the database partitions.
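As an illustrative sketch (not actual db2exfmt output), the shape we are looking for on the nickname's leg is:


   BTQ
    |
   SHIP
    |
  NICKNM: BIGSQL
          NETZ_LOW_VAR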

We will start by looking at what happens when we join our nickname to a small table.


1> explain all for select count(*) from netz_low_var n1 join my_small_local l1 on n1.id=l1.id;
0 rows affected (total: 0.028s)

[bigsql]$ db2exfmt -o local_small_nick_trusted_join.plan -d bigsql -1
DB2 Universal Database Version 11.1, 5622-044 (c) Copyright IBM Corp. 1991, 2016
Licensed Material - Program Property of IBM
IBM DATABASE 2 Explain Table Format Tool

Connecting to the Database.
Connect to Database Successful.
Using SYSTOOLS schema for Explain tables.
Output is in local_small_nick_trusted_join.plan.
Executing Connect Reset -- Connect Reset was Successful.

federation plan with no inter-partition parallelism

In this access plan we can see a DTQ (Directed Table Queue) on the Hadoop table's leg. This means the Hadoop data is being fetched from the worker partitions and joined to the nickname at the head node.

There are two reasons why parallelism might not have been used:
– The nickname being accessed is using a trusted wrapper
– The Hadoop table we are using is too small

The nickname being accessed is using a trusted wrapper

From looking at our catalog earlier we know this is true. Inter-partition parallelism is not supported on trusted wrappers. We can either alter our wrapper or re-create the federation objects using a fenced wrapper. Remember that wrappers created implicitly by the create server statement are always fenced. For now we will change the wrapper options to make MY_ODBC a fenced wrapper.


1> alter wrapper my_odbc options (set DB2_FENCED 'Y');
0 rows affected (total: 0.019s)
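
We can re-run our earlier catalog query to confirm the change took effect; the DB2_FENCED setting should now read 'Y':


1> select * from syscat.wrapoptions where wrapname like 'MY%';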

The new plan, shown below, has changed slightly, but the directed (in this case broadcast) operation is still on the Hadoop table's leg.

federation plan with no inter-partition parallelism over fenced wrapper

The Hadoop table we are using is too small

When parallelism is available, one reason it might still not be used is that the local table is too small. Because each node would hold a very small amount of data, there would not be much processing to do locally, and the cost of transferring the data would outweigh any gains from processing it in parallel.

For the final plan we used the large Hadoop table. As you can see at the bottom of the plan shown below, the broadcast operator is now on the nickname's side.


1> explain all for select count(*) from netz_low_var n1 join my_local_table l1 on n1.id=l1.id;
0 rows affected (total: 10.191s)

federation plan with inter-partition parallelism over fenced wrapper

Performance comparison

This performance comparison is for the following query.


select count(*) from netz_low_var n1 join my_local_table l1 on n1.id=l1.id

The query was run 11 times for both the trusted and the fenced wrapper. Execution times were captured using UNIX's time utility; the values reported come from the 'real' output.
In the table you can see the time of the first run of each statement, which would not have the benefit of caching, and the average of the next 10 runs.

+---------------------+---------------------------+-------------------------------+
| Run                 | With parallelism (fenced) | Without parallelism (trusted) |
+---------------------+---------------------------+-------------------------------+
| First run           | 0m13.781s                 | 0m16.427s                     |
| Average of next 10  | 0m3.47s                   | 0m6.22s                       |
+---------------------+---------------------------+-------------------------------+
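
As a sketch, timings like these can be captured from the bash prompt; here we use the db2 command line processor instead of jsqsh so that time wraps a single process (the connect step is shown for completeness):


[bigsql]$ db2 connect to bigsql
[bigsql]$ time db2 "select count(*) from netz_low_var n1 join my_local_table l1 on n1.id=l1.id"
[bigsql]$ for i in $(seq 1 10); do time db2 "select count(*) from netz_low_var n1 join my_local_table l1 on n1.id=l1.id"; done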

Joining nicknames from different sources

With the default configuration, a join between nicknames located in different data sources is executed at the head node. The head node receives all the data and does all the processing itself.

It is possible to parallelize query processing by configuring computational groups. Computational groups are only used when executing queries with joins or set operations that involve only nicknames. In a computational group configuration the head node is still the only node communicating with the remote data sources. The data is retrieved and distributed among the members of the chosen computational group using hash-partitioning, which guarantees that matching join keys end up in the same partition. After all partial computations are completed, the results are sent back to the head node for final processing.

There is a database partition group called IBMDEFAULTGROUP created by default when BigSQL is installed. This group includes all partitions in your database and is managed automatically: for example, if a worker node is decommissioned, the group is automatically altered to remove the decommissioned node.

It is possible to create and use an alternative partition group with only a subset of the nodes, but be aware that such a group is not automatically managed; before decommissioning a node, for example, the group would need to be manually altered. A sketch of this is shown below.
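
As a sketch, creating such a custom group and pointing the registry variable at it would look like this; the group name and partition numbers below are hypothetical:


1> create database partition group my_fed_group on dbpartitionnums (1, 2);

[bigsql]$ db2set DB2_COMPPARTITIONGROUP=MY_FED_GROUP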

Catalog Review


1> select * from syscat.dbpartitiongroupdef;
+-----------------+----------------+--------+
| DBPGNAME | DBPARTITIONNUM | IN_USE |
+-----------------+----------------+--------+
| IBMCATGROUP | 0 | Y |
| IBMDEFAULTGROUP | 1 | Y |
| IBMDEFAULTGROUP | 2 | Y |
| IBMDEFAULTGROUP | 3 | Y |
| BIGSQLCATGROUP | 0 | Y |
| IBMDEFAULTGROUP | 0 | Y |
+-----------------+----------------+--------+
6 rows in results(first row: 0.021s; total: 0.024s)

The tables

We will be joining our two remote tables. Exactly as with queries involving local and remote data, the amount of data being joined makes a difference when deciding whether computational groups should be used. For that reason we increased the data in our PostgreSQL table, so the statistics are now as follows.


1> select tabschema,tabname, card from sysstat.tables where tabname='POST_LOW_VAR';
+-----------+--------------+--------+
| TABSCHEMA | TABNAME | CARD |
+-----------+--------------+--------+
| BIGSQL | POST_LOW_VAR | 819200 |
+-----------+--------------+--------+
1 row in results(first row: 0.038s; total: 0.038s)

1> select tabschema, tabname, colname, colcard, high2key, low2key from sysstat.columns where tabname='POST_LOW_VAR';
+-----------+--------------+----------+---------+----------+---------+
| TABSCHEMA | TABNAME | COLNAME | COLCARD | HIGH2KEY | LOW2KEY |
+-----------+--------------+----------+---------+----------+---------+
| BIGSQL | POST_LOW_VAR | COL_VCH | 100 | '99' | '1' |
| BIGSQL | POST_LOW_VAR | COL_VCH2 | 100 | '99' | '1' |
| BIGSQL | POST_LOW_VAR | DBL | 100 | 1.0E2 | 1.0E0 |
| BIGSQL | POST_LOW_VAR | ID | 100 | 100 | 1 |
| BIGSQL | POST_LOW_VAR | SINT | 100 | 100 | 1 |
+-----------+--------------+----------+---------+----------+---------+
5 rows in results(first row: 0.023s; total: 0.024s)

The query


1> select count(*) from post_low_var n1 left join netz_low_var n2 on n1.id =n2.sint;
+------------+
| 1 |
+------------+
| 5435973632 |
+------------+
1 row in results(first row: 20m28.514s; total: 20m28.519s)

Execution analysis

A good way to tell whether a computational group will be used in the execution of a query is to look at its execution plan. The section we will look at is the "Access Plan". What we want to see is a DTQ operator above the SHIP operator on both of the join's legs.
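
Illustratively (again a sketch, not actual db2exfmt output), the pattern to look for is a DTQ above the SHIP on each leg of the join:


      JOIN
      /  \
   DTQ    DTQ
    |      |
   SHIP   SHIP
    |      |
  NICKNM  NICKNM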

The existence of a database partition group is one of two steps required when setting up computational groups; the other is setting a registry variable. Before we make that configuration change, though, let's look at what the plan looks like when no computational group is defined.

With no computational group


1> explain all for select count(*) from post_low_var n1 left join netz_low_var n2 on n1.id =n2.sint;
0 rows affected (total: 10.124s)

[bigsql]$ db2exfmt -o nick_nick_join_nocp.plan -d bigsql -1
DB2 Universal Database Version 11.1, 5622-044 (c) Copyright IBM Corp. 1991, 2016
Licensed Material - Program Property of IBM
IBM DATABASE 2 Explain Table Format Tool

Connecting to the Database.
Connect to Database Successful.
Using SYSTOOLS schema for Explain tables.
Output is in nick_nick_join_nocp.plan.

federation join nicknames no computational group

As expected, there are no DTQ operators in the plan above. That tells us all data is retrieved from the data sources and joined at the head node.

With computational group

In our case the cluster is quite small and we do want to use all the nodes for federation processing, so we will not create a custom partition group. We will use the IBMDEFAULTGROUP group, which has the added advantage of being managed by the system.


[bigsql]$ db2set DB2_COMPPARTITIONGROUP=IBMDEFAULTGROUP

Registry variables like DB2_COMPPARTITIONGROUP require an instance restart to take effect. Let's restart our BigSQL service from the Ambari GUI.
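
After the restart, the setting can be verified on the command line; db2set prints the current value of the variable:


[bigsql]$ db2set DB2_COMPPARTITIONGROUP
IBMDEFAULTGROUP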


1> explain all for select count(*) from post_low_var n1 left join netz_low_var n2 on n1.id =n2.sint;
0 rows affected (total: 0.086s)

[bigsql]$ db2exfmt -o nick_nick_join_cp.plan -d bigsql -1
DB2 Universal Database Version 11.1, 5622-044 (c) Copyright IBM Corp. 1991, 2016
Licensed Material - Program Property of IBM
IBM DATABASE 2 Explain Table Format Tool

Connecting to the Database.
Connect to Database Successful.
Using SYSTOOLS schema for Explain tables.
Output is in nick_nick_join_cp.plan.

At the bottom of our access plan, shown here, we can see that both legs have DTQ operators, meaning hash-partitioning is happening.

Federation nickname join with computational group

Alternative check for running queries

There is also a way to check whether computational groups are being used: look at the application details in db2top while the query is running. You can start db2top by calling db2top -d bigsql on the command line.

The image below shows how to reach the application details. Starting in the top left corner, select the sessions tab by pressing 'l', then select the session the query is running under.

In this case we only had one session connected, so it was very easy. On a busier system you can use the values in "Application Name", "DB User" and "ClientNetName" to find the connection you are looking for. If these values are not all on the display, use the left and right arrow keys to navigate the properties list. Once you know which connection is yours, select it by pressing the 'a' key; that will open a pop-up where you should enter the value from "Application Handle(Stat)".

Once the application is opened you should see output similar to the one at the bottom of the image. Notice the "#DBP" value: if computational groups are in use, at least one of the query subsections will have a value corresponding to the number of nodes in the computational group we specified. In the example it is section 1, running on 3 database partitions.

federation query on db2top

Performance comparison

The following table looks at performance as the number of nodes varies, using the same query and data as in the "Execution analysis" subsection.

The computational group used is the default partition group IBMDEFAULTGROUP, composed of the head node plus a number of worker nodes (the remaining nodes were stopped). The column headers in the table are the number of worker nodes in the computational group. The "No CG" column captures execution time when no computational group is defined. If we were picking up from where we left off in the previous section, this could be done by unsetting the DB2_COMPPARTITIONGROUP registry variable: issue the command db2set DB2_COMPPARTITIONGROUP= and restart the instance.

Times were captured using the time utility on a RHEL system; the value captured is the 'real' time. For every data point the query was run once to 'warm up' the system, then run 10 consecutive times. The values shown are the average running time of the 10 consecutive runs.

+----------+----------+----------+----------+
| No CG    | 1        | 2        | 3        |
+----------+----------+----------+----------+
| 20:19.45 | 11:06.84 | 05:30.27 | 03:54.06 |
+----------+----------+----------+----------+

Conclusion

In this second part of the Federation Performance in BigSQL series we analyzed performance for joins and set operations involving nicknames. We also explored some configurations that can help improve query performance in precisely these cases.

As with any performance improvement, this does not come without a cost; here the cost is an increase in the number of running FMP processes. In both cases we looked at, achieving greater performance means making use of the BigSQL worker nodes. Although the actual work is done at the worker nodes, data needs to be communicated between each worker node and the head node, which results in additional processes being spawned on the head node.

With these costs in mind and the information we have gone through, it should be possible for an administrator to configure Federation in BigSQL for good performance.

I hope this series has been informative and will be useful to anyone experimenting with or using BigSQL's Federation feature.
