Push custom analytics to Netezza database

In this article, you will learn what the ibmdbpy4nps package is and how it can help machine learning engineers and data scientists execute their custom ML and analytics functions inside Netezza. Both the on-premises and cloud versions of Netezza support this functionality, offering a seamless One Netezza experience across platforms.

Introduction

Why is in-database analytics needed?

The need for in-database analytics is well known. The power of harnessing MPP hardware for faster processing and the convenience of not moving data out of the database make it very attractive for ML engineers and data scientists to run analytics inside an MPP database like Netezza®.

How ibmdbpy4nps is different from Netezza in-database analytics

Netezza in-database analytics (INZA) is an extremely powerful and comprehensive analytics package that provides several SQL routines sufficient for handling most ML steps. However, users are limited to what’s available in the package. For example, a user may want to use the latest algorithm from their favorite Python ML library rather than the available INZA algorithms, or apply a custom transformation that INZA does not provide. That’s where the ibmdbpy4nps package comes in. Built on top of the Netezza analytics executable (AE) technology, it allows users to execute custom ML code directly inside the database through simple Python interfaces. With ibmdbpy4nps, there is flexibility (users are not constrained in terms of what to execute inside the database) as well as speed (the code runs inside the database rather than at the client end). Users can also connect to database tables with a pandas dataframe-style abstraction and use it for data exploration; SQL translations for several standard pandas dataframe operations are already included.

An architectural overview is shown below.

(Figure: architecture flow diagram showing the client, generation, and execution layers)

The three architectural layers together (explained below) provide the user with a seamless experience in pushing client code as analytics executables inside the database. Users need to interact only with the analytics executable client layer, through the Python module invocations provided in the ibmdbpy4nps package; a preview of that interaction follows the list.

  • Client layer — Provides the Python modules users can import and invoke with the required parameters.
  • Generation layer — Translates the client’s Python invocation into analytics executable-based code and adds the SQL needed for invoking the pre-existing user-defined functions.
  • Execution layer — Provides the pre-existing user-defined functions, executes the SQL, and runs the analytics executable launcher that handles the execution.
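
Everything the user touches is a handful of client-layer imports; each module is covered in detail later in this article:

# These imports are the entire client-layer surface used in this article;
# the generation and execution layers translate and run the calls inside Netezza.
from ibmdbpy4nps import IdaDataBase, IdaDataFrame
from ibmdbpy4nps.ae import NZFunApply, NZFunTApply, NZFunGroupedApply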

What this package provides for in-database analytics

  1. Allows the user to connect to database tables with a pandas-style dataframe abstraction.
  2. Supports in-database data exploration using the built-in SQL translations (again using the pandas dataframe style).
  3. Supports the execution of custom ML code directly in the database (through simple Python interfaces).

Use cases

Scenarios

While it is technically feasible to push any custom code into the database, it may not be optimal all the time. Netezza is known for its massively parallel processing capabilities. Data is usually distributed across different data slices when the table is created, and SQL operations are executed in parallel on each of these data slices (on worker nodes) before the results get aggregated on the host. Pushing code down is ideal when the use case can leverage Netezza’s parallelism. Some sample scenarios:

  1. Let’s say you need to apply a custom data transformation on each and every record of the database table. This does not require data aggregation in one place before the routine gets applied and so it can be executed in parallel on different data slices. This is an optimal scenario.
  2. Let’s say you need to build an ML model against the entire dataset. This requires that the data be aggregated in one place before applying the model building code. This limits Netezza’s scope to run things in parallel. This is not an optimal scenario.
  3. Let’s say your data can be partitioned, and your goal is to build ML models for each partition, with each being an independent dataset. This does not require data aggregation in one place before the model building routine gets applied. You can exploit Netezza parallelism by building models for each partition in parallel. This is an optimal scenario.
  4. Let’s say you need to explore the data, such as gathering statistics on the dataset. This requires that the data be aggregated in one place before the routine gets applied and limits Netezza’s scope to run things in parallel. This is not an optimal scenario. However, given that this is a common task, we have already provided in-database SQL implementations for most data exploration operations with pandas dataframe abstraction, and users can benefit from those.

Installation

  1. On the client side (in a Python environment): Install the ibmdbpy4nps package using pip install ibmdbpy4nps.
  2. On the server side (Netezza server): Install INZA version 11.2.1.0 or later. The INZA customizations (AE launcher, template, preregistered SQL functions, etc.) that support ibmdbpy4nps invocations are available starting with INZA 11.2.1.0.
  3. Set up an ODBC or JDBC connection on the client side.

ODBC connection: Set up and configure an ODBC data source connection by following the steps in the Installing and configuring ODBC section.

For example, the figure below (on Windows) shows a data source “weather” that is set up for a “weather” database.

(Figure: Windows ODBC data source configuration)

JDBC connection: The IBM Knowledge Center describes how to install the Netezza JDBC driver on your client in the Installing and configuring JDBC section. Once you have downloaded and installed the file nzjdbc3.jar, include its location in the CLASSPATH environment variable:

export CLASSPATH=<path-to-nzjdbc3.jar>:$CLASSPATH
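
With the driver on the classpath, you can connect through JDBC using the same IdaDataBase interface shown in the next section. A minimal sketch, under the assumption that ibmdbpy4nps accepts a standard Netezza JDBC URL as the dsn (adjust host, port, and database for your environment):

from ibmdbpy4nps import IdaDataBase

# Assumption: jdbc:netezza://<host>:<port>/<database> URL form, where 5480 is
# the default Netezza port; credentials are passed separately via uid/pwd.
jdbc_url = 'jdbc:netezza://nzhost:5480/weather'
idadb = IdaDataBase(dsn=jdbc_url, uid='admin', pwd='password')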

Connecting to the database

Users can access a database table with a dataframe abstraction as shown below:

Import packages

from ibmdbpy4nps import IdaDataBase, IdaDataFrame

Connect to a database (weather is the data source name)

idadb = IdaDataBase('weather', 'admin', 'password')

Connect to a table within the database (WEATHER is the database table name)

idadf = IdaDataFrame(idadb, 'WEATHER')

Note: Following are the steps to load the weather CSV into the weather table on the Netezza server; a quick client-side verification is sketched after the steps.

  1. Create a table:
    create table weather (
        date DATE, Location VARCHAR(100), MinTemp REAL, MaxTemp REAL,
        Rainfall REAL, Evaporation REAL, Sunshine REAL,
        WindGustDir VARCHAR(20), WindGustSpeed INTEGER,
        WindDir9am VARCHAR(10), WindDir3pm VARCHAR(10),
        WindSpeed9am INTEGER, WindSpeed3pm INTEGER,
        Humidity9am INTEGER, Humidity3pm INTEGER,
        Pressure9am REAL, Pressure3pm REAL,
        cloud9am VARCHAR(10), cloud3pm VARCHAR(10),
        Temp9am REAL, Temp3pm REAL,
        RainToday VARCHAR(10), RISK_MM REAL, RainTomorrow VARCHAR(10)
    );
    
  2. Load the CSV into the table:
    nzload -df weatherAUS.csv -t weather -db weather -pw password -nullValue NA  -boolStyle Yes_No -skipRows 1 -delim , -dateStyle MDY -dateDelim '/'
    
  3. For tables that do not have an identity column (like the weather table above), create an identity column and set its value with rowid:
    alter table weather add column id bigint;
    update weather set id=rowid;
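Once the load completes, a quick sanity check from the client confirms the table has the expected shape (a minimal sketch, reusing the idadf connection shown above):

# shape and columns issue lightweight metadata/count queries against
# Netezza rather than pulling the data to the client
print(idadf.shape)    # expected: (142193, 25) once the id column is added
print(idadf.columns)  # column names as stored in Netezza (uppercase)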

Exploring data with built-in SQL translations

This section corresponds to the fourth use case where the user is interested in exploring or gathering some statistics on the data. The ibmdbpy4nps package builds on top of the ibmdbpy package and supports several built-in pandas-style dataframe operations. A few examples are shown below. For more information, refer to PyPI.org.

idadf.head() — Returns the first n rows


idadf.describe() — Returns various statistics on the columns


idadf.corr() — Returns pair-wise correlation of columns


The correlation values can also be rendered as a visual heatmap on the client, just like a local pandas df.corr() result.
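
A minimal sketch of that client-side rendering, assuming seaborn and matplotlib are installed in the client environment:

import matplotlib.pyplot as plt
import seaborn as sns

# The correlation matrix is computed in-database via SQL translation;
# only the small resulting matrix is transferred to the client for plotting.
corr_matrix = idadf.corr()
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm')
plt.show()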

Executing custom analytics/ML functions inside the database

In this section, you will see how to execute custom ML code inside the database (use cases 1-3).

NZInstall – Install packages on Netezza

One important utility developers need is the ability to install Python packages before using them in their ML functions. NZInstall helps with that. It takes in a package name and returns a result code indicating whether the package is installed. An output of 0 indicates that the package was successfully installed on Netezza.

from ibmdbpy4nps import IdaDataBase
from ibmdbpy4nps.ae.install import NZInstall

# specify the package name depending on your requirement
package_name = 'pandas'
idadb = IdaDataBase('weather', 'admin', 'password')
nzinstall = NZInstall(idadb, package_name)
result = nzinstall.getResultCode()

NZFunApply

NZFunApply applies a function to each row of the table data. This corresponds to the first use case, where the user is interested in applying a custom data transformation on each record of the database table.

Sample scenario: Convert the MAXTEMP column in the weather table from Celsius to Fahrenheit

The user function you want to execute gets two parameters by default: self, which represents the analytics-executable context, and x, which represents a row of the table. You can use x to operate on selected columns. In the example below, the fourth column (x[3], the MAXTEMP value) is retrieved and processed to generate a new value. For the columns that you want to generate as output, build them as a list and use self.output to emit the result. Alternatively, if you have only a single result value, you can send it directly to self.output.

from ibmdbpy4nps import IdaDataBase, IdaDataFrame
from ibmdbpy4nps.ae import  NZFunApply

idadb = IdaDataBase('weather', 'admin', 'password', verbose=True)

idadf = IdaDataFrame(idadb, 'WEATHER')


def apply_fun(self, x):
    max_temp = x[3]
    id = x[24]
    fahren_max_temp = (max_temp * 1.8) + 32
    row = [id, max_temp, fahren_max_temp]
    self.output(row)

output_signature = {'ID': 'int', 'MAX_TEMP': 'float', 'FAHREN_MAX_TEMP': 'float'}
nz_apply = NZFunApply(df=idadf, fun_ref=apply_fun, output_table="temp_conversion",
                      output_signature=output_signature, merge_output_with_df=True)
result_idadf = nz_apply.get_result()
result = result_idadf.as_dataframe()
idadb.drop_table(result_idadf.tablename)
print(result)

In notebook environments where you cannot send the function as a reference, wrap your function in quotes and assign it to a string variable. If your function code is sent as a string, you must also provide the function name in the NZFunApply call. An example is provided below.

Note: To generate correctly indented analytics-executable code for the back-end SQL function, the function definition must begin immediately after the opening quotes, not on the next line.

from ibmdbpy4nps import IdaDataBase, IdaDataFrame
from ibmdbpy4nps.ae import  NZFunApply

idadb = IdaDataBase('weather', 'admin', 'password', verbose=True)

idadf = IdaDataFrame(idadb, 'WEATHER')

code_str_apply = """def apply_fun(self, x):
    from math import sqrt
    max_temp = x[3]
    id = x[24]
    fahren_max_temp = (max_temp*1.8)+32
    row = [id, max_temp,  fahren_max_temp]
    self.output(row)
    """
output_signature = {'ID': 'int', 'MAX_TEMP': 'float', 'FAHREN_MAX_TEMP': 'float'}
nz_apply = NZFunApply(df=idadf, code_str=code_str_apply, fun_name='apply_fun', output_table="temp_conversion",output_signature=output_signature, merge_output_with_df=True)
result_idadf = nz_apply.get_result()
result = result_idadf.as_dataframe()
idadb.drop_table(result_idadf.tablename)
print(result)

Expected results (142193 rows x 27 columns; only a subset of the columns is shown):

         MAX_TEMP  FAHREN_MAX_TEMP        DATE  RISK_MM RAINTOMORROW      ID
0       27.700001        81.860001  2008-12-16      0.0           No  448014
1       33.000000        91.400002  2008-12-22      0.0           No  448020
2       32.700001        90.860001  2008-12-28      0.0           No  448026
3       28.799999        83.839996  2009-01-03      0.0           No  448032
4       28.400000        83.120003  2009-01-09      0.0           No  448038
...           ...              ...         ...      ...          ...     ...
142188  32.900002        91.220001  2015-12-02      0.0           No  589631
142189  37.099998        98.779999  2015-12-08      0.2           No  589637
142190  39.500000       103.099998  2015-12-14      3.8          Yes  589643
142191  30.299999        86.540001  2015-12-20      4.8          Yes  589649
142192  36.299999        97.339996  2016-02-06      0.0           No  589692

NZFunTApply

This corresponds to the second use case, where the user is interested in executing a complex function, such as building an ML model against all available rows. This leads to two variations, described in the following sections: building a model against each data slice, or building a model against the entire table data.

a) Apply the function on each slice of the data

A data slice is a piece of the table data. Netezza distributes data across slices based on the column specified at table creation time; if no column was specified, the first column is used as the distribution column. Hence, it is important to identify how the data is distributed before using NZFunTApply. Otherwise, the function may be applied to data arrangements you did not intend.
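
If you are not sure how a table is distributed, you can check from the client before relying on slice-level parallelism. A minimal sketch, assuming the standard Netezza catalog view _v_table_dist_map is accessible to your user:

# List the distribution column(s) recorded for the WEATHER table
# (assumption: _v_table_dist_map catalog view, as in standard Netezza systems)
dist = idadb.ida_query(
    "select tablename, attname from _v_table_dist_map where tablename = 'WEATHER'")
print(dist)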

Sample scenario: Transform the data, build an ML model, and measure the accuracy of the model

Let’s use the weather dataset for writing a function that addresses the above scenario. The weather dataset has 10 years of daily weather observations from multiple locations across Australia. There are 48 unique locations in the dataset, and RainTomorrow is the target variable to predict (did it rain the next day?). The next-day rain prediction is done by training classification models on the target variable RainTomorrow. Our goal is to write a function to perform three steps: transform the data (impute the columns by assigning default values for null values), build an ML model (build a decision-tree classifier for the transformed data), then measure the accuracy (compute a three-fold CV accuracy score).

Note that the function gets two parameters by default: self (which represents the analytics-executable context) and df (the dataframe for the incoming slice data). The size of the dataset, the location of the first record, and the accuracy are emitted with self.output.

from ibmdbpy4nps import IdaDataBase, IdaDataFrame
from ibmdbpy4nps.ae import  NZFunTApply

idadb = IdaDataBase('weather', 'admin', 'password', verbose=True)

idadf = IdaDataFrame(idadb, 'WEATHER')

code_str_host_spus="""def decision_tree_ml(self, df):
    from sklearn.model_selection import cross_val_score
    from sklearn.impute import SimpleImputer
    from sklearn.tree import DecisionTreeClassifier

    from sklearn.preprocessing import LabelEncoder
    import numpy as np

    location = df.LOCATION[0]

    # data preparation
    imputed_df = df.copy()
    ds_size = len(imputed_df)
    imputed_df['CLOUD9AM'] = imputed_df.CLOUD9AM.astype('str')
    imputed_df['CLOUD3PM'] = imputed_df.CLOUD3PM.astype('str')
    imputed_df['SUNSHINE'] = imputed_df.SUNSHINE.astype('float')
    imputed_df['EVAPORATION'] = imputed_df.EVAPORATION.astype('float')


    #remove columns which have only null values
    columns = imputed_df.columns
    for column in columns:
        if imputed_df[column].isnull().sum()==len(imputed_df):
            imputed_df = imputed_df.drop(columns=[column])

    columns = imputed_df.columns

    for column in columns:

        if (imputed_df[column].dtype == 'float64' or imputed_df[column].dtype == 'int64'):
            imp = SimpleImputer(missing_values=np.nan, strategy='mean')
            imputed_df[column] = imp.fit_transform(imputed_df[column].values.reshape(-1, 1))

        if (imputed_df[column].dtype == 'object'):
            # impute missing values for categorical variables
            imp = SimpleImputer(missing_values=None, strategy='constant', fill_value='missing')
            imputed_df[column] = imp.fit_transform(imputed_df[column].values.reshape(-1, 1))
            imputed_df[column] = imputed_df[column].astype('str')
            le = LabelEncoder()
            #print(imputed_df[column].unique())

            le.fit(imputed_df[column].unique())
            # print(le.classes_)
            imputed_df[column] = le.transform(imputed_df[column])



    X = imputed_df.drop(['RISK_MM', 'RAINTOMORROW'], axis=1)
    y = imputed_df['RAINTOMORROW']

    # Create a decision tree
    dt = DecisionTreeClassifier(max_depth=5)

    cvscores_3 = cross_val_score(dt, X, y, cv=3)

    self.output(ds_size, location, np.mean(cvscores_3))
"""

Since we want to apply the function on each data slice, we will choose parallel=True in the module invocation.

output_signature = {'DATASET_SIZE': 'int', 'LOCATION':'str', 'CLASSIFIER_ACCURACY':'double'}

nz_fun_tapply = NZFunTApply(df=idadf, code_str=code_str_host_spus, fun_name ="decision_tree_ml", parallel=True, output_signature=output_signature)
result_idadf = nz_fun_tapply.get_result()
result = result_idadf.as_dataframe()
idadb.drop_table(result_idadf.tablename)
print(result)

Notice that we have multiple rows in the result (corresponding to the number of assigned data slices at the time of table creation). The first column is the size of the dataset, the second column is the location of the first record in the dataset, and the third column is the accuracy of the classifier built.

   DATASET_SIZE LOCATION  CLASSIFIER_ACCURACY
0         23673   Albury             0.824822
1         23734   Albury             0.827126
2         23686   Albury             0.813898
3         23706   Albury             0.818485
4         23739   Albury             0.832175
5         23655   Albury             0.826168

b) Apply the function on the entire dataset

It is possible that you want the function to be applied to the entire dataset and not on slices. This means that data needs to be aggregated in a single place before the function can be applied. This is not an optimal scenario, but we do provide this option (use parallel=False) for the sake of completeness.

output_signature = {'DATASET_SIZE': 'int', 'LOCATION':'str', 'CLASSIFIER_ACCURACY':'double'}
nz_fun_tapply = NZFunTApply(df=idadf, code_str=code_str_host_spus, fun_name ="decision_tree_ml", parallel=False, output_signature=output_signature)
result_idadf = nz_fun_tapply.get_result()
result = result_idadf.as_dataframe()
idadb.drop_table(result_idadf.tablename)
print(result)

Because we chose parallel=False to perform the function on the complete dataset, you see only one row in the result. The first column is the size of the dataset, the second column is the location of the first record in the dataset, and the third column is the accuracy of the classifier built.

DATASET_SIZE LOCATION  CLASSIFIER_ACCURACY
      142193   Albury             0.827115

Note: For complex functions, it is better to first test those on the client before running on the server. One could download a small subset (say, 1,000 records) with a select query (select * from weather limit 1000), test the function on the client dataframe, then push the function to the server to execute against the entire dataset.
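
A minimal sketch of that workflow, assuming the function is defined as a regular Python function on the client (LocalContext below is a hypothetical stand-in for the analytics-executable context, not part of the package):

# Hypothetical stand-in: collects whatever the function emits via self.output()
class LocalContext:
    def __init__(self):
        self.rows = []

    def output(self, *args):
        self.rows.append(args)

# Pull a small sample to the client; ida_query returns a pandas dataframe
sample_df = idadb.ida_query('select * from weather limit 1000')

ctx = LocalContext()
decision_tree_ml(ctx, sample_df)  # run the same function locally first
print(ctx.rows)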

NZFunGroupedApply

Apply the function on each partition that gets computed at runtime based on the user’s selection.

This section corresponds to the third use case, where the user is interested in executing a complex function, such as building ML models for the partitions of their choice.

In this scenario, your function is applied to each partition computed at runtime using the input column (the index parameter) you provide to NZFunGroupedApply. Netezza data slices are regenerated at runtime so that each slice contains one or more groups of the specified column only. Since this doesn’t require data to be aggregated in one place before applying the function, this is an optimal scenario. Further, a real-world ML setting needs control over how groups are defined, so this is the most recommended option for harnessing Netezza parallelism for complex ML functions. Each group/partition is treated as an independent dataset, and the function is executed against these partitions in parallel.

While NZFunTApply (with the parallel=True option) also applies the function to data slices in parallel, there are some differences:

  • NZFunTApply uses static slices, meaning that those slices may not be the required data arrangement for your scenario.
  • The function is executed against the entire slice, not on groups within the slices.

Sample scenario: Transform the data, build an ML model, and score the model

Let’s use the weather dataset again for writing a function that addresses the above scenario. Remember that there are 48 unique locations (resulting in 48 partitions) in the dataset and RainTomorrow is the target variable to predict. Our goal is to write a function to perform three steps for each partition: transform the data (impute the columns by assigning default values for null values), build an ML model (build a decision-tree classifier for the transformed data), then score the model (predict the values for RAINTOMORROW).

Note that the function gets two parameters by default: self (which represents the analytics-executable context) and df (the dataframe for the incoming partition data). For each scored row, the ID, the predicted RAINTOMORROW value, the dataset size, and the classifier accuracy are emitted with self.output.

from ibmdbpy4nps import IdaDataBase, IdaDataFrame
from ibmdbpy4nps.ae import  NZFunGroupedApply

idadb = IdaDataBase('weather', 'admin', 'password', verbose=True)

idadf = IdaDataFrame(idadb, 'WEATHER')

code_str_host_spus="""def decision_tree_ml(self, df):
            from sklearn.model_selection import cross_val_score
            from sklearn.impute import SimpleImputer
            from sklearn.tree import DecisionTreeClassifier
            from sklearn.model_selection import train_test_split

            from sklearn.preprocessing import LabelEncoder
            import numpy as np



            # data preparation
            imputed_df = df.copy()
            ds_size = len(imputed_df)
            temp_dict = dict()


            columns = imputed_df.columns

            for column in columns:
                if column=='ID':
                    continue

                if (imputed_df[column].dtype == 'float64' or imputed_df[column].dtype == 'int64'):
                  if imputed_df[column].isnull().sum()==len(imputed_df):
                     imputed_df[column] = imputed_df[column].fillna(0)

                  else :

                     imp = SimpleImputer(missing_values=np.nan, strategy='mean')
                     transformed_column = imp.fit_transform(imputed_df[column].values.reshape(-1, 1))         
                     imputed_df[column] = transformed_column

                if (imputed_df[column].dtype == 'object'):
                    # impute missing values for categorical variables
                    imp = SimpleImputer(missing_values=None, strategy='constant', fill_value='missing')
                    imputed_df[column] = imp.fit_transform(imputed_df[column].values.reshape(-1, 1))
                    imputed_df[column] = imputed_df[column].astype('str')
                    le = LabelEncoder()

                    le.fit(imputed_df[column])
                    # print(le.classes_)
                    imputed_df[column] = le.transform(imputed_df[column])
                    temp_dict[column] = le



            # Create a decision tree
            dt = DecisionTreeClassifier(max_depth=5)
            X = imputed_df.drop(['RISK_MM', 'RAINTOMORROW'], axis=1)
            y = imputed_df['RAINTOMORROW']
            X_train, X_test, y_train, y_test = train_test_split(X,y, test_size = 0.25, random_state=42, stratify=y)


            dt.fit(X_train, y_train)

            accuracy = dt.score(X_test, y_test)    

            pred_df = X_test.copy()

            y_pred= dt.predict(X_test)

            pred_df['RAINTOMORROW'] = y_pred
            pred_df['DATASET_SIZE'] = ds_size
            pred_df['CLASSIFIER_ACCURACY']=round(accuracy,2)

            original_columns = pred_df.columns

            for column in original_columns:

             if column in temp_dict:   
               pred_df[column] = temp_dict[column].inverse_transform(pred_df[column])
               #print(pred_df)

            def print_output(x):
                row = [x['ID'], x['RAINTOMORROW'], x['DATASET_SIZE'], x['CLASSIFIER_ACCURACY']]
                self.output(row)


            pred_df.apply(print_output, axis=1)


"""


output_signature = {'ID':'int', 'RAINTOMORROW_PRED' :'str',  'DATASET_SIZE':'int', 'CLASSIFIER_ACCURACY':'float'}

nz_groupapply = NZFunGroupedApply(df=idadf,  code_str=code_str_host_spus, index='LOCATION', fun_name="decision_tree_ml", output_signature=output_signature, merge_output_with_df=True)

result_idadf = nz_groupapply.get_result()
result = result_idadf.as_dataframe()
idadb.drop_table(result_idadf.tablename)
print(result)

In the result, the output columns (ID, RAINTOMORROW_PRED, DATASET_SIZE, CLASSIFIER_ACCURACY) are merged with the original df columns because of the merge_output_with_df=True option.



Conclusion

In this article, we have shown how to push custom ML code into Netezza (whether the on-premises or cloud version). As shown in the examples, with ibmdbpy4nps, users can seamlessly run their custom code inside the database just as they would inside their favorite IDEs or notebook environments. This offers users the performance of in-database analytics, the convenience of not moving data out of the database, and the flexibility of coding custom Python functions. We conclude that users should consider the push-down approach for their use case if:

  • Data has partitions and each partition can be treated as an independent dataset.
  • Model building or analytics needs to be executed on such partitioned datasets in parallel.
  • The partition count is high enough (greater than or equal to the number of worker nodes in the Netezza configuration) to harness the parallelism; a quick way to check this is sketched below.
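
A minimal sketch of that last check, comparing the number of candidate partitions against the number of data slices the table is spread over (datasliceid is the built-in Netezza pseudo-column; the slice count serves as a proxy for the available parallelism):

# One model per location => one partition per location
n_partitions = idadb.ida_query('select count(distinct location) from weather')

# Data slices actually holding the table's rows
n_slices = idadb.ida_query('select count(distinct datasliceid) from weather')

print(n_partitions, n_slices)  # push-down pays off most when partitions >= slices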