The open source Object Storage toolkit is used to create and access files in Object Storage from Streams. It has been updated with some new features and is also included in Streams 4.3.

The toolkit supports Object Storage services with an S3 API, such as the IBM Cloud Object Storage service, and provides the following features:

  • Create/Delete bucket
  • List objects
  • Put/get object
  • Delete object

New features

  • Write objects in Apache Parquet format.
  • You can now control the rolling policy when writing objects. The rolling policy determines when to close the current object being written and open a new one. The ObjectStorageSink operator supports the following rolling policy types:
    • Size-based (parameter bytesPerObject)
    • Time-based (parameter timePerObject)
    • Tuple count-based (parameter tuplesPerObject)
    • Close object when receiving punctuation marker (default)
  • At-least-once processing (consistent region support) for the ObjectStorageScan and ObjectStorageSource operators
  • Guaranteed processing with exactly-once semantics (consistent region support) with the ObjectStorageSink operator
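
As a rough illustration of the rolling policy parameters listed above, the following sketch closes an object after a fixed number of tuples instead of after a time interval. The parameter names come from the list above. The composite name, the bucket name, the value 1000 and the int64 literal are assumptions made for this example, and credentials are expected to come from an application configuration.

```spl
composite RollingPolicySketch {
   graph
      // Generates one sample tuple every 0.1 seconds.
      stream<rstring username, uint64 id> SampleData = Beacon() {
         param
            period: 0.1;
         output
            SampleData: username = "Test" + (rstring) IterationCount(), id = IterationCount();
      }

      // Tuple count-based rolling policy.
      // Assumption: tuplesPerObject expects an int64 value, hence the "l" suffix.
      // Replace it with bytesPerObject or timePerObject to try another policy.
      () as RollingSink = com.ibm.streamsx.objectstorage::ObjectStorageSink(SampleData) {
         param
            objectStorageURI: "cos://streams-sample-001/";
            objectName: "sample_%TIME.snappy.parquet";
            tuplesPerObject: 1000l;
            storageFormat: "parquet";
            parquetCompression: "SNAPPY";
      }
}
```

With a Beacon period of 0.1 seconds, this sketch would start a new object roughly every 100 seconds. Check the operator documentation before combining several rolling policy parameters in one invocation.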

Reading objects

  • Use the ObjectStorageScan operator, which works like the DirectoryScan operator, to scan for objects and retrieve their names
  • Use the ObjectStorageSource operator to read objects line by line or in binary format
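
The two operators are typically chained: the scan produces object names and the source reads each named object. The sketch below shows one way this might look. It assumes that ObjectStorageScan emits a single rstring attribute holding the object name, that ObjectStorageSource accepts those names on its input port, and that both run with default settings apart from the URI. The composite, stream and attribute names are hypothetical.

```spl
composite ReadObjectsSketch {
   graph
      // Emits the name of each object found in the bucket.
      // Assumption: the output attribute is a single rstring.
      stream<rstring objectname> ObjectNames = com.ibm.streamsx.objectstorage::ObjectStorageScan() {
         param
            objectStorageURI: "cos://streams-sample-001/";
      }

      // Reads every object named on the input port line by line.
      stream<rstring line> Lines = com.ibm.streamsx.objectstorage::ObjectStorageSource(ObjectNames) {
         param
            objectStorageURI: "cos://streams-sample-001/";
      }

      // Prints each line to standard output.
      () as Printer = Custom(Lines as I) {
         logic
            onTuple I: {
               printStringLn(I.line);
            }
      }
}
```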

Writing objects

Create objects with the ObjectStorageSink operator in line, blob, or Parquet format. Parquet output can be partitioned in a way that is compatible with the IBM Cloud SQL Query service, so Streams data written to COS can be queried using SQL.

Client selection

The toolkit operators can use different clients. Select one of the two S3 clients by specifying the appropriate protocol in the objectStorageURI parameter:

  • s3a (hadoop-aws)
  • cos (stocator)

The URI should be in cos://<bucket>/ or s3a://<bucket>/ format. Replace <bucket> with the name of the bucket you created. For example, use cos://streams-sample-001/ for a bucket named streams-sample-001.

For the ObjectStorageScan and ObjectStorageSource operators, the choice of S3 client makes little difference, and you can select either client.

The ObjectStorageSink works differently depending on the client selection.

Recommendation for client selection when writing objects

  • If your application creates large objects (raw or Parquet format) one after another, select the s3a protocol, because large objects are uploaded in multiple parts in parallel.
  • If your application creates many objects within a narrow time frame, select the cos protocol, because multiple threads upload objects in parallel, one entire object per thread.
  • If your application creates large objects and the ObjectStorageSink is part of a consistent region, select the s3a protocol: multi-part upload reduces the time needed to close the object when the region drains.
  • Select the s3a protocol to avoid buffering on disk prior to upload. With the cos protocol, data is written to disk before the objects are uploaded to object storage. The s3a protocol uses a memory buffer by default.
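
To make the consistent region recommendation above more concrete, here is a minimal sketch of an ObjectStorageSink that uses the s3a client inside a periodic consistent region. The @consistent annotation and the JobControlPlane operator are standard SPL. The 60-second period, the composite name and the bucket name are assumptions. No rolling policy parameter is set, on the assumption that objects are closed when the region drains.

```spl
composite ConsistentSinkSketch {
   graph
      // Required once in any application that uses consistent regions.
      () as JCP = spl.control::JobControlPlane() {}

      // The Beacon starts a consistent region that is drained every 60 seconds.
      @consistent(trigger = periodic, period = 60.0)
      stream<rstring username, uint64 id> SampleData = Beacon() {
         param
            period: 0.1;
         output
            SampleData: username = "Test" + (rstring) IterationCount(), id = IterationCount();
      }

      // The s3a protocol selects the hadoop-aws client with multi-part upload.
      () as ConsistentSink = com.ibm.streamsx.objectstorage::ObjectStorageSink(SampleData) {
         param
            objectStorageURI: "s3a://streams-sample-001/";
            objectName: "sample_%TIME.snappy.parquet";
            storageFormat: "parquet";
            parquetCompression: "SNAPPY";
      }
}
```

Like any application with a consistent region, this sketch has to be launched in distributed mode.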

Quick start with IBM Cloud Object Storage (COS) service

You’ll need:

  • An instance of the IBM COS service
  • IBM COS service credentials created
  • A bucket that you will either read from or write to.

Create bucket

Steps to complete

Create an application configuration

The credentials for the COS service are stored in the Streams instance in an application configuration.  Create an application configuration by following these steps.

Get your endpoint (optional)

If your bucket was not created in the us-geo location, select the endpoint for the corresponding region. Specify the endpoint using the endpoint parameter.

When running your application in the Streaming Analytics service (IBM Cloud), you can use private endpoints to access your bucket. Otherwise, use the public endpoints.

Develop your application

Configure the ObjectStorageSink/Scan/Source operators with the right parameters.  For example, the application below uses the ObjectStorageSink to write objects in Parquet format, creating a new file every timePerObject seconds.

Launch the application

Since the COS credentials are retrieved from the application configuration, you need to launch the application in distributed mode. If you are not using the default endpoint, you can specify the endpoint as the submission parameter “os-endpoint” in the sample application.


composite Main {
   param
      expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri", "cos://streams-sample-001/");
      expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3-api.us-geo.objectstorage.softlayer.net");
      expression<float64> $timePerObject: 10.0; // A new object in Parquet format is created every $timePerObject seconds

   type
      S3ObjectStorageSinkOut_t = tuple<rstring objectName, uint64 size>;

   graph

      stream<rstring username, uint64 id> SampleData = Beacon() {
         param
            period: 0.1;
         output
            SampleData : username = "Test"+(rstring) IterationCount(), id = IterationCount() ;
      }

      // Operator reads IAM credentials from application configuration.
      // Ensure that cos application configuration with property cos.creds has been created.
      
      stream<S3ObjectStorageSinkOut_t> ObjStSink = com.ibm.streamsx.objectstorage::ObjectStorageSink(SampleData) {
         param
            objectStorageURI: $objectStorageURI;
            endpoint: $endpoint;
            objectName: "sample_%TIME.snappy.parquet";
            timePerObject: $timePerObject;
            storageFormat: "parquet";
            parquetCompression: "SNAPPY";
      }

      () as SampleSink = Custom(ObjStSink as I) {
         logic
         onTuple I: {
            printStringLn("Object with name '" + I.objectName + "' of size '" + (rstring)I.size + "' has been created.");
         }
      }
}

View Created Files

After running the application for a few minutes, you can view or download the created files from the IBM Cloud dashboard.

Select your Object Storage instance in your IBM Cloud dashboard and select your bucket “streams-sample-001” in the buckets menu in order to list the objects in the bucket.

Further samples are available in the samples directory on GitHub. Have a look at the samples demonstrating how to use each of the operators and functions.