How to use Flume in IOP with Message Hub - Hadoop Dev


The following directions detail the manual installation of software into IBM Open Platform for Apache Hadoop. These directions, and any binaries that may be provided as part of this article (either hosted by IBM or otherwise), are provided for convenience and make no guarantees as to stability, performance, or functionality of the software being installed. Product support for this software will not be provided (including upgrade support for either IOP or the software described). Questions or issues encountered should be discussed on the BigInsights StackOverflow forum or the appropriate Apache Software Foundation mailing list for the component(s) covered by this article.

IBM Message Hub is a service on Bluemix that is based upon Apache Kafka. Flume 1.6.0 can support a Kafka source, sink, and channel. Can it connect to Message Hub? The answer is “yes”, and this article shows you how to use Flume in IBM Open Platform (IOP) 4.2 with Message Hub.
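
At the Kafka client level, the connection works because Message Hub exposes SASL_SSL listeners and supplies a PLAIN SASL login module (the messagehub.login JAR used below). The agent configurations later in this article set the corresponding client properties on the Kafka source and sink; as a quick orientation, the handful of properties involved looks like the following sketch, where the truststore path is whatever JKS truststore holds the required CA certificates (the JRE cacerts file is used later in this article).

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    ssl.protocol=TLSv1.2
    ssl.enabled.protocols=TLSv1.2
    ssl.truststore.location=<path_to_a_JKS_truststore>
    ssl.truststore.password=<truststore_password>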

Prerequisites

  1. Create a Message Hub instance on Bluemix (https://console.ng.bluemix.net/).
  2. Ensure that the Flume host can connect to the Message Hub brokers (a quick connectivity check is sketched after this list).
  3. If your IBM BigInsights cluster is an on-premises setup, install Flume on the edge node, which spans the private and public networks.
  4. If you are using BigInsights 4.2 on Cloud, Flume installed on a data node can also connect to Message Hub, because data nodes use the management nodes as their gateway.
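
If you are unsure whether the Flume host can reach the brokers, a quick TLS handshake test is usually enough to confirm connectivity. The broker host below is a placeholder; use one of the kafka_brokers_sasl entries from your Message Hub service credentials (the brokers typically listen on port 9093).

    $ openssl s_client -connect <your_message_hub_broker_host>:9093 < /dev/null

A successful handshake prints the broker certificate chain; a timeout or connection refusal points to a network or firewall problem.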

Running Flume agents from the Ambari web interface

The following steps are performed as the BigInsights on Cloud biadmin user.

  1. On the Flume host, download messagehub.login-1.0.0.jar.
    $ wget https://github.com/ibm-messaging/message-hub-samples/raw/master/kafka-0.9/message-hub-login-library/messagehub.login-1.0.0.jar
  2. Run the following command to make this JAR file readable.
    $ chmod 644 messagehub.login-1.0.0.jar
  3. Add messagehub.login-1.0.0.jar to the Flume class path. From the Ambari web interface, click the Flume “Configs” tab, expand “Advanced flume-env”, and append the following line:
    export FLUME_CLASSPATH=$FLUME_CLASSPATH:/home/biadmin/messagehub.login-1.0.0.jar
  4. Create a file named jaas.conf with the following content. Use the credentials from the VCAP_SERVICES entry of your Message Hub instance (see the credential extraction sketch after these steps).
    KafkaClient {
        com.ibm.messagehub.login.MessageHubLoginModule required
        serviceName="your message hub serviceName"
        username="your username"
        password="your password";
    };
  5. Run the following command to make jaas.conf readable.
    $ chmod 644 jaas.conf
  6. From the Ambari web interface, click the Flume “Configs” tab, expand “Advanced flume-env”, uncomment the line for JAVA_OPTS, and change it to the following text:
    export JAVA_OPTS="-Xmx20m -Djava.security.auth.login.config=/home/biadmin/jaas.conf"
  7. Save the configuration changes, which take effect after you restart the Flume service.
  8. Create a Flume agent configuration for the scenario in which data is published to Message Hub and then consumed from it. As an example, the following configuration includes two agents: tier1 reads data from files in a spooling directory and publishes it to Message Hub by using a Kafka sink; tier2 reads the data from Message Hub by using a Kafka source and stores it in HDFS. The data flow is: spooldir -> Message Hub -> HDFS.

    Note: Replace <your_message_hub_bootstrap_server> with your Message Hub broker server address. Under your Message Hub service, create a topic named mytopic or use an existing one.

    tier2.sources = source1
    tier2.channels = channel1
    tier2.sinks = sink1
    tier2.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    tier2.sources.source1.kafka.bootstrap.servers = <your_message_hub_bootstrap_server>
    tier2.sources.source1.kafka.topics = mytopic
    tier2.sources.source1.kafka.consumer.group.id = test-consumer-group
    tier2.sources.source1.channels = channel1
    tier2.sources.source1.interceptors = i1
    tier2.sources.source1.interceptors.i1.type = timestamp
    tier2.sources.source1.kafka.consumer.timeout.ms = 100
    tier2.sources.source1.kafka.consumer.security.protocol=SASL_SSL
    tier2.sources.source1.kafka.consumer.ssl.protocol=TLSv1.2
    tier2.sources.source1.kafka.consumer.ssl.enabled.protocols=TLSv1.2
    tier2.sources.source1.kafka.consumer.ssl.truststore.location=/usr/jdk64/java-1.8.0-openjdk-1.8.0.45-28.b13.el6_6.x86_64/jre/lib/security/cacerts
    tier2.sources.source1.kafka.consumer.ssl.truststore.password=changeit
    tier2.sources.source1.kafka.consumer.ssl.truststore.type=JKS
    tier2.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
    tier2.sources.source1.kafka.consumer.sasl.mechanism=PLAIN
    tier2.channels.channel1.type = memory
    tier2.channels.channel1.capacity = 10000
    tier2.channels.channel1.transactionCapacity = 1000
    tier2.sinks.sink1.type = hdfs
    tier2.sinks.sink1.hdfs.path = /tmp/flume/mhub/
    tier2.sinks.sink1.hdfs.rollInterval = 5
    tier2.sinks.sink1.hdfs.rollSize = 0
    tier2.sinks.sink1.hdfs.rollCount = 0
    tier2.sinks.sink1.hdfs.fileType = DataStream
    tier2.sinks.sink1.hdfs.kerberosPrincipal = [email protected]
    tier2.sinks.sink1.hdfs.kerberosKeytab = /home/biadmin/biadmin.keytab
    tier2.sinks.sink1.channel = channel1

    tier1.sources = source1
    tier1.channels = channel1
    tier1.sinks = sink1
    tier1.sources.source1.type = spooldir
    tier1.sources.source1.spoolDir = /home/flume/test
    tier1.sources.source1.fileHeader = false
    tier1.sources.source1.channels = channel1
    tier1.channels.channel1.type = memory
    tier1.channels.channel1.capacity = 10000
    tier1.channels.channel1.transactionCapacity = 1000
    tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    tier1.sinks.sink1.kafka.topic = mytopic
    tier1.sinks.sink1.channel = channel1
    tier1.sinks.sink1.kafka.flumeBatchSize = 20
    tier1.sinks.sink1.kafka.bootstrap.servers = <your_message_hub_bootstrap_server>
    tier1.sinks.sink1.kafka.producer.acks = 1
    tier1.sinks.sink1.kafka.producer.linger.ms = 1
    tier1.sinks.sink1.kafka.producer.compression.type = snappy
    tier1.sinks.sink1.kafka.producer.timeout.ms = 10000
    tier1.sinks.sink1.kafka.producer.security.protocol=SASL_SSL
    tier1.sinks.sink1.kafka.producer.ssl.protocol=TLSv1.2
    tier1.sinks.sink1.kafka.producer.ssl.enabled.protocols=TLSv1.2
    tier1.sinks.sink1.kafka.producer.ssl.truststore.location=/usr/jdk64/java-1.8.0-openjdk-1.8.0.45-28.b13.el6_6.x86_64/jre/lib/security/cacerts
    tier1.sinks.sink1.kafka.producer.ssl.truststore.password=changeit
    tier1.sinks.sink1.kafka.producer.ssl.truststore.type=JKS
    tier1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm=HTTPS
    tier1.sinks.sink1.kafka.producer.sasl.mechanism=PLAIN
  9. From the Ambari web interface, click the Flume “Configs” tab, expand “flume.conf”, and input the agent configuration details from the previous step. Save the changes, then restart the Flume service to start agents tier1 and tier2.
  10. Test the agents. In the spooling directory (/home/flume/test in this configuration), create a file named text6.txt.
    $ echo "testing for mhub" >> text6.txt

    The data in text6.txt is published to Message Hub by agent tier1. Agent tier2 consumes the data and writes it to HDFS. Verify that the destination HDFS file is generated, as shown in the following example:

    $ hadoop fs -cat /tmp/flume/mhub/FlumeData.1464317463256
    testing for mhub
  11. Look at the Flume log files if you encounter an error or get unexpected results. The following example log snippets are from the previous example, which completed successfully.

    /var/log/flume/flume-tier1.log:

    ...
    27 May 2016 02:51:02,510 INFO  [pool-5-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile:348)  - Preparing to move file /home/flume/test/text6.txt to /home/flume/test/text6.txt.COMPLETED
    27 May 2016 02:51:59,400 INFO  [pool-6-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.run:143)  - Attributes for component SOURCE.source1
    27 May 2016 02:51:59,401 INFO  [pool-6-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163)  - EventReceivedCount = 2
    27 May 2016 02:51:59,401 INFO  [pool-6-thread-1] ...
    ...

    /var/log/flume/flume-tier2.log:

    ...
    27 May 2016 02:50:45,227 INFO  [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163)  - StartTime = 1464316973348
    27 May 2016 02:50:45,227 INFO  [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163)  - EventPutAttemptCount = 1
    27 May 2016 02:50:45,227 INFO  [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163)  - EventPutSuccessCount = 1
    27 May 2016 02:50:45,228 INFO  [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163)  - StopTime = 0
    27 May 2016 02:51:03,255 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSDataStream.configure:58)  - Serializer = TEXT, UseRawLocalFileSystem = false
    27 May 2016 02:51:03,291 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:234)  - Creating /tmp/flume/mhub//FlumeData.1464317463256.tmp
    27 May 2016 02:51:08,372 INFO  [hdfs-sink1-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:363)  - Closing /tmp/flume/mhub//FlumeData.1464317463256.tmp
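
The jaas.conf credentials in step 4 and the broker list in step 8 both come from the service credentials in the Message Hub instance's VCAP_SERVICES entry. As a minimal sketch, assuming that JSON has been saved locally as vcap.json and that jq is installed (field names can vary between service plans, so check your own credentials object):

    $ jq -r '.messagehub[0].credentials.user' vcap.json
    $ jq -r '.messagehub[0].credentials.password' vcap.json
    $ jq -r '.messagehub[0].credentials.kafka_brokers_sasl | join(",")' vcap.json

The first two values map to the username and password fields in jaas.conf; the comma-separated broker list is what goes into the kafka.bootstrap.servers properties.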

Running Flume agents with Message Hub from a shell script

  1. Log in to the Flume node. In the login user's home directory, create a subdirectory named flume_messagehub and navigate to it.
    $ mkdir flume_messagehub
    $ cd flume_messagehub
  2. Download the messagehub.login-1.0.0.jar file.
    $ wget https://github.com/ibm-messaging/message-hub-samples/raw/master/kafka-0.9/message-hub-login-library/messagehub.login-1.0.0.jar
  3. Create a file named jaas.conf with the following content. Specify your Bluemix Message Hub service credentials.
    KafkaClient {
        com.ibm.messagehub.login.MessageHubLoginModule required
        serviceName="your message hub serviceName"
        username="your username"
        password="your password";
    };
  4. Create a file named flume-env.sh with the following content. Replace <login_user_name> with your login user name.
    export JAVA_HOME=/usr/jdk64/java-1.8.0-openjdk-1.8.0.77-0.b03.el6_7.x86_64
    export JAVA_OPTS="-Xmx20m -Djava.security.auth.login.config=/home/<login_user_name>/flume_messagehub/jaas.conf"
    export FLUME_CLASSPATH=$FLUME_CLASSPATH:/home/<login_user_name>/flume_messagehub/messagehub.login-1.0.0.jar
  5. Create Flume agent configuration files with the same content as those in Step 8 in the previous section.
    Create a file named tier1 with the configuration for agent tier1. Create a file named tier2 with the configuration for agent tier2.
    In this case, the spooling directory is as follows:

    tier1.sources.source1.spoolDir = /home/<login_user_name>/flume_messagehub/test
  6. Test the agents as described in Step 10 in the previous section.
    1. Create a directory named test in the current working directory (that is, ~/flume_messagehub) and navigate to it.
      $ mkdir test
      $ cd test
    2. Create a test file named testfile1 within that directory that contains the following text:
      Testing Message Hub
      hello messagehub
  7. Navigate to the flume_messagehub directory.
    $ cd ~/flume_messagehub
  8. Start the Flume agent tier1. The --conf . option points to the current directory so that the flume-env.sh file created in step 4 is picked up.
    $ flume-ng agent -f tier1 -n tier1 --conf . -Dflume.root.logger=INFO,console
  9. Open another session and start agent tier2.
    $ flume-ng agent -f tier2 -n tier2 --conf . -Dflume.root.logger=INFO,console
  10. Verify that the data published by agent tier1 to Message Hub is fetched by agent tier2 and written to the HDFS path (a scripted version of this end-to-end check is sketched after these steps).
    $ hadoop fs -cat /tmp/flume/mhub/*

    Alternatively, you can log in to the Ambari web console and use the File Browser to verify the existence of files in the /tmp/flume/mhub/ directory.
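
To repeat the end-to-end test without stepping through the commands manually, a small script along the following lines can drop a new file into the spooling directory and then check the HDFS output. It assumes both agents are already running, uses the paths from this section, and the sleep interval is only a rough allowance for the agents to move one event.

    #!/bin/bash
    # Minimal smoke test for the spooldir -> Message Hub -> HDFS flow described above.
    SPOOL_DIR=~/flume_messagehub/test
    HDFS_DIR=/tmp/flume/mhub

    # Drop a uniquely named file into the spooling directory for agent tier1 to pick up
    echo "smoke test $(date +%s)" > "$SPOOL_DIR/smoke-$(date +%s).txt"

    # Give tier1 and tier2 time to publish and consume the event (arbitrary wait)
    sleep 30

    # List the rolled files and print their contents
    hadoop fs -ls "$HDFS_DIR"
    hadoop fs -cat "$HDFS_DIR"/*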

[{"Business Unit":{"code":"BU059","label":"IBM Software w\/o TPS"},"Product":{"code":"SSCRJT","label":"IBM Db2 Big SQL"},"Component":"","Platform":[{"code":"PF025","label":"Platform Independent"}],"Version":"","Edition":"","Line of Business":{"code":"LOB10","label":"Data and AI"}}]

UID

ibm16260035