Watson Data API

Beta


Overview

Watson Data API provides a collection of REST APIs for data analysis, governance, and management. With the Watson Data API you can create projects and catalogs in which you and your colleagues collaborate to use, discover, analyze, map, and shape data. You can also govern data by implementing, checking, and controlling policy activities and settings, which helps you identify and manage potential risks across different areas and roles of responsibility.

Getting started

Creating an IAM bearer token

Before you can call a Watson Data API you must first create an [IAM](https://console.bluemix.net/docs/iam/index.html#iamoverview) bearer token. Each token is valid for only one hour, and after a token expires you must create a new one if you want to continue using the API. The recommended way to retrieve a token programmatically is to create an API key for your IBM Cloud identity and then use the IAM token API to exchange that key for a token.

You can create a token in IBM Cloud or by using the IBM Cloud command line interface (CLI).

To create a token in IBM Cloud:

1. Sign in to [IBM Cloud](https://console.bluemix.net) and select **Manage > Security > Platform API Keys**.
2. Create an API key for your own personal identity, copy the key value, and save it in a secure place. After you leave the page, you will no longer be able to access this value.
3. With your API key, set up Postman or another REST API tool and run the following command:

   ```
   curl "https://iam.ng.bluemix.net/identity/token" \
     -d "apikey=your-saved-value&grant_type=urn%3Aibm%3Aparams%3Aoauth%3Agrant-type%3Aapikey" \
     -H "Content-Type: application/x-www-form-urlencoded" \
     -H "Authorization: Basic Yng6Yng="
   ```

   This returns something like:

   ```
   {
     "access_token": "eyJraWQiOiIyMDE3MDgwOS0wMDowMDowMCIsImFsZyI6...",
     "refresh_token": "zmRTQFKhASUdF76Av6IUzi9dtB7ip8F2XV5fNgoRQ0mbQgD5XCeWkQhjlJ1dZi8K...",
     "token_type": "Bearer",
     "expires_in": 3600,
     "expiration": 1505865282
   }
   ```

4. Use the value of the `access_token` property for your Watson Data API calls. Set the `access_token` value as the authorization header parameter for requests to the Watson Data APIs. The format is `Authorization: Bearer {access_token}`. For example:

   ```
   Authorization: Bearer eyJraWQiOiIyMDE3MDgwOS0wMDowMDowMCIsImFsZyI6IlJTMjU2In0...
   ```

To create a token by using the IBM Cloud CLI:

1. Follow the steps to install the CLI, log in to IBM Cloud, and get the token described [here](https://console.bluemix.net/docs/services/cloud-monitoring/security/auth_iam.html#auth_iam). Remove `Bearer` from the returned IAM token value in your API calls.
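For illustration, here is a minimal curl sketch that passes a bearer token on a Watson Data API request. It assumes the token is stored in a `TOKEN` environment variable and reuses the US South host and the data source types endpoint that appear later in this document; substitute your own service URL and endpoint.

```
# Assumes TOKEN holds the access_token value returned by the IAM call above
export TOKEN="eyJraWQiOiIyMDE3..."

# Example call: list supported data source types (see the Connections section)
curl "https://api.dataplatform.ibm.com/v2/datasource_types" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Accept: application/json"
```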

Versioning

Watson Data API has a major, minor, and patch version, following industry conventions on semantic versioning. Using the version number format MAJOR.MINOR.PATCH, the MAJOR version is incremented when incompatible API changes are made, the MINOR version is incremented when functionality is added in a backwards-compatible manner, and the PATCH version is incremented when backwards-compatible bug fixes are made. The service major version is represented in the URL path (for example, `/v2/`).

Sorting

Some of the Watson Data API collections provide custom sorting support. Custom sorting is implemented using the `sort` query parameter. Service collections can also support single-field or multi-field sorting.

The `sort` parameter in collections that support single-field sorting can contain any one of the valid sort fields. For example, the following expression would sort accounts on company name (ascending):

`GET /v2/accounts?sort=company_name`

You can also add a `+` or `-` character, indicating “ascending” or “descending,” respectively. For example, the expression below would sort on the last name of the account owner, in descending order:

`GET /v2/accounts?sort=-owner.last_name`

The `sort` parameter in collections that support sorting on multiple fields can contain a comma-separated sequence of fields (each, optionally, with a `+` or `-`) in the same format as the single-field sorting. Sorts are applied to the data set in the order that they are provided. For example, the expression below would sort accounts first on company name (ascending) and second on owner last name (descending):

`GET /v2/accounts?sort=company_name,-owner.last_name`
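As an illustrative sketch of a multi-field sort request, using the `/v2/accounts` collection and field names from the examples above; `{service_URL}` and the bearer token are placeholders you supply.

```
# Sort accounts by company name ascending, then by owner last name descending
curl "https://{service_URL}/v2/accounts?sort=company_name,-owner.last_name" \
  -H "Authorization: Bearer $TOKEN"
```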

Filtering

Some of the Watson Data API collections provide filtering support. For basic filtering, you can specify one or more filters, where each supported field must match a specific value. The query parameter name for a basic filter must exactly match the name of a primitive field on a resource in the collection, or of a nested primitive field where the '.' character is the hierarchical separator. The only exception to this rule is primitive arrays. For primitive arrays, such as tags, a singular form of the field is supported as a filter that matches the resource if the array contains the supplied value. Some of the Watson Data API collections also support extended filtering comparisons for the following field types: integer and float, date and date/time, identifier and enumeration, and string.
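To make the naming rules concrete, here is a hedged sketch of basic filters against the `/v2/accounts` collection used in the sorting examples; the field values and the `owner.last_name` and `tag` parameters are hypothetical and only illustrate the conventions described above.

```
# Filter on a nested primitive field (owner.last_name) and on a primitive
# array (tags) using its singular form (tag)
curl "https://{service_URL}/v2/accounts?owner.last_name=Smith&tag=enterprise" \
  -H "Authorization: Bearer $TOKEN"
```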

Searching

Watson Data APIs provide basic searching support. Basic searching is standardized in two ways: tokenization and matching. In the tokenization of search queries, whitespace and punctuation characters can be treated as token delimiters and otherwise ignored. Quoted strings (using either single or double ASCII quotation marks) can be evaluated starting from the beginning of a query, and each quoted string should result in a single token. The contents of a quoted string (including whitespace and punctuation) are preserved exactly in the resulting token. In the matching logic, a resource should match a query if, without regard to case, it includes all of the tokens resulting from the query across a predefined set of that resource’s fields.

Rate Limiting

The following rate limiting headers are supported by some of the Watson Data service APIs:

1. `X-RateLimit-Limit`: If rate limiting is active, this header indicates the number of requests permitted per hour.
2. `X-RateLimit-Remaining`: If rate limiting is active, this header indicates the number of requests remaining in the current rate limit window.
3. `X-RateLimit-Reset`: If rate limiting is active, this header indicates the time at which the current rate limit window resets, as a UNIX timestamp.
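As a sketch, you can inspect these headers with curl's `-i` flag, which prints the response headers; the endpoint and host are placeholders, and the headers appear only on APIs where rate limiting is active.

```
# -i prints the response headers, including any X-RateLimit-* headers
curl -i "https://{service_URL}/v2/datasource_types" \
  -H "Authorization: Bearer $TOKEN"
# Look for lines such as:
#   X-RateLimit-Limit: <requests permitted per hour>
#   X-RateLimit-Remaining: <requests left in the current window>
#   X-RateLimit-Reset: <UNIX timestamp when the window resets>
```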

Error Handling

Responses with 400-series or 500-series status codes are returned when a request cannot be completed. The body of these responses follows the error model, which contains a code field to identify the problem and a message field to explain how to solve the problem. Each individual endpoint has specific error messages. All responses with 500 or 503 status codes are logged and treated as a critical failure requiring an emergency fix.
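For illustration only, a minimal sketch of what an error body with the described `code` and `message` fields might look like; the exact envelope and values vary by endpoint, so treat this shape and the example code value as assumptions rather than the definitive model.

```json
{
  "code": "missing_query_parameter",
  "message": "Either the catalog_id or the project_id query parameter must be provided."
}
```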

Connections

# Using the Connectivity service API endpoints #

Use the following APIs to build your own tools to create and use connections to supported data sources.

## List data source types ##

Data sources are where data can be written or read and might include relational database systems, file systems, object storage systems, and others. To list supported data source types, call the following GET method:

```
GET https://{service_URL}/v2/datasource_types
```

The response to the GET method includes information about each of the sources and targets that are currently supported. The response includes a unique ID property value `metadata.asset_id`, a name, and a label. The `metadata.asset_id` property value should be used for the data source in other APIs that reference a data source type. Additional useful information, such as whether that data source can be used as a source or target (or both), is also included.

Use the `connection_properties=true` query parameter to return a set of properties for each data source type that is used to define a connection to it. Use the `interaction_properties=true` query parameter to return a set of properties for each data source type that is used to interact with a created connection. Interaction properties for a relational database might include the table name and schema from which to retrieve data. Use the `_sort` query parameter to order the list of data source types returned in the response.

A default maximum of 100 data source type entries are returned per page of results. Use the `_limit` query parameter with an integer value to specify a lower limit. More data source types than those on the first page of results might be available. Additional paging properties generated from the page size initially specified with `_limit` are returned in the response. Call a GET method using the value of the `next.href` property to retrieve the next page of results. Call a GET method using the value in the `prev.href` property to retrieve the previous page of results. Call a GET method using the value in the `last.href` property to retrieve the last page of results. These URIs use the `_offset` and `_limit` query parameters to retrieve a specific block of data source types from the full list. Alternatively, you can use a combination of the `_offset` and `_limit` query parameters to retrieve a custom block of results.

For more information about this method see the [REST API Reference](https://console.ng.bluemix.net/apidocs/681?&language=node#introduction).

## Create a connection ##

Connections to any of the supported data source types returned by the previous method can be created and persisted in a catalog or project. To create a connection, call the following POST method:

```
POST https://{service_URL}/v2/connections
```

A new connection can be created in a catalog or project. Use the `catalog_id` **or** `project_id` query parameter to specify where to create the connection asset. Either `catalog_id` or `project_id` is required.

The request body for the method is a UTF-8 encoded JSON document and includes the data source type ID (obtained as described in `List data source types`), a name that is unique in the catalog or project space, and a set of connection properties specific to the data source. Some connection properties are required.
The following example shows the request body used for creating a connection to IBM dashDB:

```json
{
  "datasource_type": "cfdcb449-1204-44ba-baa6-9a8a878e6aa7",
  "name": "My-DashDB-Connection",
  "properties": {
    "host": "dashDBhost.com",
    "port": "50001",
    "database": "MYDASHDB",
    "password": "mypassword",
    "username": "myusername"
  }
}
```

By default, the physical connection to the data source is tested when the connection is created. Use the `test=false` query parameter to disable the connection test.

A response payload containing a connection ID and other metadata is returned when a connection is successfully created. Use the connection ID as a path parameter in other REST APIs when a connection resource must be referenced.

For more information about this method see the [REST API Reference](https://console.ng.bluemix.net/apidocs/681?&language=node#introduction).

## Discover connection assets ##

Data sources contain data and metadata describing the data they contain. To discover or browse the data or metadata in a data source, call the following GET method:

```
GET https://{service_URL}/v2/connections/{connection_id}/assets?path=
```

Use the `catalog_id` **or** `project_id` query parameter to specify where the connection asset was created. Either `catalog_id` or `project_id` is required. `connection_id` is the ID of the connection asset returned from the `POST https://{service_URL}/v2/connections` method, which created the connection asset.

The `path` query parameter is required and is used to specify the hierarchical path of the asset within the data source to be browsed. In a relational database, for example, the path might represent a schema and table. For a file object, the path might represent a folder hierarchy. Each asset in the assets array returned by this method includes a property containing its path in the hierarchy, to facilitate the next call to drill down deeper in the hierarchy.

For example, starting at the root path in an RDBMS will return a list of schemas:

```json
{
  "path": "/",
  "asset_types": [
    {
      "type": "schema",
      "dataset": false,
      "dataset_container": true
    }
  ],
  "assets": [
    {
      "id": "GOSALES",
      "type": "schema",
      "name": "GOSALES",
      "path": "/GOSALES"
    }
  ],
  "fields": [],
  "first": {
    "href": "https://wdp-dataconnect-ys1dev.stage1.mybluemix.net/v2/connections/4b28b5c1-d818-4ad2-bcf9-7de08e776fde/assets?catalog_id=75a3062b-e40f-4bc4-9519-308ee1b5b251&_offset=0&_limit=100"
  },
  "prev": {
    "href": "https://wdp-dataconnect-ys1dev.stage1.mybluemix.net/v2/connections/4b28b5c1-d818-4ad2-bcf9-7de08e776fde/assets?catalog_id=75a3062b-e40f-4bc4-9519-308ee1b5b251&_offset=0&_limit=100"
  },
  "next": {
    "href": "https://wdp-dataconnect-ys1dev.stage1.mybluemix.net/v2/connections/4b28b5c1-d818-4ad2-bcf9-7de08e776fde/assets?catalog_id=75a3062b-e40f-4bc4-9519-308ee1b5b251&_offset=100&_limit=100"
  }
}
```

Drill down into the GOSALES schema using the `path` property for the GOSALES schema asset to discover the list of table assets in the schema:

```
GET https://{service_URL}/v2/connections/{connection_id}/assets?catalog_id={catalog_id}&path=/GOSALES
```

The list of table type assets is returned in the response.
```json
{
  "path": "/GOSALES",
  "asset_types": [
    {
      "type": "table",
      "dataset": true,
      "dataset_container": false
    }
  ],
  "assets": [
    {
      "id": "BRANCH",
      "type": "table",
      "name": "BRANCH",
      "description": "BRANCH contains address information for corporate offices and distribution centers.",
      "path": "/GOSALES/BRANCH"
    },
    {
      "id": "CONVERSION_RATE",
      "type": "table",
      "name": "CONVERSION_RATE",
      "description": "CONVERSION_RATE contains currency exchange values.",
      "path": "/GOSALES/CONVERSION_RATE"
    }
  ],
  "fields": [],
  "first": {
    "href": "https://wdp-dataconnect-ys1dev.stage1.mybluemix.net/v2/connections/4b28b5c1-d818-4ad2-bcf9-7de08e776fde/assets?catalog_id=75a3062b-e40f-4bc4-9519-308ee1b5b251&_offset=0&_limit=100"
  },
  "prev": {
    "href": "https://wdp-dataconnect-ys1dev.stage1.mybluemix.net/v2/connections/4b28b5c1-d818-4ad2-bcf9-7de08e776fde/assets?catalog_id=75a3062b-e40f-4bc4-9519-308ee1b5b251&_offset=0&_limit=100"
  },
  "next": {
    "href": "https://wdp-dataconnect-ys1dev.stage1.mybluemix.net/v2/connections/4b28b5c1-d818-4ad2-bcf9-7de08e776fde/assets?catalog_id=75a3062b-e40f-4bc4-9519-308ee1b5b251&_offset=100&_limit=100"
  }
}
```

Use the `fetch` query parameter with a value of `data`, `metadata`, or both. Data can only be fetched for *data set* assets. In the response above, note that the `asset_types` entry has a `type` value of **table** and a `dataset` property value of **true**. This means that data can be fetched from table type assets. However, if you fetched assets from the connection root, the response would contain schema asset types, which are not data sets, so fetching data from them is not relevant.

A default maximum of 100 metadata assets are returned per page of results. Use the `_limit` query parameter with an integer value to specify a lower limit. More assets than those on the first page of results might be available. Additional paging properties generated from the page size initially specified with `_limit` are returned in the response. Call a GET method using the value of the `next.href` property to retrieve the next page of results. Call a GET method using the value in the `prev.href` property to retrieve the previous page of results. Call a GET method using the value in the `last.href` property to retrieve the last page of results. These URIs use the `_offset` and `_limit` query parameters to retrieve a specific block of assets from the full list. Alternatively, use a combination of the `_offset` and `_limit` query parameters to retrieve a custom block of results.

For more information about this method see the [REST API Reference](https://console.ng.bluemix.net/apidocs/681?&language=node#introduction).

## Discover assets using a transient connection ##

A data source's assets can be discovered without creating a persistent connection. To browse assets without first creating a persistent connection, call the following POST method:

```
POST https://{service_URL}/v2/connections/assets?path=
```

This method is identical in behavior to the GET method in the `Discover connection assets` section, except for two differences:

1. You define the connection properties in the request body of the REST API. You do not reference the connection ID of a persistent connection with a query parameter. The same JSON object used to create a persistent connection is used in the request body.
2. You do not specify a catalog or project ID with a query parameter.

For more information about this method see the [REST API Reference](https://console.ng.bluemix.net/apidocs/681?&language=node#introduction).
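As an illustrative sketch (not the definitive request), the following curl call browses the root path of a dashDB source over a transient connection; it reuses the connection JSON from the `Create a connection` example and, per the differences listed above, passes no catalog or project ID.

```
curl -X POST "https://{service_URL}/v2/connections/assets?path=/" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
        "datasource_type": "cfdcb449-1204-44ba-baa6-9a8a878e6aa7",
        "name": "My-DashDB-Connection",
        "properties": {
          "host": "dashDBhost.com",
          "port": "50001",
          "database": "MYDASHDB",
          "password": "mypassword",
          "username": "myusername"
        }
      }'
```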
## Update a connection ##

To modify the properties of a connection, call the following PATCH method:

```
PATCH /v2/connections/{connection_id}
```

`connection_id` is the ID of the connection asset returned from the `POST https://{service_URL}/v2/connections` method, which created the connection asset. Use the `catalog_id` **or** `project_id` query parameter to specify where the connection asset was created. Either `catalog_id` or `project_id` is required.

Set the `Content-Type` header to `application/json-patch+json`. The request body contains the connection properties to update using a JSON object in [JSON Patch format](http://jsonpatch.com). Change the port number of the connection and add a description using this JSON Patch:

```json
[
  {
    "op": "add",
    "path": "/description",
    "value": "My new PATCHed description"
  },
  {
    "op": "replace",
    "path": "/properties/port",
    "value": "40001"
  }
]
```

By default, the physical connection to the data source is tested when the connection is modified. Use the `test=false` query parameter to disable the connection test.

For more information about this method see the [REST API Reference](https://console.ng.bluemix.net/apidocs/681?&language=node#introduction).

## Delete a connection ##

To delete a persistent connection, call the following DELETE method:

```
DELETE /v2/connections/{connection_id}
```

`connection_id` is the ID of the connection asset returned from the `POST https://{service_URL}/v2/connections` method, which created the connection asset. Use the `catalog_id` **or** `project_id` query parameter to specify where the connection asset was created. Either `catalog_id` or `project_id` is required.
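To tie the update and delete calls together, a hedged curl sketch; the connection ID and project ID are placeholders, and the host is assumed to be the same `{service_URL}` used throughout this section.

```
# Apply a JSON Patch to an existing connection
curl -X PATCH "https://{service_URL}/v2/connections/{connection_id}?project_id={project_id}" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json-patch+json" \
  -d '[{"op": "replace", "path": "/properties/port", "value": "40001"}]'

# Delete the connection when it is no longer needed
curl -X DELETE "https://{service_URL}/v2/connections/{connection_id}?project_id={project_id}" \
  -H "Authorization: Bearer $TOKEN"
```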

Scheduling

# Introduction

The Watson Data API provides scheduling support. The scheduling service is a microservice that is deployed as part of the Watson Data API in IBM Bluemix. To use this service, Bluemix must be up and running. You can use the scheduling service to run a data flow, a notebook, a data profile, or any other given source more than once. The scheduling service is not tightly coupled with any other service in Watson Data. It supports the repeat types hour, day, week, month, and year, with two repeat end options: end date and maximum number of runs. The scheduling service provides a collection of REST APIs to create, update, delete, and get schedules. It also lets you list all the schedules in the catalog or project with a predefined limit by applying filtering and sorting techniques.

## Create a schedule

To create a schedule in a specified catalog or project, call the following POST method:

```
POST {GATEWAY_URL}/v2/schedules
```

When a schedule is created, a message is published to the topic "v2.schedules.:guid.POST" on the "WDP" exchange. ":guid" is the schedule_id of the created schedule.

Before you create a schedule, you must consider the following points:

1. You must have a valid IAM token to make REST API calls and a project or catalog ID.
2. You must be authorized (be assigned the correct role) to create schedules in the catalog or project.
3. The start and end dates must be in the following format: `YYYY-MM-DDTHH:mm:ssZ` or `YYYY-MM-DDTHH:mm:ss.sssZ` (specified in RFC 3339).
4. The supported repeat types are `hour`, `day`, `week`, `month`, and `year`.
5. There are 2 repeat end options, namely `max_invocations` and `end_date`.
6. The supported repeat interval is 1.
7. There are 3 statuses for schedules, namely `enabled`, `disabled`, and `finished`. To create a schedule, the status must be `enabled`. The scheduling service updates the status to `finished` once it has finished running. You can stop or pause the scheduling service by updating the status to `disabled`.
8. You can update the endpoint URL in the target href. Supported target methods are POST, PUT, PATCH, DELETE, and GET.
9. Set `generate_iam_token=true`. When this option is set to true, the scheduling service generates an IAM token and passes it to the target URL at runtime. This IAM token is required to run schedules automatically at the scheduled intervals. This token is not to be confused with the IAM token required to make Watson Data REST API calls.

This POST method creates a schedule in a catalog with a defined start and a given end date:

```json
{
  "catalog_id": "aeiou",
  "description": "aeiou",
  "name": "aeiou",
  "tags": ["aeiou"],
  "start_date": "2017-08-22T01:02:14.859Z",
  "status": "enabled",
  "repeat": {
    "repeat_interval": 1,
    "repeat_type": "hour"
  },
  "repeat_end": {
    "end_date": "2017-08-24T01:02:14.859Z"
  },
  "target": {
    "href": "https://wdp-profiling-ftq.stage1.mybluemix.net/v2/data_profiles?start=false",
    "generate_iam_token": true,
    "method": "POST",
    "payload": "aeiou",
    "headers": [
      {
        "name": "content-type",
        "value": "application/json",
        "sensitive": false
      }
    ]
  }
}
```

For more information on the syntax and examples, see the [Swagger documentation](https://wdp-scheduling-ftq.stage1.mybluemix.net/v2/schedules/api/explorer/#/).

## Get multiple schedules in a catalog or project

To get all schedules in the specified catalog or project, call the following GET method:

```
GET {GATEWAY_URL}/v2/schedules
```

You need the following information to get multiple schedules:
1. A valid IAM token, the schedule ID, and the catalog or project ID.
2. You must be authorized to get schedules in the catalog or project.

You can filter the returned results by using the options `entity.schedule.name` and `entity.schedule.status`, and you can filter matching types by using `StartsWith(starts:)` and `Equals(e:)`. You can sort the returned results in either ascending or descending order by using one or more of the following options: `entity.schedule.name`, `metadata.create_time`, and `entity.schedule.status`.

For more information on the syntax and examples, see the [Swagger documentation](https://wdp-scheduling-ftq.stage1.mybluemix.net/v2/schedules/api/explorer/#/).

## Get a schedule

To get a schedule in the specified catalog or project, call the following GET method:

```
GET {GATEWAY_URL}/v2/schedules/{schedule_id}
```

You need the following information to get a schedule:

1. A valid IAM token, the schedule ID, and the catalog or project ID.
2. You must be authorized to get a schedule in the catalog or project.

For more information, see [Usage of the catalog or project ID in the WDP Scheduling Service](https://github.ibm.com/dataconnect/wdp-scheduling/wiki/Usage-of-catalog_id---project_id-in-WDP-Scheduling-Service).

## Update a schedule

To update a schedule in the specified catalog or project, call the following PATCH method:

```
PATCH {GATEWAY_URL}/v2/schedules/{schedule_id}
```

When a schedule is updated, a message is published to the topic "v2.schedules.:guid.PATCH" on the "WDP" exchange. ":guid" represents the schedule_id of the updated schedule.

You need the following information to update a schedule:

1. A valid IAM token, the schedule ID, and the catalog or project ID.
2. You must be authorized to update a schedule in the catalog or project. For more information, see [Usage of the catalog or project ID in the WDP Scheduling Service](https://github.ibm.com/dataconnect/wdp-scheduling/wiki/Usage-of-catalog_id---project_id-in-WDP-Scheduling-Service).

You can update all the attributes under `entity` but can't update the attributes under `metadata`. Patch supports the `replace`, `add`, and `remove` operations. The `replace` operation can be used with all the attributes under `entity`. The `add` and `remove` operations can only be used with the repeat end options, namely `max_invocations` and `end_date`. The start and end dates must be in the following format: `YYYY-MM-DDTHH:mm:ssZ` or `YYYY-MM-DDTHH:mm:ss.sssZ` (specified in RFC 3339).

This PATCH method replaces the repeat type, removes the max invocations, and adds an end date:

```json
[
  {
    "op": "remove",
    "path": "/entity/schedule/repeat_end/max_invocations",
    "value": 20
  },
  {
    "op": "add",
    "path": "/entity/schedule/repeat_end/end_date",
    "value": "{END_DATE}"
  },
  {
    "op": "replace",
    "path": "/entity/schedule/repeat/repeat_type",
    "value": "week"
  }
]
```

For more information on the syntax and examples, see the [Swagger documentation](https://wdp-scheduling-ftq.stage1.mybluemix.net/v2/schedules/api/explorer/#/).

## Delete a schedule

To delete a schedule in the specified catalog or project, call the following DELETE method:

```
DELETE {GATEWAY_URL}/v2/schedules/{schedule_id}
```

When a schedule is deleted, a message is published to the topic "v2.schedules.:guid.DELETE" on the "WDP" exchange. ":guid" represents the schedule_id of the deleted schedule.

You need the following information to delete a schedule:

1. A valid IAM token, the schedule ID, and the catalog or project ID.
2. You must be authorized to delete a schedule in the catalog or project.
For more information, see [Usage of the catalog or project ID in the WDP Scheduling Service](https://github.ibm.com/dataconnect/wdp-scheduling/wiki/Usage-of-catalog_id---project_id-in-WDP-Scheduling-Service).

## Delete multiple schedules

To delete multiple schedules in the specified catalog or project, call the following DELETE method:

```
DELETE {GATEWAY_URL}/v2/schedules
```

When a schedule is deleted, a message is published to the topic "v2.schedules.:guid.DELETE" on the "WDP" exchange. ":guid" represents the schedule_id of the deleted schedule.

You need the following information to delete multiple schedules:

1. A valid IAM token, the schedule ID, and the catalog or project ID.
2. You must be authorized to delete schedules in the catalog or project. For more information, see [Usage of the catalog or project ID in the WDP Scheduling Service](https://github.ibm.com/dataconnect/wdp-scheduling/wiki/Usage-of-catalog_id---project_id-in-WDP-Scheduling-Service).
3. A comma-separated list of the schedule IDs. If schedule IDs are not listed in the parameter `schedule_ids`, the scheduling service will delete all the schedules in the catalog or project.

For more information on the syntax and examples, see the [Swagger documentation](https://wdp-scheduling-ftq.stage1.mybluemix.net/v2/schedules/api/explorer/#/).
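As a sketch of the delete-multiple call described above, assuming the catalog or project is identified with a `project_id` query parameter as in other sections; `{GATEWAY_URL}` and the IDs are placeholders you replace. Note that omitting `schedule_ids` would delete every schedule in the catalog or project.

```
# Delete two specific schedules in a project
curl -X DELETE "{GATEWAY_URL}/v2/schedules?project_id={PROJECT_ID}&schedule_ids={SCHEDULE_ID_1},{SCHEDULE_ID_2}" \
  -H "Authorization: Bearer $TOKEN"
```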

Profiling

# Introduction

The Watson Data API provides data profiling support. The profiling service is a microservice that is deployed as part of the Watson Data API in IBM Bluemix. To use this service, Bluemix must be up and running. You can use the profiling service APIs to create data profiles, which include the classification of your data and information about the distribution of your data. This helps you understand your data better and make the appropriate data shaping decisions. The profiling service is automatically triggered when a data set is added to a governed catalog, and a profile is created for that data set. The profile summary helps you analyze your data more closely and decide which cleansing operations on your data will provide the best results for your use case. You can also perform CRUD operations on data profiles for data sets in non-governed catalogs or projects.

## Create a data profile

You can use this API in one of two ways:

* To create a data profile
* To create and execute a data profile

To create a data profile for a data set in a specified catalog or project and not execute it, call the following POST method:

```
POST {GATEWAY_URL}/v2/data_profiles?start=false
```

OR

```
POST {GATEWAY_URL}/v2/data_profiles
```

To create a data profile for a data set in a specified catalog or project and execute it, call the following POST method:

```
POST {GATEWAY_URL}/v2/data_profiles?start=true
```

The minimal request payload required to create a data profile is as follows:

```json
{
  "metadata": {
    "dataset_id": "{DATASET_ID}",
    "catalog_id": "{CATALOG_ID}"
  }
}
```

OR

```json
{
  "metadata": {
    "dataset_id": "{DATASET_ID}",
    "project_id": "{PROJECT_ID}"
  }
}
```

The request payload can also have an optional `entity` part:

```json
{
  "metadata": {
    "dataset_id": "{DATASET_ID}",
    "catalog_id": "{CATALOG_ID}"
  },
  "entity": {
    "data_profile": {
      "options": {
        "profile_mode": "{MODE_OF_PROFILING}",
        "max_row_count": {MAX_ROW_COUNT_VALUE},
        "max_distribution_size": {MAX_SIZE_OF_DISTRIBUTIONS},
        "max_numeric_stats_bins": {MAX_NUMBER_OF_STATIC_BINS},
        "classification_options": {
          "disabled": {BOOLEAN_TO_ENABLE_OR_DISABLE_CLASSIFICATION_OPTIONS},
          "class_codes": {DATA_CLASS_CODE},
          "items": {ITEMS}
        }
      }
    }
  }
}
```

The following parameters can be specified in the URI and the payload:

1. `start`: Specifies whether to start the profiling service immediately after the data profile is created. The default is `false`.
2. `profile_mode`: Specifies whether the profile mode should be batch mode or interactive mode. The default is `batch` mode.
3. `max_row_count`: Specifies the maximum number of rows to perform profiling on. If no value is provided or if the value is invalid (negative), the default is to calculate the total number of rows.
4. `row_percentage`: Specifies the percentage of rows to perform profiling on. If no value is provided or if the value is invalid (<0 or >100), the default is to take 100% of the rows.
5. `max_distribution_size`: Specifies the maximum size of the various distributions produced by the profiling process. If no value is provided, the default is 100.
6. `max_numeric_stats_bins`: Specifies the maximum number of bins to use in the numerical statistics. If no bin size is provided, the default is 100 bins.
7. `classification_options`: Specifies the various options available for classification:
   * `disabled`: If true, the classification options are disabled and default values are used.
   * `class_codes`: Specifies the data class codes to consider during profiling.
   * `items`: Specifies the items.
**Note:** You can get the various data class codes through the data class service.

To create a data profile for a data set, the following steps must be completed:

1. You must have a valid IAM token to make REST API calls and a project or catalog ID.
2. You must have an IBM Cloud Object Storage bucket, which must be associated with your catalog in the project.
3. The data set must be added to your catalog in the project.
4. Construct a request payload to create a data profile with the values required in the payload.
5. Send a POST request to create a data profile.

When you call the method, the payload is validated. If a required value is not specified or a value is invalid, you get a response message with an HTTP status code of 400 and information about the invalid or missing values. The response of the method includes a location header with a value that indicates the location of the profile that was created. The response body also includes a field `href`, which contains the location of the created profile.

The following example shows a success response:

```json
{
  "metadata": {
    "guid": "7d24f7bd-4c96-4582-b939-1044366a5253",
    "asset_id": "7d24f7bd-4c96-4582-b939-1044366a5253",
    "dataset_id": "6ce6c37e-d5fd-4b03-9c29-a6e371e78e63",
    "url": "https://apsx-api-dev.stage1.ng.bluemix.net/v2/data_profiles/7d24f7bd-4c96-4582-b939-1044366a5253?catalog_id=e92b6729-2dc7-49af-861f-a96202667f3b&dataset_id=6ce6c37e-d5fd-4b03-9c29-a6e371e78e63",
    "catalog_id": "e92b6729-2dc7-49af-861f-a96202667f3b",
    "created_at": "2017-10-04T04:25:24.475Z",
    "accessed_at": "2017-10-04T04:25:24.475Z",
    "owner": "pgoyal89@in.ibm.com"
  },
  "entity": {
    "data_profile": {
      "options": {
        "disable_profiling": false,
        "max_distribution_size": 100,
        "max_numeric_stats_bins": 100,
        "classification_options": {
          "disabled": false,
          "use_all_ibm_classes": true,
          "use_all_custom_classes": false,
          "ibm_class_codes": [],
          "custom_class_codes": []
        }
      },
      "execution": {
        "status": "submitted"
      },
      "columns": []
    }
  },
  "href": "https://apsx-api-dev.stage1.ng.bluemix.net/v2/data_profiles/7d24f7bd-4c96-4582-b939-1044366a5253?catalog_id=e92b6729-2dc7-49af-861f-a96202667f3b&dataset_id=6ce6c37e-d5fd-4b03-9c29-a6e371e78e63"
}
```

The `execution.status` of the profile is `none` if the `start` parameter is not set or is set to `false`. Otherwise, it is in the `submitted` state or any other state, depending on the profiling execution status.

The following are possible response codes for this API call:

| Response HTTP status | Cause | Possible Scenarios |
| -------------------- | ------------ | ----------------------- |
| 201 | Created | A data profile was created. |
| 400 | Bad Request | The request payload either had some invalid values or invalid/unwanted parameters. |
| 401 | Unauthorized | An invalid IAM token was provided in the request header. |
| 403 | Forbidden | The user is not allowed to create a data profile. |
| 500 | Internal Server Error | A runtime error occurred. |

If something goes wrong, see [Troubleshooting](https://github.ibm.com/dataconnect/wdp-profiling/wiki/Troubleshooting-if-something-goes-wrong#troubleshooting-your-way-out-if-something-goes-wrong). For more information, see the [Profiling Swagger documentation](https://wdp-profiling-ys1dev.stage1.mybluemix.net/v2/data_profiles/api/explorer/#!/Data_Profiles/createDataProfile).
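As an illustrative sketch of the create call, assuming `{GATEWAY_URL}`, the project ID, and the data set ID are placeholders you replace, and that you want profiling to start immediately:

```
curl -X POST "{GATEWAY_URL}/v2/data_profiles?start=true" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
        "metadata": {
          "dataset_id": "{DATASET_ID}",
          "project_id": "{PROJECT_ID}"
        }
      }'
```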
## Get a data profile To get a data profile for a data set in a specified catalog or project, call the following GET method: GET {GATEWAY_URL}/v2/data_profiles/{PROFILE_ID}?catalog_id={CATALOG_ID}&dataset_id={DATASET_ID} OR GET {GATEWAY_URL}/v2/data_profiles/{PROFILE_ID}?project_id={PROJECT_ID}&dataset_id={DATASET_ID} The value of `PROFILE_ID` is the value of `metadata.guid` from the successful response payload of the create data profile call. The following example shows a success response with an HTTP status code of 200 indicating that a data profile was created but wasn't executed: { "metadata": { "guid": "4d30b7d9-5923-43e1-a7c9-ce1f5a27a6d7", "asset_id": "4d30b7d9-5923-43e1-a7c9-ce1f5a27a6d7", "dataset_id": "6ce6c37e-d5fd-4b03-9c29-a6e371e78e63", "url": "https://apsx-api-dev.stage1.ng.bluemix.net/v2/data_profiles/4d30b7d9-5923-43e1-a7c9-ce1f5a27a6d7?catalog_id=e92b6729-2dc7-49af-861f-a96202667f3b&dataset_id=6ce6c37e-d5fd-4b03-9c29-a6e371e78e63", "catalog_id": "e92b6729-2dc7-49af-861f-a96202667f3b", "created_at": "2017-10-04T04:34:47.891Z", "accessed_at": "2017-10-04T04:34:47.891Z", "owner": "pgoyal89@in.ibm.com" }, "entity": { "data_profile": { "options": { "disable_profiling": false, "max_distribution_size": 100, "max_numeric_stats_bins": 100, "classification_options": { "disabled": false, "use_all_ibm_classes": true, "use_all_custom_classes": false, "ibm_class_codes": [], "custom_class_codes": [] } }, "execution": { "status": "none" }, "columns": [] } }, "href": "https://apsx-api-dev.stage1.ng.bluemix.net/v2/data_profiles/4d30b7d9-5923-43e1-a7c9-ce1f5a27a6d7?catalog_id=e92b6729-2dc7-49af-861f-a96202667f3b&dataset_id=6ce6c37e-d5fd-4b03-9c29-a6e371e78e63" } The following example shows a success response with an HTTP status code of 202 indicating that a data profile was created and executed, but that the profiling status is `running`: { "metadata": { "asset_id": "7d24f7bd-4c96-4582-b939-1044366a5253", "dataset_id": "6ce6c37e-d5fd-4b03-9c29-a6e371e78e63", "url": "https://apsx-api-dev.stage1.ng.bluemix.net/v2/data_profiles/7d24f7bd-4c96-4582-b939-1044366a5253?catalog_id=e92b6729-2dc7-49af-861f-a96202667f3b&dataset_id=6ce6c37e-d5fd-4b03-9c29-a6e371e78e63", "catalog_id": "e92b6729-2dc7-49af-861f-a96202667f3b", "created_at": "2017-10-04T04:25:24.475Z", "accessed_at": "2017-10-04T04:25:24.475Z", "owner": "pgoyal89@in.ibm.com" }, "entity": { "data_profile": { "attachment_id": "a4702c88-4434-4f0e-848a-c31c617e9d61", "attachment_url": "https://s3-api.us-geo.objectstorage.softlayer.net/test3fdca4402ca844c3b32a9a4628538b2f/e92b6729-2dc7-49af-861f-a96202667f3b/6ce6c37e-d5fd-4b03-9c29-a6e371e78e63/a4702c88-4434-4f0e-848a-c31c617e9d61?uploadId=0100015e-e5a1-9563-8d8b-a097683ddb6d&partNumber=1&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20171004T042525Z&X-Amz-SignedHeaders=host&X-Amz-Expires=86400&X-Amz-Credential=HbrfDnbIcc3Oxjdewh1j%2F20171004%2Fus-standard%2Fs3%2Faws4_request&X-Amz-Signature=6c1e0cbc9d42917a58d8173dba2209874418503572b3d180c8d257c1bd860bc2", "options": { "disable_profiling": false, "max_distribution_size": 100, "max_numeric_stats_bins": 100, "classification_options": { "disabled": false, "use_all_ibm_classes": true, "use_all_custom_classes": false, "ibm_class_codes": [], "custom_class_codes": [] } }, "execution": { "status": "running", "activity_id": "e7d3cad8-a0f0-4e62-ac7e-9be412da3d18", "activity_run_id": "c055ba06-b434-418d-a902-617373483d07" }, "columns": [] } }, "href": 
"https://apsx-api-dev.stage1.ng.bluemix.net/v2/data_profiles/7d24f7bd-4c96-4582-b939-1044366a5253?catalog_id=e92b6729-2dc7-49af-861f-a96202667f3b&dataset_id=6ce6c37e-d5fd-4b03-9c29-a6e371e78e63" } The following example shows a success response with an HTTP status code of 200 indicating that a data profile was created and executed, and the profiling status is `finished`: { "metadata": { "guid": "string", "asset_id": "string", "dataset_id": "string", "url": "string", "project_id": "string", "catalog_id": "string", "created_at": "2017-08-03T04:09:21.891Z", "accessed_at": "2017-08-03T04:09:21.891Z", "owner": "string", "last_updater": "string" }, "entity": { "data_profile": { "attachment_id": "string", "attachment_url": "string", "connection_id": "string", "connection_path": "string", "score": 0, "options": { "profile_mode": "string", "max_row_count": 0, "row_percentage": 0, "max_distribution_size": 0, "max_numeric_stats_bins": 0, "classification_options": { "disabled": true, "class_codes": [ "string" ] } }, "execution": { "status": "finished", "activity_id": "string", "activity_run_id": "string" }, "summary": { "version": "string", "row_count": 0, "score": 0, "score_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "problem_distribution": [ { "value": "UniquenessViolation", "count": 0 } ], "type_distribution": [ { "value": "UNKNOWN", "count": 0 } ], "logical_type_distribution": [ { "value": "UNKNOWN", "count": 0 } ], "class_distribution": [ { "value": { "code": "string", "name": "string" }, "count": 0 } ] }, "columns": [ { "name": "string", "original_type": { "length": 0, "nullable": false, "scale": 0, "type": "bigint" }, "overwritten_type": { "type": { "length": 0, "precision": 0, "scale": 0, "type": "UNKNOWN" }, "nullable": true, "user": "string" }, "overwritten_class": { "class": { "code": "string", "name": "string" }, "user": "string" }, "overwritten_format": { "format": "string", "user": "string" }, "value_analysis": { "distinct_count": 0, "null_count": 0, "empty_count": 0, "unique_count": 0, "max_value_frequency": 0, "min_string": "string", "max_string": "string", "min_number": 0, "max_number": 0, "max_integer_length": 0, "min_date": "string", "max_date": "string", "inferred_type": { "type": { "length": 0, "precision": 0, "scale": 0, "type": "UNKNOWN" } }, "inferred_class": { "class": { "code": "string", "name": "string" }, "confidence": 0, "priority": 0 }, "inferred_format": { "format": "string" }, "value_distribution": [ { "value": "string", "count": 0 } ], "class_distribution": [ { "value": { "code": "string", "name": "string" }, "count": 0, "confidence": 0, "priority": 0 } ], "type_distribution": [ { "value": { "length": 0, "precision": 0, "scale": 0, "type": "UNKNOWN" }, "count": 0, "distinct_count": 0 } ], "logical_type_distribution": [ { "value": "UNKNOWN", "count": 0, "distinct_count": 0 } ], "format_distribution": [ { "value": "string", "count": 0, "distinct_count": 0 } ], "bigram_distribution": [ { "value": "string", "count": 0, "distinct_count": 0 } ], "length_distribution": [ { "value": 0, "count": 0, "distinct_count": 0 } ], "precision_distribution": [ { "value": 0, "count": 0, "distinct_count": 0 } ], "scale_distribution": [ { "value": 0, "count": 0, "distinct_count": 0 } ], "case_distribution": [ { "value": "AnyCase", "count": 0, "distinct_count": 0 } ], "word_distribution": [ { "value": "string", "count": 0, "distinct_count": 0 } ], "numeric_stats": { "count": 0, "mean": 0, "variance": 0, "bins": [ { "count": 0, "mean": 0, "min": 0, 
"max": 0 } ] }, "word_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "alpha_chatacter_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "digit_chatacter_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "other_chatacter_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "length_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 } }, "quality_analysis": { "score": 0, "score_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "problem_distribution": [ { "value": "UniquenessViolation", "count": 0, "distinct_count": 0, "confidence": 0 } ] } } ] } "href": "string" } For other runtime errors, you might get an HTTP status code of 500 indicating that profiling didn't finished as expected. The following are possible response codes for this API call: | Response HTTP status | Cause | Possible Scenarios | | -------------------- | --------------- | -------------------- | | 200 | Success | Data profile is created and executed. | | 202 | Accepted | Data profile is created and under execution. | | 401 | Bad Request | Invalid IAM token was provided in the request header. | | 403 | Forbidden | User is not allowed to get the data profile. | | 404 | Not Found | The data profile specified was not found. | | 500 | Internal Server Error | Some runtime error occurred. | If something goes wrong, see [Troubleshooting](https://github.ibm.com/dataconnect/wdp-profiling/wiki/Troubleshooting-if-something-goes-wrong#troubleshooting-your-way-out-if-something-goes-wrong). For more information, see the [Profiling Swagger documentation](https://wdp-profiling-ys1dev.stage1.mybluemix.net/v2/data_profiles/api/explorer/#!/Data_Profiles/getDataProfile). ## Update a data profile To update a data profile for a data set in a specified catalog or project, call the following PATCH method: PATCH {GATEWAY_URL}/v2/data_profiles/{PROFILE_ID}?catalog_id={CATALOG_ID}&dataset_id={DATASET_ID} OR PATCH {GATEWAY_URL}/v2/data_profiles/{PROFILE_ID}?project_id={PROJECT_ID}&dataset_id={DATASET_ID} The value of `PROFILE_ID` is the value of `metadata.guid` from the successful response payload of the create data profile call. The JSON request payload must be as follows: [ { "op": "add", "path": "string", "from": "string", "value": {} } ] During update, the entire data profile is replaced, apart from any read-only or response-only attributes. If profiling processes are running and the start parameter is set to true, then a data profile is only updated if the `stop_in_progress_runs parameter` is set to true. The updates must be specified by using the JSON patch format, described in RFC 6902. 
The following example shows a success response: { "metadata": { "guid": "string", "asset_id": "string", "dataset_id": "string", "url": "string", "project_id": "string", "catalog_id": "string", "created_at": "2017-08-03T04:09:21.891Z", "accessed_at": "2017-08-03T04:09:21.891Z", "owner": "string", "last_updater": "string" }, "entity": { "data_profile": { "attachment_id": "string", "attachment_url": "string", "connection_id": "string", "connection_path": "string", "score": 0, "options": { "profile_mode": "string", "max_row_count": 0, "row_percentage": 0, "max_distribution_size": 0, "max_numeric_stats_bins": 0, "classification_options": { "disabled": true, "class_codes": [ "string" ] } }, "execution": { "status": "none", "activity_id": "string", "activity_run_id": "string" }, "summary": { "version": "string", "row_count": 0, "score": 0, "score_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "problem_distribution": [ { "value": "UniquenessViolation", "count": 0 } ], "type_distribution": [ { "value": "UNKNOWN", "count": 0 } ], "logical_type_distribution": [ { "value": "UNKNOWN", "count": 0 } ], "class_distribution": [ { "value": { "code": "string", "name": "string" }, "count": 0 } ], "dataprofile_classification": [ { "id": "string", "name": "string" } ] }, "columns": [ { "name": "string", "original_type": { "length": 0, "nullable": false, "scale": 0, "type": "bigint" }, "overwritten_type": { "type": { "length": 0, "precision": 0, "scale": 0, "type": "UNKNOWN" }, "nullable": true, "user": "string" }, "overwritten_class": { "class": { "code": "string", "name": "string" }, "user": "string" }, "overwritten_format": { "format": "string", "user": "string" }, "value_analysis": { "distinct_count": 0, "null_count": 0, "empty_count": 0, "unique_count": 0, "max_value_frequency": 0, "min_string": "string", "max_string": "string", "min_number": 0, "max_number": 0, "max_integer_length": 0, "min_date": "string", "max_date": "string", "inferred_type": { "type": { "length": 0, "precision": 0, "scale": 0, "type": "UNKNOWN" } }, "inferred_class": { "class": { "code": "string", "name": "string" }, "confidence": 0, "priority": 0 }, "inferred_format": { "format": "string" }, "value_distribution": [ { "value": "string", "count": 0 } ], "class_distribution": [ { "value": { "code": "string", "name": "string" }, "count": 0, "confidence": 0, "priority": 0 } ], "type_distribution": [ { "value": { "length": 0, "precision": 0, "scale": 0, "type": "UNKNOWN" }, "count": 0, "distinct_count": 0 } ], "logical_type_distribution": [ { "value": "UNKNOWN", "count": 0, "distinct_count": 0 } ], "format_distribution": [ { "value": "string", "count": 0, "distinct_count": 0 } ], "bigram_distribution": [ { "value": "string", "count": 0, "distinct_count": 0 } ], "length_distribution": [ { "value": 0, "count": 0, "distinct_count": 0 } ], "precision_distribution": [ { "value": 0, "count": 0, "distinct_count": 0 } ], "scale_distribution": [ { "value": 0, "count": 0, "distinct_count": 0 } ], "case_distribution": [ { "value": "AnyCase", "count": 0, "distinct_count": 0 } ], "word_distribution": [ { "value": "string", "count": 0, "distinct_count": 0 } ], "numeric_stats": { "count": 0, "mean": 0, "variance": 0, "bins": [ { "count": 0, "mean": 0, "min": 0, "max": 0 } ] }, "word_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "alpha_chatacter_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "digit_chatacter_stats": { "n": 0, 
"mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "other_chatacter_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "length_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 } }, "quality_analysis": { "score": 0, "score_stats": { "n": 0, "mean": 0, "variance": 0, "stddev": 0, "min": 0, "max": 0, "sum": 0 }, "problem_distribution": [ { "value": "UniquenessViolation", "count": 0, "distinct_count": 0, "confidence": 0 } ] } } ] } }, "href": "string" } If something goes wrong, see [Troubleshooting](https://github.ibm.com/dataconnect/wdp-profiling/wiki/Troubleshooting-if-something-goes-wrong#troubleshooting-your-way-out-if-something-goes-wrong). For more information, see the [Profiling Swagger documentation](https://wdp-profiling-ys1dev.stage1.mybluemix.net/v2/data_profiles/api/explorer/#!/Data_Profiles/patchDataProfile). ## Modify asset level classification This API is used for CRUD operations on asset level classification. To modify the asset level classification details in the `data_profile` parameter for a data set in a specified catalog or project, call the following PATCH method: PATCH {GATEWAY_URL}/v2/data_profiles/classification?catalog_id={CATALOG_ID}&dataset_id={DATASET_ID} OR PATCH {GATEWAY_URL}/v2/data_profiles/classification?project_id={PROJECT_ID}&dataset_id={DATASET_ID} The JSON request payload must be structured in the following way: [ { "op": "add", "path": "/data_classification", "value": [ { "id":"{ASSET_LEVEL_CLASSIFICATION_ID}", "name":"{ASSET_LEVEL_CLASSIFICATION_NAME}" } ] } ] The `path` attribute must be set to what is written in the previous JSON request payload, otherwise you will get a validation error with an HTTP status code of 400. The values of `ASSET_LEVEL_CLASSIFICATION_ID` and `ASSET_LEVEL_CLASSIFICATION_NAME` can be: `PII` and `PII details` respectively. The data updates must be specified by using the JSON patch format, described in RFC 6902 [https://tools.ietf.org/html/rfc6902]. For more details about JSON patch, see [http://jsonpatch.com]. A successful response has an HTTP status code of 200 and lists the asset level classifications. The following are possible response codes for this API call: | Response HTTP status | Cause | Possible Scenarios | | -------------------- | ----- | ----------------- | | 200 | Success| Asset Level Classification is added to the asset. | | 400 | Bad Request | The request payload either had some invalid values or invalid/unwanted parameters. | | 401 | Unauthorized | Invalid IAM token was provided in the request header. | | 403 | Forbidden | User is not allowed to add asset level classification to the asset. | | 500 | Internal Server Error | A runtime error occurred. If something goes wrong, see [Troubleshooting](https://github.ibm.com/dataconnect/wdp-profiling/wiki/Troubleshooting-if-something-goes-wrong#troubleshooting-your-way-out-if-something-goes-wrong). For more information, see the [Profiling Swagger documentation](https://wdp-profiling-ys1dev.stage1.mybluemix.net/v2/data_profiles/api/explorer/#!/Data_Profiles/patchAssetClassification). 
## Delete a data profile

To delete a data profile for a data set in a specified catalog or project, call the following DELETE method:

```
DELETE {GATEWAY_URL}/v2/data_profiles/{PROFILE_ID}?catalog_id={CATALOG_ID}&dataset_id={DATASET_ID}&stop_in_progress_profiling_runs=false
```

OR

```
DELETE {GATEWAY_URL}/v2/data_profiles/{PROFILE_ID}?project_id={PROJECT_ID}&dataset_id={DATASET_ID}&stop_in_progress_profiling_runs=true
```

The value of `PROFILE_ID` is the value of `metadata.guid` from the successful response payload of the create data profile call. You can't delete a profile if the profiling execution status is in `running` state and the query parameter `stop_in_progress_profiling_runs` is set to false. A successful response has an HTTP status code of 204.

If something goes wrong, see [Troubleshooting](https://github.ibm.com/dataconnect/wdp-profiling/wiki/Troubleshooting-if-something-goes-wrong#troubleshooting-your-way-out-if-something-goes-wrong). For more information, see the [Profiling Swagger documentation](https://wdp-profiling-ys1dev.stage1.mybluemix.net/v2/data_profiles/api/explorer/#!/Data_Profiles/deleteDataProfile).
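A minimal curl sketch of the delete call above; all IDs and `{GATEWAY_URL}` are placeholders, and `stop_in_progress_profiling_runs=true` is included so the delete can succeed even if a profiling run is still in progress.

```
curl -X DELETE "{GATEWAY_URL}/v2/data_profiles/{PROFILE_ID}?project_id={PROJECT_ID}&dataset_id={DATASET_ID}&stop_in_progress_profiling_runs=true" \
  -H "Authorization: Bearer $TOKEN"
```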

Data Flows

# Introduction

Use the Watson Data API data flows service to create and run data flows in a runtime engine. A flow can read data from a large variety of sources, process that data using pre-defined operations or custom code, and then write it to one or more targets. The runtime engine can handle large amounts of data, so it's ideally suited for reading, processing, and writing data at volume.

The sources and targets that are supported include both cloud and on-premises offerings as well as data assets in projects. Cloud offerings include IBM Cloud Object Storage, Amazon S3, and Azure, among others. On-premises offerings include IBM Db2, Microsoft SQL Server, and Oracle, among others. For a list of the supported connectivity and the properties they support, see [IBM Watson Data API Data Flows Service - Data Asset and Connection Properties](https://api.dataplatform.ibm.com/v2/data_flows/doc/dataasset_and_connection_properties.html).

## Environments ##

The data flows service is currently deployed only to the US South region of IBM Cloud. Use this environment URL in place of {service_URL} in the examples below:

* US South: `https://api.dataplatform.ibm.com`

## Creating a data flow ##

The following example shows how to create a data flow that reads data from a table on IBM Db2 Warehouse on Cloud (previously called IBM dashDB), filters the data, and writes the data to a data asset in the Watson Data API project. The data flow created for this example contains a linear pipeline, although in the general case the pipeline forms a directed acyclic graph (DAG).

#### Create a connection ####

Begin by creating a connection to an existing IBM Db2 Warehouse on Cloud instance to use as the source of the data flow. For further information on the connections service, see the [Watson Data API documentation](https://console.bluemix.net/apidocs/1084-watson-data-platform-core-services-beta-#versioning). The following example shows a request to create a connection:

```
POST https://api.dataplatform.ibm.com/v2/connections?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218
```

Request payload:

```json
{
  "name": "my_dashdb",
  "origin_country": "us",
  "datasource_type": "cfdcb449-1204-44ba-baa6-9a8a878e6aa7",
  "properties": {
    "database": "BLUDB",
    "password": "********",
    "host": "awh-yp-small03.services.dal.bluemix.net",
    "username": "dash******"
  }
}
```

Response payload:

```json
{
  "metadata": {
    "asset_id": "85be3e09-1c71-45d3-8d5d-220d6a6ea850",
    "asset_type": "connection",
    "project_id": "ff1ab70b-0553-409a-93f9-ccc31471c218",
    "create_time": "2017-11-27T10:17:08.000Z",
    "creator": "demo_dataflow_user@mailinator.com",
    "usage": {
      "last_access_time": "2017-11-27T10:17:08.685Z",
      "last_accessor": "demo_dataflow_user@mailinator.com",
      "access_count": 0
    }
  },
  "entity": {
    "datasource_type": "cfdcb449-1204-44ba-baa6-9a8a878e6aa7",
    "name": "my_dashdb",
    "flags": [],
    "origin_country": "us",
    "rov": {
      "mode": 0
    },
    "properties": {
      "database": "BLUDB",
      "password": "********",
      "host": "awh-yp-small03.services.dal.bluemix.net",
      "username": "dashdash******",
      "sg_service_name": "DataRefinery"
    }
  }
}
```

In the response, you can see that the connection was created with an ID of `85be3e09-1c71-45d3-8d5d-220d6a6ea850`, which you'll need to use later and specify in the data flow you create.

#### Defining a source in a data flow ####

A data flow can contain one or more data sources. A data source is defined as a *binding node* in the data flow *pipeline*, which has one output and no inputs.
The *binding node* must reference either a connection or a data asset. Depending on the type of connection or data asset, additional *properties* might also need to be specified. Refer to [IBM Watson Data API Data Flows Service - Data Asset and Connection Properties](https://api.dataplatform.ibm.com/v2/data_flows/doc/dataasset_and_connection_properties.html) to determine which properties are applicable for a given connection, and which of those are required. For IBM Db2 Warehouse on Cloud both `select_statement` and `table_name` are required, so you must include values for those in the data flow.

For the following example, reference the connection you created earlier. The *binding node* for the data flow's source is:

```json
{
  "id": "source1",
  "type": "binding",
  "output": {
    "id": "source1Output"
  },
  "connection": {
    "properties": {
      "schema_name": "GOSALESHR",
      "table_name": "EMPLOYEE"
    },
    "ref": "85be3e09-1c71-45d3-8d5d-220d6a6ea850"
  }
}
```

The `output` attribute declares the ID of the *output port* of this source as `source1Output` so that other nodes can read from it. You can see that the schema and table name have been defined, and that the connection with ID `85be3e09-1c71-45d3-8d5d-220d6a6ea850` is being referenced.

#### Defining an operation in a data flow ####

A data flow can contain zero or more operations, with a typical operation having one or more inputs and one or more outputs. An operation input is linked to the output of a source or another operation. An operation can also have additional parameters which define how the operation performs its work. An operation is defined as an *execution node* in the data flow *pipeline*.

The following example creates a filter operation so that only rows with a value greater than `2010-01-01` in the `DATE_HIRED` field are retained. The *execution node* for our filter operation is:

```json
{
  "id": "operation1",
  "type": "execution_node",
  "op": "com.ibm.wdp.transformer.FreeformCode",
  "parameters": {
    "FREEFORM_CODE": "filter(DATE_HIRED>'2010-01-01*')"
  },
  "inputs": [
    {
      "id": "inputPort1",
      "links": [
        {
          "node_id_ref": "source1",
          "port_id_ref": "source1Output"
        }
      ]
    }
  ],
  "outputs": [
    {
      "id": "outputPort1"
    }
  ]
}
```

The `inputs` attribute declares an *input port* with ID `inputPort1`, which references the *output port* of the source node (node ID `source1` and port ID `source1Output`). The `outputs` attribute declares the ID of the *output port* of this operation as `outputPort1` so that other nodes can read from it. For this example, the operation is defined as a freeform operation, denoted by the `op` attribute value of `com.ibm.wdp.transformer.FreeformCode`. A freeform operation has only a single parameter named `FREEFORM_CODE`, whose value is a snippet of Sparklyr code. In this snippet of code, a filter function is called with the arguments to retain only those rows with a value greater than `2010-01-01` in the `DATE_HIRED` field.

#### Defining a target in a data flow ####

A data flow can contain zero or more targets. A target is defined as a *binding node* in the data flow *pipeline* which has one input and no outputs. As with the source, the *binding node* must reference either a connection or a data asset. When using a data asset as a target, specify either the ID or the name of an existing data asset. In the following example, a data asset is referenced by its name.
The *binding node* for the data flow's target is: ```json { "id":"target1", "type":"binding", "input":{ "id":"target1Input", "link":{ "node_id_ref":"operation1", "port_id_ref":"outputPort1" } }, "data_asset":{ "properties":{ "name":"my_shapedFile.csv" } } } ``` The `input` attribute declares an *input port* with ID `target1Input` which references the *output port* of our operation node (node ID `operation1` and port ID `outputPort1`). The name of the data asset to create or update is specified as `my_shapedFile.csv`. Unless otherwise specified, this data asset is assumed to be in the same catalog or project as that which contains the data flow. #### Creating the data flow #### Putting it all together, you can now call the API to create the data flow with the following POST method: ```POST https://{service_URL}/v2/data_flows``` The new data flow can be stored in a catalog or project. Use either the `catalog_id` **or** `project_id` query parameter, depending on where you want to store the data flow asset. An example request to create a data flow is shown below: ```POST https://api.dataplatform.ibm.com/v2/data_flows?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218``` Request payload: ```json { "name":"my_dataflow", "pipeline":{ "doc_type":"pipeline", "version":"1.0", "primary_pipeline":"pipeline1", "pipelines":[ { "id":"pipeline1", "runtime":"Spark", "nodes":[ { "id":"source1", "type":"binding", "output":{ "id":"source1Output" }, "connection":{ "properties":{ "schema_name":"GOSALESHR", "table_name":"EMPLOYEE" }, "ref":"85be3e09-1c71-45d3-8d5d-220d6a6ea850" } }, { "id":"operation1", "type":"execution_node", "op":"com.ibm.wdp.transformer.FreeformCode", "parameters":{ "FREEFORM_CODE":"filter(DATE_HIRED>'2010-01-01*')" }, "inputs":[ { "id":"inputPort1", "links":[ { "node_id_ref":"source1", "port_id_ref":"source1Output" } ] } ], "outputs":[ { "id":"outputPort1" } ] }, { "id":"target1", "type":"binding", "input":{ "id":"target1Input", "link":{ "node_id_ref":"operation1", "port_id_ref":"outputPort1" } }, "data_asset":{ "properties":{ "name":"my_shapedFile.csv" } } } ] } ] } } ``` Response payload: ```json { "metadata": { "asset_id": "cfdacdb4-3180-466f-8d4c-be7badea5d64", "asset_type": "data_flow", "project_id": "ff1ab70b-0553-409a-93f9-ccc31471c218", "create_time": "2017-11-27T16:54:37.000Z", "creator": "demo_dataflow_user@mailinator.com", "href": "https://api.dataplatform.ibm.com/v2/data_flows/cfdacdb4-3180-466f-8d4c-be7badea5d64?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218", "usage": { "last_modification_time": "2017-11-27T16:54:37.807Z", "last_modifier": "demo_dataflow_user@mailinator.com", "last_access_time": "2017-11-27T16:54:37.807Z", "last_accessor": "demo_dataflow_user@mailinator.com", "access_count": 0 } }, "entity": { "name": "my_dataflow", "pipeline": { "doc_type": "pipeline", "version": "1.0", "primary_pipeline": "pipeline1", "pipelines": [ { "id": "pipeline1", "runtime": "Spark", "nodes": [ { "id": "source1", "type": "binding", "output": { "id": "source1Output" }, "connection": { "properties": { "schema_name": "GOSALESHR", "table_name": "EMPLOYEE" }, "ref": "85be3e09-1c71-45d3-8d5d-220d6a6ea850" } }, { "id": "operation1", "type": "execution_node", "op": "com.ibm.wdp.transformer.FreeformCode", "parameters": { "FREEFORM_CODE": "filter(DATE_HIRED>'2010-01-01*')" }, "inputs": [ { "id": "inputPort1", "links": [ { "node_id_ref": "source1", "port_id_ref": "source1Output" } ] } ], "outputs": [ { "id": "outputPort1" } ] }, { "id": "target1", "type": "binding", "input": { "id": "target1Input", 
"link": { "node_id_ref": "operation1", "port_id_ref": "outputPort1" } }, "data_asset": { "properties": { "name": "my_shapedFile.csv" } } } ] } ] }, "tags": [] } } ``` The response shows that the data flow was created with an ID of `cfdacdb4-3180-466f-8d4c-be7badea5d64`, which you will need later to run the data flow you created. ## Working with data flow runs ## #### What is a data flow run? #### Each time a data flow is run, a new data flow run asset is created and stored in the project or catalog to record this event. This asset stores detailed metrics such as how many rows were read and written, a copy of the data flow that was run, and any logs from the engine. During a run, the information in the asset is updated to reflect the current state of the run. When the run completes (successfully or not), the information in the asset is updated one final time. If and when the data flow is deleted, any run assets of that data flow are also deleted. There are four components of a data flow run, which are accessible using different APIs. - Summary (`GET /v2/data_flows/{data_flow_id}/runs/{data_flow_run_id}`). A quick, at-a-glance view of a run with a summary of how many rows in total were read and written. - Detailed metrics (`GET /v2/data_flows/{data_flow_id}/runs/{data_flow_run_id}/metrics`). Detailed metrics for each binding node in the data flow (link sources and targets). - Data flow (`GET /v2/data_flows/{data_flow_id}/runs/{data_flow_run_id}/origin`). A copy of the data flow that was run at that point in time. (Remember that data flows can be modified between runs.) - Logs (`GET /v2/data_flows/{data_flow_id}/runs/{data_flow_run_id}/logs`). The logs from the engine, which are useful for diagnosing run failures. #### Run state life cycle #### A data flow run has a defined life cycle, which is shown by its `state` attribute. The `state` attribute can have one of the following values: - `starting` The run was created but was not yet submitted to the engine. - `queued` The run was submitted to the engine and it is pending. - `running` The run is currently in progress. - `finished` The run finished and was successful. - `error` The run did not complete. An error occurred either before the run was sent to the engine or while the run was in progress. - `stopping` The run was canceled but it is still running. - `stopped` The run is no longer in progress. The run states that define phases of progress are: `starting`, `queued`, `running`, `stopping`. The run states that define states of completion are: `finished`, `error`, `stopped`. The following are typical state transitions you would expect to see: 1. The run completed successfully: `starting` -> `queued` -> `running` -> `finished`. 2. The run failed (for example, connection credentials were incorrect): `starting` -> `queued` -> `running` -> `error`. 3. The run could not be sent to the engine (for example, the connection referenced does not exist): `starting` -> `error`. 4. The run was stopped (for example, at users request): `starting` -> `queued` -> `running` -> `stopping` -> `stopped`. #### Run a data flow #### To run a data flow, call the following POST API: ``` POST https://{service_URL}/v2/data_flows/{data_flow_id}/runs?project_id={project_id} ``` The value of `data_flow_id` is the `metadata.asset_id` from your data flow. 
An example response from this API call might be: ```json { "metadata": { "asset_id": "ed09488c-6d51-48c4-b190-7096f25645d5", "asset_type": "data_flow_run", "create_time": "2017-12-21T10:51:47.000Z", "creator": "demo_dataflow_user@mailinator.com", "href": "https://api.dataplatform.ibm.com/v2/data_flows/cfdacdb4-3180-466f-8d4c-be7badea5d64/runs/ed09488c-6d51-48c4-b190-7096f25645d5?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218", "project_id": "ff1ab70b-0553-409a-93f9-ccc31471c218", "usage": { "last_modification_time": "2017-12-21T10:51:47.923Z", "last_modifier": "demo_dataflow_user@mailinator.com", "last_access_time": "2017-12-21T10:51:47.923Z", "last_accessor": "demo_dataflow_user@mailinator.com", "access_count": 0 } }, "entity": { "data_flow_ref": "cfdacdb4-3180-466f-8d4c-be7badea5d64", "name": "my_dataflow", "rov": { "mode": 0, "members": [] }, "state": "starting", "tags": [] } } ``` #### Get a data flow run summary #### To retrieve the latest summary of a data flow run, call the following GET method: ``` GET https://{service_URL}/v2/data_flows/{data_flow_id}/runs/{data_flow_run_id}?project_id={project_id} ``` The value of `data_flow_id` is the `metadata.asset_id` from your data flow. The value of `data_flow_run_id` is the `metadata.asset_id` from your data flow run. An example response from this API call might be: ```json { "metadata": { "asset_id": "ed09488c-6d51-48c4-b190-7096f25645d5", "asset_type": "data_flow_run", "create_time": "2017-12-21T10:51:47.000Z", "creator": "demo_dataflow_user@mailinator.com", "href": "https://api.dataplatform.ibm.com/v2/data_flows/cfdacdb4-3180-466f-8d4c-be7badea5d64/runs/ed09488c-6d51-48c4-b190-7096f25645d5?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218", "project_id": "ff1ab70b-0553-409a-93f9-ccc31471c218", "usage": { "last_modification_time": "2017-12-21T10:51:47.923Z", "last_modifier": "demo_dataflow_user@mailinator.com", "last_access_time": "2017-12-21T10:51:47.923Z", "last_accessor": "demo_dataflow_user@mailinator.com", "access_count": 0 } }, "entity": { "data_flow_ref": "cfdacdb4-3180-466f-8d4c-be7badea5d64", "engine_state": { "session_cookie": "route=Spark; HttpOnly; Secure", "engine_run_id": "804d17bd-5ed0-4d89-ba38-ab7890d61e45" }, "name": "my_dataflow", "rov": { "mode": 0, "members": [] }, "state": "finished", "summary": { "completed_date": "2018-01-03T16:58:05.726Z", "engine_elapsed_secs": 9, "engine_completed_date": "2018-01-03T16:58:05.360Z", "engine_started_date": "2018-01-03T16:57:56.211Z", "engine_status_date": "2018-01-03T16:58:05.360Z", "engine_submitted_date": "2018-01-03T16:57:46.044Z", "total_bytes_read": 95466, "total_bytes_written": 42142, "total_rows_read": 766, "total_rows_written": 336 }, "tags": [] } } ``` #### Troubleshooting a failed run #### If a data flow run fails, the `state` attribute is set to the value `error`. In addition to this, the run asset itself has an attribute called `error` which is set to a concise description of the error (where available from the engine). If this information is not available from the engine, a more general message is set in the `error` attribute. This means that the `error` attribute is never left unset if a run fails. 
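When scripting these calls, a common pattern is to poll the run summary until the run reaches a completion state and then fetch the engine logs if it ended in error. The sketch below reuses the placeholder variables from the earlier example (`SERVICE_URL`, `TOKEN`, `PROJECT_ID`, `DATA_FLOW_ID`, `run_id`) and is only an illustration of the state life cycle described above.

```python
# Minimal sketch: poll a run until it completes, then fetch logs on failure.
import time
import requests

headers = {"Authorization": f"Bearer {TOKEN}"}
params = {"project_id": PROJECT_ID}
run_url = f"{SERVICE_URL}/v2/data_flows/{DATA_FLOW_ID}/runs/{run_id}"

while True:
    run = requests.get(run_url, params=params, headers=headers).json()
    state = run["entity"]["state"]
    if state in ("finished", "error", "stopped"):   # completion states
        break
    time.sleep(10)  # progress states: starting, queued, running, stopping

if state == "error":
    # The run asset's "error" attribute holds a concise description
    # (see the example payload below); the engine logs give the full details.
    logs = requests.get(f"{run_url}/logs", params=params, headers=headers).json()
    for entry in logs["logs"]:
        print(entry["type"], entry["message_text"])
```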
The following example shows the `error` payload produced if a schema specified in a source connection's properties doesn't exist: ```json { "error": { "trace": "1c09deb8-c3f9-4dc1-ad5a-0fc4e7c97071", "errors": [ { "code": "runtime_failed", "message": "While the process was running a fatal error occurred in the engine (see logs for more details): SCAPI: CDICO2005E: Table could not be found: \"BADSCHEMAGOSALESHR.EMPLOYEE\" is an undefined name.. SQLCODE=-204, SQLSTATE=42704, DRIVER=4.20.4\ncom.ibm.connect.api.SCAPIException: CDICO2005E: Table could not be found: \"BADSCHEMAGOSALESHR.EMPLOYEE\" is an undefined name.. SQLCODE=-204, SQLSTATE=42704, DRIVER=4.20.4\n\tat com.ibm.connect.jdbc.JdbcInputInteraction.init(JdbcInputInteraction.java:158)\n\t...", "extra": { "account": "2d0d29d5b8d2701036042ca4cab8b613", "diagnostics": "[PROJECT_ID-ff1ab70b-0553-409a-93f9-ccc31471c218] [DATA_FLOW_ID-cfdacdb4-3180-466f-8d4c-be7badea5d64] [DATA_FLOW_NAME-my_dataflow] [DATA_FLOW_RUN_ID-ed09488c-6d51-48c4-b190-7096f25645d5]", "environment_name": "ypprod", "http_status": 400, "id": "CDIWA0129E", "source_cluster": "NULL", "service_version": "1.0.471", "source_component": "WDP-DataFlows", "timestamp": "2017-12-19T19:52:09.438Z", "transaction_id": "71c7d19b-a91b-40b1-9a14-4535d76e9e16", "user": "demo_dataflow_user@mailinator.com" } } ] } } ``` To get the logs produced by the engine, use the following API: ``` GET https://{service_URL}/v2/data_flows/{data_flow_id}/runs/{data_flow_run_id}/logs?project_id={project_id} ``` An example response from this API call might be: ```json { "logs": [ { "date": "2017-12-19T19:52:03.000Z", "event_id": "0", "message_text": "Job requested for activity 'eba3432d-9838-4864-9575-842b8e55d435' with run id 'b1321048-d4ee-4a47-85b5-bf74fbc282cf' by user '*****'", "type": "info" }, { "date": "2017-12-19T19:52:03.000Z", "event_id": "1", "message_text": "Job submitted to cluster 'spark'", "type": "info" }, { "date": "2017-12-19T19:52:03.000Z", "event_id": "2", "message_text": "Job execution started", "type": "info" }, { "date": "2017-12-19T19:52:04.000Z", "event_id": "4", "message_text": "SparkContext: Running Spark version 2.1.0", "type": "info" }, { "date": "2017-12-19T19:52:04.000Z", "event_id": "5", "message_text": "StatusManager: Job executing on node 'dev-dw-spark-dal09-env4-0004.bluemix.net'", "type": "info" }, { "date": "2017-12-19T19:52:06.000Z", "event_id": "8", "message_text": "SCAPI: \"BADSCHEMAGOSALESHR.EMPLOYEE\" is an undefined name.. SQLCODE=-204, SQLSTATE=42704, DRIVER=4.20.4\ncom.ibm.db2.jcc.am.SqlSyntaxErrorException: \"BADSCHEMAGOSALESHR.EMPLOYEE\" is an undefined name.. 
SQLCODE=-204, SQLSTATE=42704, DRIVER=4.20.4\n\tat com.ibm.db2.jcc.am.kd.a(Unknown Source)\n\tat com.ibm.db2.jcc.am.kd.a(Unknown Source)\n\tat com.ibm.db2.jcc.am.kd.a(Unknown Source)\n\tat com.ibm.db2.jcc.am.fp.c(Unknown Source)\n\tat com.ibm.db2.jcc.am.fp.d(Unknown Source)\n\tat com.ibm.db2.jcc.am.fp.a(Unknown Source)\n\tat com.ibm.db2.jcc.am.gp.a(Unknown Source)\n\tat com.ibm.db2.jcc.t4.bb.h(Unknown Source)\n\tat com.ibm.db2.jcc.t4.bb.b(Unknown Source)\n\tat com.ibm.db2.jcc.t4.p.a(Unknown Source)\n\tat com.ibm.db2.jcc.t4.vb.i(Unknown Source)\n\tat com.ibm.db2.jcc.am.fp.kb(Unknown Source)\n\tat com.ibm.db2.jcc.am.gp.xc(Unknown Source)\n\tat com.ibm.db2.jcc.am.gp.b(Unknown Source)\n\tat com.ibm.db2.jcc.am.gp.kc(Unknown Source)\n\tat com.ibm.db2.jcc.am.gp.executeQuery(Unknown Source)\n\tat com.ibm.connect.jdbc.JdbcInputInteraction.init(JdbcInputInteraction.java:151)\n\tat com.ibm.connect.jdbc.JdbcInputInteraction.(JdbcInputInteraction.java:85)\n\tat com.ibm.connect.jdbc.dashdb.DashdbInputInteraction.(DashdbInputInteraction.java:46)\n\tat com.ibm.connect.jdbc.dashdb.DashdbConnector.getInputInteraction(DashdbConnector.java:107)\n\tat com.ibm.connect.api.DelegatingConnector.getInputInteraction(DelegatingConnector.java:75)\n\tat com.ibm.connect.api.pool.PooledConnector.getInputInteraction(PooledConnector.java:109)\n\tat com.ibm.connect.spark.ConnectorFactory$.inputInteraction(ConnectorFactory.scala:93)\n\tat com.ibm.connect.spark.ConnectorReader$.rowDefinition(ConnectorReader.scala:24)\n\tat com.ibm.connect.spark.ConnectorRelation$$anonfun$schema$1.apply(ConnectorRelation.scala:26)\n\tat com.ibm.connect.spark.ConnectorRelation$$anonfun$schema$1.apply(ConnectorRelation.scala:19)\n\tat scala.Option.getOrElse(Option.scala:121)\n\tat com.ibm.connect.spark.ConnectorRelation.schema(ConnectorRelation.scala:19)\n\tat org.apache.spark.sql.execution.datasources.LogicalRelation.(LogicalRelation.scala:40)\n\tat org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:389)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat sparklyr.Invoke$.invoke(invoke.scala:102)\n\tat sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)\n\tat sparklyr.StreamHandler$.read(stream.scala:62)\n\tat sparklyr.BackendHandler.channelRead0(handler.scala:52)\n\tat sparklyr.BackendHandler.channelRead0(handler.scala:14)\n\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)\n\tat io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)\n\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)\n\tat io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)\n\tat java.lang.Thread.run(Thread.java:748)", "type": "fatal" }, { "date": "2017-12-19T19:52:06.000Z", "event_id": "9", "message_id": "CDICO2005E", "message_text": "SCAPI: CDICO2005E: Table could not be found: \"BADSCHEMAGOSALESHR.EMPLOYEE\" is an undefined name.. SQLCODE=-204, SQLSTATE=42704, DRIVER=4.20.4\ncom.ibm.connect.api.SCAPIException: CDICO2005E: Table could not be found: \"BADSCHEMAGOSALESHR.EMPLOYEE\" is an undefined name.. 
SQLCODE=-204, SQLSTATE=42704, DRIVER=4.20.4\n\tat com.ibm.connect.jdbc.JdbcInputInteraction.init(JdbcInputInteraction.java:158)\n\tat com.ibm.connect.jdbc.JdbcInputInteraction.(JdbcInputInteraction.java:85)\n\tat com.ibm.connect.jdbc.dashdb.DashdbInputInteraction.(DashdbInputInteraction.java:46)\n\tat com.ibm.connect.jdbc.dashdb.DashdbConnector.getInputInteraction(DashdbConnector.java:107)\n\tat com.ibm.connect.api.DelegatingConnector.getInputInteraction(DelegatingConnector.java:75)\n\tat com.ibm.connect.api.pool.PooledConnector.getInputInteraction(PooledConnector.java:109)\n\tat com.ibm.connect.spark.ConnectorFactory$.inputInteraction(ConnectorFactory.scala:93)\n\tat com.ibm.connect.spark.ConnectorReader$.rowDefinition(ConnectorReader.scala:24)\n\tat com.ibm.connect.spark.ConnectorRelation$$anonfun$schema$1.apply(ConnectorRelation.scala:26)\n\tat com.ibm.connect.spark.ConnectorRelation$$anonfun$schema$1.apply(ConnectorRelation.scala:19)\n\tat scala.Option.getOrElse(Option.scala:121)\n\tat com.ibm.connect.spark.ConnectorRelation.schema(ConnectorRelation.scala:19)\n\tat org.apache.spark.sql.execution.datasources.LogicalRelation.(LogicalRelation.scala:40)\n\tat org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:389)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat sparklyr.Invoke$.invoke(invoke.scala:102)\n\tat sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)\n\tat sparklyr.StreamHandler$.read(stream.scala:62)\n\tat sparklyr.BackendHandler.channelRead0(handler.scala:52)\n\tat sparklyr.BackendHandler.channelRead0(handler.scala:14)\n\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)\n\tat io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)\n\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)\n\tat 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)\n\tat io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)\n\tat java.lang.Thread.run(Thread.java:748)", "type": "fatal" }, { "date": "2017-12-19T19:52:07.000Z", "event_id": "10", "message_id": "CDICO2005E", "message_text": "Error: com.ibm.connect.api.SCAPIException: CDICO2005E: Table could not be found: \"BADSCHEMAGOSALESHR.EMPLOYEE\" is an undefined name.. SQLCODE=-204, SQLSTATE=42704, DRIVER=4.20.4\n\tat com.ibm.connect.jdbc.JdbcInputInteraction.init(JdbcInputInteraction.java:158)\n\tat com.ibm.connect.jdbc.JdbcInputInteraction.(JdbcInputInteraction.java:85)\n\tat com.ibm.connect.jdbc.dashdb.DashdbInputInteraction.(DashdbInputInteraction.java:46)\n\tat com.ibm.connect.jdbc.dashdb.DashdbConnector.getInputInteraction(DashdbConnector.java:107)\n\tat com.ibm.connect.api.DelegatingConnector.getInputInteraction(DelegatingConnector.java:75)\n\tat com.ibm.connect.api.pool.PooledConnector.getInputInteraction(PooledConnector.java:109)\n\tat com.ibm.connect.spark.ConnectorFactory$.inputInteraction(ConnectorFactory.scala:93)\n\tat com.ibm.connect.spark.ConnectorReader$.rowDefinition(ConnectorReader.scala:24)\n\tat com.ibm.connect.spark.ConnectorRelation$$anonfun$schema$1.apply(ConnectorRelation.scala:26)\n\tat com.ibm.connect.spark.ConnectorRelation$$anonfun$schema$1.apply(ConnectorRelation.scala:19)\n\tat scala.Option.getOrElse(Option.scala:121)\n\tat com.ibm.connect.spark.ConnectorRelation.schema(ConnectorRelation.scala:19)\n\tat org.apache.spark.sql.execution.datasources.LogicalRelation.(LogicalRelation.scala:40)\n\tat org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:389)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat sparklyr.Invoke$.invoke(invoke.scala:102)\n\tat sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)\n\tat sparklyr.StreamHandler$.read(stream.scala:62)\n\tat sparklyr.BackendHandler.channelRead0(handler.scala:52)\n\tat sparklyr.BackendHandler.channelRead0(handler.scala:14)\n\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)\n\tat io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)\n\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)\n\tat io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)\n\tat java.lang.Thread.run(Thread.java:748)", "type": "fatal" }, { "date": "2017-12-19T19:52:07.000Z", "event_id": "11", "message_text": "Job execution ended", "type": "info" } ], "first": { "href": "https://api.dataplatform.ibm.com/v2/data_flows/cfdacdb4-3180-466f-8d4c-be7badea5d64/runs/ed09488c-6d51-48c4-b190-7096f25645d5?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218/logs?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218&limit=100&offset=0" }, "last": { "href": "https://api.dataplatform.ibm.com/v2/data_flows/cfdacdb4-3180-466f-8d4c-be7badea5d64/runs/ed09488c-6d51-48c4-b190-7096f25645d5?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218&limit=100&offset=0" }, "limit": 100, "offset": 0, "count": 9 } ``` ## Resources ## For more information, see [Watson Data API](https://developer.ibm.com/api/view/dataplatform-prod:watson-data-platform:title-Watson_Data__beta_).

Streams Flows

## IBM Cloud Data Services Streams Flow Service ### Introduction The IBM Watson Data Platform Streams Flow service provides APIs to create, update, delete, list, start, and stop streams flows. A _streams flow_ is a continuous flow of massive volumes of moving data to which real-time analytics can be applied. A streams flow can read data from a variety of sources, process that data by using analytic operations or your custom code, and then write it to one or more targets. You can access and analyze massive amounts of changing data as it is created. Regardless of whether the data is structured or unstructured, you can leverage data at scale to drive real-time analytics for up-to-the-minute business decisions. The sources that are supported include Kafka, Message Hub, MQTT, and Watson IoT. Targets that are supported include Db2 Warehouse on Cloud, Cloud Object Storage, and Redis. Analytic operators that are supported include Aggregation, Python Machine Learning, Code, and Geofence. ### Environments The Streams Flow service is currently deployed only to the US South region of IBM Cloud. Use this environment URL in place of {service_URL} in the examples below: * US south `https://api.dataplatform.ibm.com` ### Authorization Authorization is done via an Identity and Access Management (IAM) bearer token. All API calls require this bearer token in the `Authorization` header. ### Create a Streams Flow ##### 1. Streaming Analytics instance ID The streams flow is submitted to a Streaming Analytics service for compilation and execution. When creating a flow, the Streaming Analytics instance ID must be provided. The instance ID can be found in the service credentials, which can be accessed from the service dashboard. ##### 2. The pipeline graph A streams flow represents its sources, targets, and operations in a pipeline graph. The pipeline graph can be generated by choosing the relevant operators in the Streams Designer canvas. To retrieve a pipeline graph created by the Streams Designer, use: ```GET https://api.dataplatform.ibm.com/v2/streams_flows/85be3e09-1c71-45d3-8d5d-220d6a6ea850?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218``` This returns a streams flow that contains a `pipeline` field in the entity.
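As a minimal sketch (placeholders only, not an official client), the following Python snippet fetches an existing streams flow and pulls out its pipeline graph so that it can be reused when creating a new flow with the POST request shown next.

```python
# Minimal sketch: retrieve a streams flow and extract its pipeline graph.
# The IDs and token below are placeholders, not real values.
import requests

SERVICE_URL = "https://api.dataplatform.ibm.com"
TOKEN = "<your IAM access_token>"
PROJECT_ID = "<your project_id>"
STREAMS_FLOW_ID = "<id of an existing streams flow>"

resp = requests.get(
    f"{SERVICE_URL}/v2/streams_flows/{STREAMS_FLOW_ID}",
    params={"project_id": PROJECT_ID},
    headers={"Authorization": f"Bearer {TOKEN}"},
)
resp.raise_for_status()

# The entity's "pipeline" field is the graph to copy into a new flow.
pipeline = resp.json()["entity"]["pipeline"]
```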
This pipeline object can be copied and submitted into another flow via: ```POST https://api.dataplatform.ibm.com/v2/streams_flows/?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218``` Request Payload: ```json { "name": "My Streams Flow", "description": "A Sample Streams Flow.", "engines": { "streams": { "instance_id": "8ff81caa-1076-41ce-8de1-f4fe8d79e30e" } }, "pipeline": { "activity": { "docType": "activity", "name": "test123_Pipeline", "inputDocument": { "pipeline": { "name": "test123_Pipeline" }, "classifications": [], "sources": [ { "connection": { "id": "connection0", "type": "ibm/streams/sources/messagehub", "schemaMapping": [ { "name": "eventTime", "sourceElemName": ".eventTime", "type": "varchar", "length": 255 }, { "name": "customerId", "sourceElemName": ".customerId", "type": "varchar", "length": 255 }, { "name": "latitude", "sourceElemName": ".latitude", "type": "double" }, { "name": "longitude", "sourceElemName": ".longitude", "type": "double" } ], "connectionRef": { "asset_id": "EXAMPLE_MESSAGE_HUB_SERVICE_KEY", "project_id": "EXAMPLE_MESSAGE_HUB_SERVICE_KEY" }, "asset": { "name": "geofenceSampleData", "id": "geofenceSampleData", "type": "topic", "path": "/geofenceSampleData" } }, "tables": [ { "name": "table0", "datasetBindings": [ { "datasetRef": "dataset0", "port": "target" } ], "displayMetadata": { "canvasPosition": { "x": 70, "y": 70 }, "annotations": [] } } ] } ], "targets": [ { "connection": { "id": "connection1", "type": "ibm/streams/targets/objectstorage_v2", "format": "csv", "delimiter": ",", "connectionRef": { "asset_id": "$OSInstance", "project_id": "$OSInstance" }, "asset": { "name": "/DefaultProject-unibot_il_ibm_com/file1", "id": "/DefaultProject-unibot_il_ibm_com/file1", "type": "file", "path": "/DefaultProject-unibot_il_ibm_com/file1" } }, "tables": [ { "name": "table1", "datasetRef": "dataset0", "port": "source", "displayMetadata": { "canvasPosition": { "x": 560, "y": 70 }, "annotations": [] } } ] }, { "connection": { "id": "connection2", "type": "ibm/streams/targets/objectstorage_v2", "format": "csv", "delimiter": ",", "headerRow": true, "connectionRef": { "asset_id": "$OSInstance", "project_id": "$OSInstance" }, "asset": { "name": "/eliezerproj/file2", "id": "/eliezerproj/file2", "type": "file", "path": "/eliezerproj/file2" } }, "tables": [ { "name": "table2", "datasetRef": "dataset0", "port": "source", "displayMetadata": { "canvasPosition": { "x": 570, "y": 210 }, "annotations": [] } } ] } ] } }, "pipeline": { "docType": "pipeline", "name": "test123_Pipeline", "inputDocument": { "schema": [ { "columns": [ { "name": "eventTime", "type": { "type": "varchar" }, "length": 255 }, { "name": "customerId", "type": { "type": "varchar" }, "length": 255 }, { "name": "latitude", "type": { "type": "double" } }, { "name": "longitude", "type": { "type": "double" } } ], "id": "schema0", "name": "schema0" } ], "datasets": [ { "id": "dataset0", "schemaRef": "schema0", "type": "source" } ], "operations": [] } } } } ``` ### Streams Flow Lifecycle After a streams flow is created, it is in the STOPPED state until it is submitted as a job to be started. When a job is started, a Cloudant asset is created to track the status of the streams flow run. The start job operation can take up to a minute to complete, during which time the streams flow is in the STARTING state. Once the submission and compilation have completed, the streams flow is in the RUNNING state.
To change the run state, use the following POST API: ```POST https://api.dataplatform.ibm.com/v2/streams_flows/85be3e09-1c71-45d3-8d5d-220d6a6ea850/runs?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218``` Request Payload: ```json { "state": "started", "allowStart": true } ``` - To start the streams flow run, use `{ "state": "started" }`. To stop the run, use `{ "state": "stopped" }`. - Specify `"allowStart": true` to start the Streaming Analytics service in the event that it is stopped. The start job operation triggers a long-running process on the Streaming Analytics service instance. During this time, the progress and status of the job can be viewed with: ```GET https://api.dataplatform.ibm.com/v2/streams_flows/85be3e09-1c71-45d3-8d5d-220d6a6ea850/runs?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218``` A copy of the pipeline that was deployed is saved as the Runtime Pipeline. The streams flow can still be edited in the Streams Designer without affecting the Runtime Pipeline that was deployed. After the streams flow is in the RUNNING state, the operator metrics can be retrieved: ```GET https://api.dataplatform.ibm.com/v2/streams_flows/85be3e09-1c71-45d3-8d5d-220d6a6ea850/runs/85be3e09-1c71-45d3-8d5d-220d6a6ea850/metrics?project_id=ff1ab70b-0553-409a-93f9-ccc31471c218``` Response: ```json { "operator_0": { "operator_1": { "n_bytes_processed": 1222, "n_bytes_submitted": 1222, "n_exceptions_caught": 0, "n_tuples_dropped": 0, "n_tuples_processed": 12, "n_tuples_submitted": 12 }, "operator_2": { "n_bytes_processed": 0, "n_bytes_submitted": 0, "n_exceptions_caught": 0, "n_tuples_dropped": 0, "n_tuples_processed": 0, "n_tuples_submitted": 0 } } } ``` This JSON represents the metrics provided on the edges. There are two edges in this case: from operator_0 to operator_1, and from operator_0 to operator_2.
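Tying the lifecycle calls together, the sketch below starts a streams flow run and then reads the per-edge metrics once the flow is RUNNING. The service URL, token, and IDs are placeholders, and the snippet is only an illustration of the requests described above, not an official client.

```python
# Minimal sketch: start a streams flow run, then read its operator metrics.
# All IDs and the token below are placeholders, not real values.
import requests

SERVICE_URL = "https://api.dataplatform.ibm.com"
TOKEN = "<your IAM access_token>"
PROJECT_ID = "<your project_id>"
STREAMS_FLOW_ID = "<id of your streams flow>"
RUN_ID = "<id of the streams flow run>"

headers = {"Authorization": f"Bearer {TOKEN}"}
params = {"project_id": PROJECT_ID}

# Start the run; allowStart also starts the Streaming Analytics service if it is stopped.
start = requests.post(
    f"{SERVICE_URL}/v2/streams_flows/{STREAMS_FLOW_ID}/runs",
    params=params,
    headers=headers,
    json={"state": "started", "allowStart": True},
)
start.raise_for_status()

# Once the flow is in the RUNNING state, retrieve the per-edge metrics.
metrics = requests.get(
    f"{SERVICE_URL}/v2/streams_flows/{STREAMS_FLOW_ID}/runs/{RUN_ID}/metrics",
    params=params,
    headers=headers,
).json()
print(metrics)
```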
