Tutorial

Building custom data preparation transforms in Data Prep Kit (DPK)

Streamline data preparation for your AI applications by building custom transforms in DPK

By Maroun Touma, Shahrokh Daijavad

In the Data Prep Kit (DPK), a transform defines one stage of the data preparation pipeline. Transforms can be used for annotating a document, filtering a document, or otherwise transforming its content.

There are two types of transforms: Ingest transforms and Processing transforms.

  • An Ingest transform takes in one or more documents in their native formats (such as HTML, PDF, or Markdown) and produces a parquet file.
  • A Processing transform takes in a parquet file and produces a parquet file.

In general, each row of a parquet file holds one document. Although a transform receives the data in bulk as a table (read from parquet), it operates for the most part at the row level, on each document.
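
To make the row-per-document layout concrete, here is a minimal sketch in plain Python. It uses a dictionary of equal-length lists as a stand-in for a columnar parquet table. The column names document_id and contents match the test file used later in this tutorial, while the helper names annotate_length and filter_short are hypothetical and not part of DPK.

```python
from typing import Any

# A dictionary of equal-length lists stands in for a columnar table:
# each index position is one row, and each row holds one document.
Table = dict[str, list[Any]]


def annotate_length(table: Table) -> Table:
    """Annotate every row with the length of its contents (hypothetical helper)."""
    lengths = [len(text) for text in table["contents"]]
    return {**table, "length": lengths}


def filter_short(table: Table, min_length: int) -> Table:
    """Keep only rows whose contents have at least min_length characters (hypothetical helper)."""
    keep = [i for i, text in enumerate(table["contents"]) if len(text) >= min_length]
    return {name: [column[i] for i in keep] for name, column in table.items()}


table: Table = {
    "document_id": ["doc-1", "doc-2", "doc-3"],
    "contents": ["A short note.", "Hi", "A somewhat longer document body."],
}

annotated = annotate_length(table)                 # adds a column, keeps all rows
filtered = filter_short(annotated, min_length=5)   # drops the row for "Hi"
print(filtered["document_id"])
```

Both helpers receive the whole table but make their decisions one row at a time, which is the same shape of work that a Processing transform performs.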

While Data Prep Kit includes several built-in transforms, you can also build custom data preparation transforms to meet your needs.

In this tutorial, you will implement a Processing transform, called digest, that calculates a signature value for a document and stores the signature as part of the metadata associated with the document.

For the purposes of this tutorial, we will focus on the transform structure and logic and on integration with the Python runtime. Use of the Ray runtime, unit testing, integration with CI/CD, and integration with Kubeflow pipelines will be covered in a separate tutorial.

Prerequisites

  • A public GitHub account.
  • A public SSH key added to your GitHub account. Follow the instructions in the GitHub docs.

Steps

Step 1. Clone the repo

Fork the repo and clone it locally, using either the SSH or the HTTPS URL. You can do this from the GitHub repo page in a web browser, or by using one of the following commands:

$ git clone git@github.com:IBM/data-prep-kit.git

Or:

$ git clone https://github.com/IBM/data-prep-kit.git

Step 2. Create a folder for the new transform

The DPK transforms are currently organized in three categories: code (transforms that are used specifically for programming languages), language (transforms that are used specifically for natural languages), and universal (transforms that are used for both language and code).

Because our transform calculates a hash for natural language text and programming language code alike, we will add it to the universal subfolder. In this subfolder, we will also create the Python module, named dpk_digest, and a skeleton of the code, including a notebook and a README.md file.

A typical implementation would have the following file structure:

data-prep-kit
│
└───transforms
    │
    └───universal
        │
        └───digest
            │
            ├───dpk_digest
            │   │   __init__.py
            │   │   transform.py
            │   │   runtime.py
            │
            │   requirements.txt
            │   digest.ipynb
            │   README.md

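Although our transform does not require additional packages, we still need to create an empty requirements.txt file. The following commands create the transform folder and the module folder (if they do not exist yet) and then the empty file:

$ mkdir -p data-prep-kit/transforms/universal/digest/dpk_digest
$ cd data-prep-kit/transforms/universal/digest
$ touch requirements.txt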

Step 3. Implement AbstractTableTransform

The dpk_digest/transform.py file implements the key logic for the transform. It receives a pyarrow table in which each row is a document in the data set, and it appends a new column that holds a digest value for each document. We will describe the contents of the file in two sections.

The first portion of the file includes a set of import statements for the library modules that will be needed for invoking this transform.

from typing import Any

import pyarrow as pa
import hashlib

from data_processing.transform import AbstractTableTransform
from data_processing.utils import TransformUtils

AbstractTableTransform defines a template for the APIs that will be invoked by the runtime to trigger the transform. TransformUtils provides a number of shortcuts commonly used by the transforms for data conversions, table manipulations, etc.

The second section of the file defines the DigestTransform class, whose transform() method is called by the framework when new data is available for annotation.


class DigestTransform(AbstractTableTransform):
    def __init__(self, config: dict[str, Any]):
        super().__init__(config)

        ## If the algorithm is not specified, use sha256
        self.algorithm = config.get('digest_algorithm', "sha256")

    def transform(self,
                  table: pa.Table,
                  file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:

        ## calculate digest for the content column
        tf_digest = []
        for elt in table['contents'].to_pylist():
            h = hashlib.new(self.algorithm)
            h.update(elt.encode('utf-8'))
            tf_digest.append(h.hexdigest())

        ## add the digest as a new column to the existing table
        table = TransformUtils.add_column(table=table,
                                           name='digest',
                                           content=tf_digest)

        metadata = {"nrows": len(table)}
        return [table], metadata

The class implements a constructor and a transform method:

  • The __init__() class constructor receives a dictionary that represents the configuration parameters specified by the user. In our case, the only parameter used is a string naming the digest algorithm. If the user does not specify an algorithm, we use the default value of "sha256".
  • The transform() method implements the callback that the runtime uses when it identifies new data to be processed by this transform. It receives a pyarrow table, calculates the digest for each row in the table, and appends the digest as a new column to the same table.
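
The per-row digest calculation can be tried in isolation, without DPK or pyarrow installed. The following is a minimal sketch that uses only hashlib from the standard library. The function name compute_digests and the sample strings are illustrative assumptions, not part of the tutorial's source code.

```python
import hashlib


def compute_digests(contents: list[str], algorithm: str = "sha256") -> list[str]:
    """Return one hex digest per document, mirroring the loop in transform()."""
    digests = []
    for text in contents:
        h = hashlib.new(algorithm)       # raises ValueError for unknown algorithm names
        h.update(text.encode("utf-8"))   # hash functions operate on bytes, not str
        digests.append(h.hexdigest())
    return digests


docs = ["hello", "world"]
sha256_digests = compute_digests(docs)            # default algorithm
sha512_digests = compute_digests(docs, "sha512")

# A hex digest has two characters per byte of hash output.
print(len(sha256_digests[0]), len(sha512_digests[0]))  # prints: 64 128
```

Because the algorithm is passed by name to hashlib.new, switching algorithms requires no change to the loop itself, which is why a single configuration parameter is enough for this transform.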

Step 4. Implement TransformConfiguration

The dpk_digest/runtime.py file implements three classes. The first, DigestConfiguration, extends TransformConfiguration and defines two methods that the developer must implement for each transform:

  • The add_input_params() method is called by the framework to declare the configuration parameters for this transform, so that the parser can check for them and show guidance (the help text) to the user if any is missing.
  • The apply_input_params() method is called by the framework to validate the values associated with the configuration parameters.

import sys
from dpk_digest.transform import DigestTransform
from data_processing.transform import TransformConfiguration
from data_processing.utils import ParamsUtils, CLIArgumentProvider, get_logger
from argparse import ArgumentParser, Namespace
from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.runtime.pure_python.runtime_configuration import (
    PythonTransformRuntimeConfiguration,)

logger = get_logger(__name__)

class DigestConfiguration(TransformConfiguration):
    def __init__(self):
        ## Identify the transform implementation that this configuration relates to
        super().__init__(name='digest', transform_class=DigestTransform,)

    def add_input_params(self, parser: ArgumentParser) -> None:
        ## Define how various configuration parameters should be parsed
        parser.add_argument(
            "--digest_algorithm",
            type=str,
            help="Specify the hash algorithm to use for calculating the digest value.",
        )

    def apply_input_params(self, args: Namespace) -> bool:
        ## Validate each of the configuration parameters received from the user
        captured = CLIArgumentProvider.capture_parameters(args, 'digest', True)
        self.params = self.params | captured
        if captured.get('digest_algorithm') not in ['sha256','SHA512', 'MD5']:
            logger.error(
                "Parameter digest_algorithm cannot be other than ['sha256','SHA512', 'MD5']. "
                f"You specified {args.digest_algorithm}"
            )
            return False
        return True
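
The two-phase pattern, first declaring parameters and then validating the parsed values, can be illustrated with argparse alone. This sketch is independent of DPK: build_parser, validate, and the ALLOWED tuple are hypothetical names. It also assumes that an omitted --digest_algorithm should fall back to sha256, matching the default in the transform constructor, and that letter case should be ignored.

```python
from argparse import ArgumentParser, Namespace

ALLOWED = ("sha256", "sha512", "md5")  # hypothetical allow-list for this sketch


def build_parser() -> ArgumentParser:
    """Phase 1: declare the parameter and its help text."""
    parser = ArgumentParser(prog="digest-sketch")
    parser.add_argument(
        "--digest_algorithm",
        type=str,
        default="sha256",  # assumption: fall back to sha256 when the flag is omitted
        help="Hash algorithm to use for calculating the digest value.",
    )
    return parser


def validate(args: Namespace) -> bool:
    """Phase 2: check the parsed value, ignoring letter case."""
    return args.digest_algorithm.lower() in ALLOWED


parser = build_parser()
ok_default = validate(parser.parse_args([]))
ok_upper = validate(parser.parse_args(["--digest_algorithm", "SHA512"]))
bad = validate(parser.parse_args(["--digest_algorithm", "crc32"]))
print(ok_default, ok_upper, bad)  # prints: True True False
```

Giving the argument an explicit default keeps the validation step from ever seeing a missing value, so the allow-list check only has to deal with strings.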

Step 5. Implement PythonTransformRuntimeConfiguration

The dpk_digest/runtime.py file also implements two other classes: DigestRuntime and Digest.

The DigestRuntime class extends PythonTransformRuntimeConfiguration. It wires the transform into the Python orchestrator and allows the framework to instantiate, configure, and invoke the transform.


class DigestRuntime(PythonTransformRuntimeConfiguration):

    def __init__(self):
        super().__init__(transform_config=DigestConfiguration())

if __name__ == "__main__":
    launcher = PythonTransformLauncher(DigestRuntime())
    launcher.launch()

The Digest class implements a wrapper that simplifies how the transform is invoked and hides some of the complexity that is inherited from the runtime orchestrator.

class Digest:
    def __init__(self, **kwargs):
        self.params = {}
        for key in kwargs:
            self.params[key] = kwargs[key]
        # If input_folder and output_folder are both specified, assume they represent data_local_config
        try:
            local_conf = {k: self.params[k] for k in ("input_folder", "output_folder")}
            self.params["data_local_config"] = ParamsUtils.convert_to_ast(local_conf)
            del self.params["input_folder"]
            del self.params["output_folder"]
        except KeyError:
            # One or both folders were not provided; leave the parameters unchanged
            pass

    def transform(self):
        sys.argv = ParamsUtils.dict_to_req(d=(self.params))
        launcher = PythonTransformLauncher(DigestRuntime())
        return_code = launcher.launch()
        return return_code
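
As a rough illustration of the idea behind the wrapper, the sketch below turns keyword arguments into an argument list and parses that list back with argparse. The helper kwargs_to_argv is a hypothetical stand-in written for this example. It is not the implementation of ParamsUtils.dict_to_req, whose exact output format is not shown in this tutorial.

```python
from argparse import ArgumentParser


def kwargs_to_argv(program: str, **kwargs: object) -> list[str]:
    """Build a sys.argv-style list from keyword arguments (hypothetical helper)."""
    argv = [program]
    for key, value in kwargs.items():
        argv.append(f"--{key}={value}")
    return argv


argv = kwargs_to_argv("digest", digest_algorithm="sha256", output_folder="output")

# A launcher would normally read sys.argv; here the list is parsed directly.
parser = ArgumentParser(prog=argv[0])
parser.add_argument("--digest_algorithm", type=str)
parser.add_argument("--output_folder", type=str)
args = parser.parse_args(argv[1:])
print(args.digest_algorithm, args.output_folder)  # prints: sha256 output
```

With the Digest class defined above, a notebook cell could make a call along the lines of Digest(input_folder="test-data/input", output_folder="output", digest_algorithm="sha256").transform(), leaving the command-line plumbing to the wrapper.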

Step 6. Run the transform

You can run the transform in a notebook or via the CLI.

The notebook should show how to run the transform from the current folder. The digest.ipynb file is a simple notebook for our digest transform.

To run the transform from the CLI, set up a virtual environment (venv) with the dependencies for testing, and then run the transform with the --help option:


$ python -m venv venv && source venv/bin/activate

$ pip install "data-prep-toolkit>=0.2.3" pandas

$ python -m dpk_digest.runtime --help

For testing, we will use a simple parquet test file, which can be found in the cloned repo under the test-data/input directory. The following Python one-liner prints the document_id and contents columns for the first five rows of this file. Let’s take a look at the input file:

$ python -c "import pandas as pd; import pyarrow.parquet as pq; \
print(pq.read_table('test-data/input/test1.parquet').to_pandas()\
[['document_id','contents']].head(5))"

Output:

[Figure: Input file, a simple parquet test file]

Let’s run the transform:

$ python -m dpk_digest.runtime \
--digest_algorithm sha256 \
--data_local_config "{ 'input_folder' : 'test-data/input', 'output_folder' : 'output'}"

We should see the result of the run:

[Figure: Results from running our digest transform]

We now check the results and confirm that the output parquet contains a digest column for each row.

$ python -c "import pandas as pd; import pyarrow.parquet as pq; \
print(pq.read_table('output/test1.parquet').to_pandas()\
[['document_id','contents','digest']].head(5))"

Output:

[Figure: Output parquet file containing a digest column for each row]
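
Beyond a visual check, the digest column can be verified by recomputing each hash. The sketch below assumes the rows have already been loaded into a list of dictionaries (for example, with to_dict("records") on the pandas DataFrame used in the command above). The sample rows and the function name find_mismatches are made up for illustration.

```python
import hashlib


def find_mismatches(rows: list[dict[str, str]], algorithm: str = "sha256") -> list[str]:
    """Return the document_id of every row whose stored digest does not match its contents."""
    mismatched = []
    for row in rows:
        expected = hashlib.new(algorithm, row["contents"].encode("utf-8")).hexdigest()
        if row["digest"] != expected:
            mismatched.append(row["document_id"])
    return mismatched


# Made-up sample rows: the first digest is correct, the second is deliberately wrong.
rows = [
    {"document_id": "doc-1", "contents": "hello",
     "digest": hashlib.sha256(b"hello").hexdigest()},
    {"document_id": "doc-2", "contents": "world", "digest": "0" * 64},
]
print(find_mismatches(rows))  # prints: ['doc-2']
```

An empty result means that every stored digest matches its document, provided the same algorithm name is used for the check as for the transform run.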

Step 7. Provide a README.md file

The README file for the transform should have, at a minimum, the following sections:

  • Summary
  • Contributors
  • Configuration
  • Command line options, including an example of how to run from command line
  • A link to a notebook.

If applicable, it should have more sections on troubleshooting, transforming data using the transform image, and sections on Ray or Spark versions of the transform. See this minimal README.md for our digest transform.

Summary

This tutorial described how to build a custom data preparation transform for DPK. The source code used for this tutorial can be found in this forked data-prep-kit repo.