The following directions detail the manual installation of software into IBM Open Platform for Apache Hadoop. These directions, and any binaries that may be provided as part of this article (either hosted by IBM or otherwise), are provided for convenience only, with no guarantee as to the stability, performance, or functionality of the software being installed. Product support for this software is not provided (including upgrade support for either IOP or the software described). Discuss questions or issues on the BigInsights StackOverflow forum or on the appropriate Apache Software Foundation mailing list for the component(s) covered by this article.

IBM Message Hub is a Bluemix service based on Apache Kafka. Flume 1.6.0 supports a Kafka source, sink, and channel. Can Flume connect to Message Hub? The answer is “yes”, and this article shows you how to use Flume in IBM Open Platform (IOP) 4.2 with Message Hub.

Prerequisites

  1. Create a Message Hub instance on Bluemix (https://console.ng.bluemix.net/).
  2. Ensure that the Flume host can connect to the Message Hub brokers (a quick connectivity check follows this list).
  3. If your IBM BigInsights cluster is an on-premises setup, install Flume on an edge node, because edge nodes span the private and public networks.
  4. If you are using BigInsights 4.2 on Cloud, Flume installed on a data node can also connect to Message Hub, because data nodes use the management nodes as their gateway in BigInsights on Cloud.
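
A minimal connectivity check, assuming a hypothetical broker address (substitute a host:port pair from your own service credentials; Message Hub brokers accept TLS connections on port 9093):

    $ openssl s_client -connect kafka01-prod01.messagehub.services.us-south.bluemix.net:9093 </dev/null

If the TLS handshake completes and a certificate chain is printed, the broker is reachable from that host.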

Running Flume agents from the Ambari web interface

The following steps are performed as the BigInsights on Cloud biadmin user.

  1. On the Flume host, download messagehub.login-1.0.0.jar.
    $ wget https://github.com/ibm-messaging/message-hub-samples/raw/master/kafka-0.9/message-hub-login-library/messagehub.login-1.0.0.jar
  2. Run the following command to make this JAR file readable.
    $ chmod 644 messagehub.login-1.0.0.jar
  3. Add messagehub.login-1.0.0.jar to the Flume class path. From the Ambari web interface, click the Flume “Configs” tab, expand “Advanced flume-env”, and append the following line:
    export FLUME_CLASSPATH=$FLUME_CLASSPATH:/home/biadmin/messagehub.login-1.0.0.jar
  4. Create a file named jaas.conf with the following content, using the credentials from the VCAP_SERVICES environment variable of your Message Hub instance.
    KafkaClient {
       com.ibm.messagehub.login.MessageHubLoginModule required
       serviceName="your message hub serviceName"
       username="your username"
       password="your password";
    };
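    For reference, the Message Hub entry in VCAP_SERVICES typically includes fields like the following (values here are placeholders). The user and password values map to username and password in jaas.conf, and kafka_brokers_sasl supplies the bootstrap server list used in Step 8:

    "messagehub": [{
       "credentials": {
          "kafka_brokers_sasl": ["<broker_host>:9093", "<broker_host>:9093"],
          "user": "your username",
          "password": "your password"
       }
    }]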
  5. Run the following command to make jaas.conf readable.
    $ chmod 644 jaas.conf
  6. From the Ambari web interface, click the Flume “Configs” tab, expand “Advanced flume-env”, uncomment the line for JAVA_OPTS, and change it to the following text:
    export JAVA_OPTS="-Xmx20m -Djava.security.auth.login.config=/home/biadmin/jaas.conf"
  7. Save the configuration changes, which take effect after you restart the Flume service.
  8. Create a Flume agent configuration for a scenario in which data is published to Message Hub and then consumed. As an example, the following configuration defines two agents: tier1 reads data from files and puts it into Message Hub by using a Kafka sink; tier2 reads the data from Message Hub by using a Kafka source and stores it in HDFS. So the data flow is: spooldir -> Message Hub -> HDFS.

    Note: Replace <your_message_hub_bootstrap_server> in the two kafka.bootstrap.servers lines below with your Message Hub broker list. Under your Message Hub service, create a topic named mytopic or use an existing one.

    tier2.sources = source1
    tier2.channels = channel1
    tier2.sinks = sink1
    tier2.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    tier2.sources.source1.kafka.bootstrap.servers = <your_message_hub_bootstrap_server>
    tier2.sources.source1.kafka.topics = mytopic
    tier2.sources.source1.kafka.consumer.group.id = test-consumer-group
    tier2.sources.source1.channels = channel1
    tier2.sources.source1.interceptors = i1
    tier2.sources.source1.interceptors.i1.type = timestamp
    tier2.sources.source1.kafka.consumer.timeout.ms = 100
    tier2.sources.source1.kafka.consumer.security.protocol=SASL_SSL
    tier2.sources.source1.kafka.consumer.ssl.protocol=TLSv1.2
    tier2.sources.source1.kafka.consumer.ssl.enabled.protocols=TLSv1.2
    tier2.sources.source1.kafka.consumer.ssl.truststore.location=/usr/jdk64/java-1.8.0-openjdk-1.8.0.45-28.b13.el6_6.x86_64/jre/lib/security/cacerts
    tier2.sources.source1.kafka.consumer.ssl.truststore.password=changeit
    tier2.sources.source1.kafka.consumer.ssl.truststore.type=JKS
    tier2.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
    tier2.sources.source1.kafka.consumer.sasl.mechanism=PLAIN
    tier2.channels.channel1.type = memory
    tier2.channels.channel1.capacity = 10000
    tier2.channels.channel1.transactionCapacity = 1000
    tier2.sinks.sink1.type = hdfs
    tier2.sinks.sink1.hdfs.path = /tmp/flume/mhub/
    tier2.sinks.sink1.hdfs.rollInterval = 5
    tier2.sinks.sink1.hdfs.rollSize = 0
    tier2.sinks.sink1.hdfs.rollCount = 0
    tier2.sinks.sink1.hdfs.fileType = DataStream
    tier2.sinks.sink1.hdfs.kerberosPrincipal = biadmin@IBM.COM
    tier2.sinks.sink1.hdfs.kerberosKeytab = /home/biadmin/biadmin.keytab
    tier2.sinks.sink1.channel = channel1
    
    tier1.sources = source1
    tier1.channels = channel1
    tier1.sinks = sink1
    tier1.sources.source1.type = spooldir
    tier1.sources.source1.spoolDir = /home/flume/test
    tier1.sources.source1.fileHeader = false
    tier1.sources.source1.channels = channel1
    tier1.channels.channel1.type = memory
    tier1.channels.channel1.capacity = 10000
    tier1.channels.channel1.transactionCapacity = 1000
    tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    tier1.sinks.sink1.kafka.topic = mytopic
    tier1.sinks.sink1.channel = channel1
    tier1.sinks.sink1.kafka.flumeBatchSize = 20
    tier1.sinks.sink1.kafka.bootstrap.servers = <your_message_hub_bootstrap_server>
    tier1.sinks.sink1.kafka.producer.acks = 1
    tier1.sinks.sink1.kafka.producer.linger.ms = 1
    tier1.sinks.sink1.kafka.producer.compression.type = snappy
    tier1.sinks.sink1.kafka.producer.timeout.ms = 10000
    tier1.sinks.sink1.kafka.producer.security.protocol=SASL_SSL
    tier1.sinks.sink1.kafka.producer.ssl.protocol=TLSv1.2
    tier1.sinks.sink1.kafka.producer.ssl.enabled.protocols=TLSv1.2
    tier1.sinks.sink1.kafka.producer.ssl.truststore.location=/usr/jdk64/java-1.8.0-openjdk-1.8.0.45-28.b13.el6_6.x86_64/jre/lib/security/cacerts
    tier1.sinks.sink1.kafka.producer.ssl.truststore.password=changeit
    tier1.sinks.sink1.kafka.producer.ssl.truststore.type=JKS
    tier1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm=HTTPS
    tier1.sinks.sink1.kafka.producer.sasl.mechanism=PLAIN
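
    Note that the ssl.truststore.location values above point at the default Java truststore (cacerts) of the JDK build installed on this particular cluster; the exact path varies with the JDK version. A quick way to locate yours:

    $ ls /usr/jdk64/*/jre/lib/security/cacerts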
  9. From the Ambari web interface, click the Flume “Configs” tab, expand “flume.conf”, and input the agent configuration details from the previous step. Save the changes, then restart the Flume service to start agents tier1 and tier2.
  10. Test the agents. Create a file named text6.txt in the spooling directory (/home/flume/test, as configured for tier1).
    $ echo "testing for mhub" >> /home/flume/test/text6.txt

    The data in text6.txt is published to Message Hub by agent tier1. Agent tier2 then fetches the data and writes it to HDFS. Verify that the destination HDFS file is generated, as shown in the following example:

    $ hadoop fs -cat /tmp/flume/mhub/FlumeData.1464317463256
    testing for mhub
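
    The generated file name includes a timestamp, so list the directory first if you do not know the exact name:

    $ hadoop fs -ls /tmp/flume/mhub/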
  11. Look at the Flume log files if you encounter an error or get unexpected results. The following example log snippets are from the previous example, which completed successfully.

    /var/log/flume/flume-tier1.log:

    ...
    27 May 2016 02:51:02,510 INFO  [pool-5-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile:348)  - Preparing to move file /home/flume/test/text6.txt to /home/flume/test/text6.txt.COMPLETED
    27 May 2016 02:51:59,400 INFO  [pool-6-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.run:143)  - Attributes for component SOURCE.source1
    27 May 2016 02:51:59,401 INFO  [pool-6-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163)  - EventReceivedCount = 2
    27 May 2016 02:51:59,401 INFO  [pool-6-thread-1] 
    ...

    /var/log/flume/flume-tier2.log:

    ...
    27 May 2016 02:50:45,227 INFO  [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163)  - StartTime = 1464316973348
    27 May 2016 02:50:45,227 INFO  [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163)  - EventPutAttemptCount = 1
    27 May 2016 02:50:45,227 INFO  [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163)  - EventPutSuccessCount = 1
    27 May 2016 02:50:45,228 INFO  [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163)  - StopTime = 0
    27 May 2016 02:51:03,255 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSDataStream.configure:58)  - Serializer = TEXT, UseRawLocalFileSystem = false
    27 May 2016 02:51:03,291 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:234)  - Creating /tmp/flume/mhub//FlumeData.1464317463256.tmp
    27 May 2016 02:51:08,372 INFO  [hdfs-sink1-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:363)  - Closing /tmp/flume/mhub//FlumeData.1464317463256.tmp
    

Running Flume agents with Message Hub from a shell script

  1. Log in to the Flume node. In the login user’s home directory, create a subdirectory named flume_messagehub and navigate to it.
    $ mkdir flume_messagehub
    $ cd flume_messagehub
  2. Download the messagehub.login-1.0.0.jar file.
    $ wget https://github.com/ibm-messaging/message-hub-samples/raw/master/kafka-0.9/message-hub-login-library/messagehub.login-1.0.0.jar
  3. Create a file named jaas.conf with the following content. Specify your Bluemix Message Hub service credentials.
    KafkaClient {
       com.ibm.messagehub.login.MessageHubLoginModule required
       serviceName="your message hub serviceName"
       username="your username"
       password="your password";
    };
  4. Create a file named flume-env.sh with the following content. Replace <login_user_name> with your login user name.
    export JAVA_HOME=/usr/jdk64/java-1.8.0-openjdk-1.8.0.77-0.b03.el6_7.x86_64
    export JAVA_OPTS="-Xmx20m -Djava.security.auth.login.config=/home/<login_user_name>/flume_messagehub/jaas.conf"
    export FLUME_CLASSPATH=$FLUME_CLASSPATH:/home/<login_user_name>/flume_messagehub/messagehub.login-1.0.0.jar
  5. Create Flume agent configuration files with the same content as those in Step 8 in the previous section.
    Create a file named tier1 with the configuration for agent tier1. Create a file named tier2 with the configuration for agent tier2.
    In this case, the spooling directory is as follows (replace <login_user_name> with your login user name):

    tier1.sources.source1.spoolDir = /home/<login_user_name>/flume_messagehub/test
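
    After this step, the working directory contains everything the agents need, for example:

    $ ls ~/flume_messagehub
    flume-env.sh  jaas.conf  messagehub.login-1.0.0.jar  tier1  tier2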
  6. Test the agents as described in Step 10 in the previous section.
    1. Create a directory named test in the current working directory (that is, ~/flume_messagehub) and navigate to it.
      $ mkdir test
      $ cd test
    2. Create a test file named testfile1 in that directory with the following content:
      Testing Message Hub 
      hello messagehub
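
      For example, from the shell:

      $ printf 'Testing Message Hub\nhello messagehub\n' > testfile1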
  7. Navigate to the flume_messagehub directory.
    $ cd ~/flume_messagehub
  8. Start Flume agent tier1. The --conf . option makes flume-ng read flume-env.sh from the current directory.
    $ flume-ng agent -f tier1 -n tier1 --conf . -Dflume.root.logger=INFO,console
  9. Open another session, navigate to ~/flume_messagehub, and start agent tier2.
    $ flume-ng agent -f tier2 -n tier2 --conf . -Dflume.root.logger=INFO,console
  10. Verify that the data published by agent tier1 to Message Hub is fetched by agent tier2 and written to an HDFS path.
    $ hadoop fs -cat /tmp/flume/mhub/*

    Alternatively, you can log in to the Ambari web console and use the File Browser to verify the existence of files in the /tmp/flume/mhub/ directory.
