The following directions detail the manual installation of software into IBM Open Platform for Apache Hadoop. These directions, and any binaries that may be provided as part of this article (either hosted by IBM or otherwise), are provided for convenience and make no guarantees as to stability, performance, or functionality of the software being installed. Product support for this software will not be provided (including upgrade support for either IOP or the software described). Questions or issues encountered should be discussed on the BigInsights StackOverflow forum or the appropriate Apache Software Foundation mailing list for the component(s) covered by this article.
IBM Message Hub is a service on Bluemix that is based upon Apache Kafka. Flume 1.6.0 can support a Kafka source, sink, and channel. Can it connect to Message Hub? The answer is “yes”, and this article shows you how to use Flume in IBM Open Platform (IOP) 4.2 with Message Hub.
Prerequisites
- Create a Message Hub instance on Bluemix (https://console.ng.bluemix.net/).
- Ensure that the Flume host can connect to Message Hub brokers.
- If your IBM BigInsights cluster is an “on premises” setup, install Flume on the edge node, which can span across private and public networks.
- If you are using BigInsights 4.2 on Cloud, Flume installed on a data node can also connect to Message Hub, because data nodes use management nodes as the gateway in BigInsights on Cloud.
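Before going further, it is worth confirming that the Flume host can actually reach a Message Hub broker (the Kafka endpoint is typically on port 9093). The following is a rough sketch, not an official tool: the broker hostname in the comment is a made-up example, and it assumes bash and the coreutils timeout command are available.

```shell
# Probe whether a broker endpoint accepts TCP connections.
# check_broker <host> <port> -> exit status 0 on success.
check_broker() {
  host="$1"; port="$2"
  # /dev/tcp is a bash feature; timeout keeps filtered ports from hanging us.
  timeout 5 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null
}

# Example with a hypothetical broker address from your VCAP_SERVICES:
# check_broker kafka01-prod01.messagehub.example.net 9093 \
#   && echo "broker reachable" || echo "broker NOT reachable"
```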
Running Flume agents from the Ambari web interface
The following steps are performed as the biadmin user on BigInsights on Cloud.
- On the Flume host, download messagehub.login-1.0.0.jar.
$ wget https://github.com/ibm-messaging/message-hub-samples/raw/master/kafka-0.9/message-hub-login-library/messagehub.login-1.0.0.jar
- Run the following command to make this JAR file readable.
$ chmod 644 messagehub.login-1.0.0.jar
- Add messagehub.login-1.0.0.jar to the Flume class path. From the Ambari web interface, click the Flume “Configs” tab, expand “Advanced flume-env”, and append the following line:
export FLUME_CLASSPATH=$FLUME_CLASSPATH:/home/biadmin/messagehub.login-1.0.0.jar
- Create a file named jaas.conf with the following content, using the credentials from the VCAP_SERVICES environment variable of your Message Hub instance.
KafkaClient {
    com.ibm.messagehub.login.MessageHubLoginModule required
    serviceName="your message hub serviceName"
    username="your username"
    password="your password";
};
- Run the following command to make jaas.conf readable.
$ chmod 644 jaas.conf
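The jaas.conf creation and chmod steps above can be scripted so that credentials never need to be pasted by hand. The following is a sketch, assuming you first export the credentials as environment variables; MH_SERVICE_NAME, MH_USER, and MH_PASSWORD are names chosen for this example, not anything Message Hub defines:

```shell
# Generate jaas.conf from environment variables and make it readable.
# MH_SERVICE_NAME, MH_USER, and MH_PASSWORD are example variable names;
# set them from the credentials in your instance's VCAP_SERVICES.
write_jaas() {
  out="$1"
  cat > "$out" <<EOF
KafkaClient {
    com.ibm.messagehub.login.MessageHubLoginModule required
    serviceName="${MH_SERVICE_NAME}"
    username="${MH_USER}"
    password="${MH_PASSWORD}";
};
EOF
  chmod 644 "$out"   # the Flume agent process must be able to read it
}

# Example: write_jaas /home/biadmin/jaas.conf
```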
- From the Ambari web interface, click the Flume “Configs” tab, expand “Advanced flume-env”, uncomment the line for JAVA_OPTS, and change it to the following text:
export JAVA_OPTS="-Xmx20m -Djava.security.auth.login.config=/home/biadmin/jaas.conf"
- Save the configuration changes, which take effect after you restart the Flume service.
- Create a Flume agent configuration for the scenario in which data is published to Message Hub and then consumed. As an example, the following configuration defines two agents: tier1 reads data from files and publishes it to Message Hub through a Kafka sink; tier2 reads the data back from Message Hub through a Kafka source and writes it to HDFS. The data flow is: spooldir -> Message Hub -> HDFS.
Note: Replace <your_message_hub_bootstrap_server> with your Message Hub broker server address. Under your Message Hub service, create a topic named mytopic or use an existing one.
tier2.sources = source1
tier2.channels = channel1
tier2.sinks = sink1

tier2.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier2.sources.source1.kafka.bootstrap.servers = <your_message_hub_bootstrap_server>
tier2.sources.source1.kafka.topics = mytopic
tier2.sources.source1.kafka.consumer.group.id = test-consumer-group
tier2.sources.source1.channels = channel1
tier2.sources.source1.interceptors = i1
tier2.sources.source1.interceptors.i1.type = timestamp
tier2.sources.source1.kafka.consumer.timeout.ms = 100
tier2.sources.source1.kafka.consumer.security.protocol = SASL_SSL
tier2.sources.source1.kafka.consumer.ssl.protocol = TLSv1.2
tier2.sources.source1.kafka.consumer.ssl.enabled.protocols = TLSv1.2
tier2.sources.source1.kafka.consumer.ssl.truststore.location = /usr/jdk64/java-1.8.0-openjdk-1.8.0.45-28.b13.el6_6.x86_64/jre/lib/security/cacerts
tier2.sources.source1.kafka.consumer.ssl.truststore.password = changeit
tier2.sources.source1.kafka.consumer.ssl.truststore.type = JKS
tier2.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
tier2.sources.source1.kafka.consumer.sasl.mechanism = PLAIN

tier2.channels.channel1.type = memory
tier2.channels.channel1.capacity = 10000
tier2.channels.channel1.transactionCapacity = 1000

tier2.sinks.sink1.type = hdfs
tier2.sinks.sink1.hdfs.path = /tmp/flume/mhub/
tier2.sinks.sink1.hdfs.rollInterval = 5
tier2.sinks.sink1.hdfs.rollSize = 0
tier2.sinks.sink1.hdfs.rollCount = 0
tier2.sinks.sink1.hdfs.fileType = DataStream
tier2.sinks.sink1.hdfs.kerberosPrincipal = [email protected]
tier2.sinks.sink1.hdfs.kerberosKeytab = /home/biadmin/biadmin.keytab
tier2.sinks.sink1.channel = channel1

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = spooldir
tier1.sources.source1.spoolDir = /home/flume/test
tier1.sources.source1.fileHeader = false
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.topic = mytopic
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.kafka.flumeBatchSize = 20
tier1.sinks.sink1.kafka.bootstrap.servers = <your_message_hub_bootstrap_server>
tier1.sinks.sink1.kafka.producer.acks = 1
tier1.sinks.sink1.kafka.producer.linger.ms = 1
tier1.sinks.sink1.kafka.producer.compression.type = snappy
tier1.sinks.sink1.kafka.producer.timeout.ms = 10000
tier1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
tier1.sinks.sink1.kafka.producer.ssl.protocol = TLSv1.2
tier1.sinks.sink1.kafka.producer.ssl.enabled.protocols = TLSv1.2
tier1.sinks.sink1.kafka.producer.ssl.truststore.location = /usr/jdk64/java-1.8.0-openjdk-1.8.0.45-28.b13.el6_6.x86_64/jre/lib/security/cacerts
tier1.sinks.sink1.kafka.producer.ssl.truststore.password = changeit
tier1.sinks.sink1.kafka.producer.ssl.truststore.type = JKS
tier1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
tier1.sinks.sink1.kafka.producer.sasl.mechanism = PLAIN
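Before pasting the configuration into Ambari, it can save a restart cycle to confirm that no kafka bootstrap.servers key was left empty or still holds its placeholder. A minimal sketch; the check is this article's own, not a Flume utility:

```shell
# Flag any bootstrap.servers key in a Flume .conf whose value is empty
# or still an angle-bracket placeholder such as <your_message_hub_bootstrap_server>.
check_conf() {
  conf="$1"
  bad=$(grep -E 'bootstrap\.servers *= *(<.*>)? *$' "$conf")
  if [ -n "$bad" ]; then
    echo "unresolved bootstrap.servers:"
    echo "$bad"
    return 1
  fi
  return 0
}

# Example: check_conf flume.conf && echo "ok to paste into Ambari"
```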
- From the Ambari web interface, click the Flume “Configs” tab, expand “flume.conf”, and input the agent configuration details from the previous step. Save the changes, then restart the Flume service to start agents tier1 and tier2.
- Test the agents. Create a file named text6.txt in the spooling directory.
$ echo "testing for mhub" >> text6.txt
The data in text6.txt is published to Message Hub by agent tier1. Agent tier2 then fetches the data and writes it to HDFS. Verify that the destination HDFS file is generated, as shown in the following example:
$ hadoop fs -cat /tmp/flume/mhub/FlumeData.1464317463256
testing for mhub
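The spooling-directory source expects files to be complete and immutable once they appear in the directory, so appending to a file in place can race with the agent. A safer pattern is to write the file elsewhere on the same filesystem and then mv it in, which is atomic. A sketch with example paths:

```shell
# Drop a file into a Flume spooling directory safely: stage it in a
# sibling directory on the same filesystem, then mv it in so the
# spooldir source never sees a partially written file.
spool_drop() {
  spooldir="$1"; name="$2"
  staging="${spooldir}.staging"
  mkdir -p "$staging"
  cat > "${staging}/${name}"                    # file content from stdin
  mv "${staging}/${name}" "${spooldir}/${name}" # atomic on one filesystem
}

# Example: echo "testing for mhub" | spool_drop /home/flume/test text6.txt
```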
- Look at the Flume log files if you encounter an error or get unexpected results. The following example log snippets are from the previous example, which completed successfully.
/var/log/flume/flume-tier1.log:
...
27 May 2016 02:51:02,510 INFO [pool-5-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile:348) - Preparing to move file /home/flume/test/text6.txt to /home/flume/test/text6.txt.COMPLETED
27 May 2016 02:51:59,400 INFO [pool-6-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.run:143) - Attributes for component SOURCE.source1
27 May 2016 02:51:59,401 INFO [pool-6-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163) - EventReceivedCount = 2
27 May 2016 02:51:59,401 INFO [pool-6-thread-1] ...
/var/log/flume/flume-tier2.log:
...
27 May 2016 02:50:45,227 INFO [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163) - StartTime = 1464316973348
27 May 2016 02:50:45,227 INFO [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163) - EventPutAttemptCount = 1
27 May 2016 02:50:45,227 INFO [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163) - EventPutSuccessCount = 1
27 May 2016 02:50:45,228 INFO [pool-5-thread-1] (org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink$TimelineMetricsCollector.processComponentAttributes:163) - StopTime = 0
27 May 2016 02:51:03,255 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSDataStream.configure:58) - Serializer = TEXT, UseRawLocalFileSystem = false
27 May 2016 02:51:03,291 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:234) - Creating /tmp/flume/mhub//FlumeData.1464317463256.tmp
27 May 2016 02:51:08,372 INFO [hdfs-sink1-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:363) - Closing /tmp/flume/mhub//FlumeData.1464317463256.tmp
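When something does go wrong, scanning both agent logs for WARN and ERROR entries is usually faster than reading them end to end. A sketch using the log paths shown above:

```shell
# Scan Flume agent logs for WARN/ERROR entries. Prints matching lines
# with file name and line number; returns 0 if anything was found.
scan_flume_logs() {
  found=1
  for log in "$@"; do
    [ -r "$log" ] || continue
    if grep -nHE ' (WARN|ERROR) ' "$log"; then
      found=0
    fi
  done
  return $found
}

# Example:
# scan_flume_logs /var/log/flume/flume-tier1.log /var/log/flume/flume-tier2.log
```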
Running Flume agents with Message Hub from a shell script
- Log in to the Flume node. In the login user's home directory, create a subdirectory named flume_messagehub and navigate to it.
$ mkdir flume_messagehub
$ cd flume_messagehub
- Download the messagehub.login-1.0.0.jar file.
$ wget https://github.com/ibm-messaging/message-hub-samples/raw/master/kafka-0.9/message-hub-login-library/messagehub.login-1.0.0.jar
- Create a file named jaas.conf with the following content. Specify your Bluemix Message Hub service credentials.
KafkaClient {
    com.ibm.messagehub.login.MessageHubLoginModule required
    serviceName="your message hub serviceName"
    username="your username"
    password="your password";
};
- Create a file named flume-env.sh with the following content. Replace <login_user_name> with your login user name.
export JAVA_HOME=/usr/jdk64/java-1.8.0-openjdk-1.8.0.77-0.b03.el6_7.x86_64
export JAVA_OPTS="-Xmx20m -Djava.security.auth.login.config=/home/<login_user_name>/flume_messagehub/jaas.conf"
export FLUME_CLASSPATH=$FLUME_CLASSPATH:/home/<login_user_name>/flume_messagehub/messagehub.login-1.0.0.jar
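Before launching flume-ng, a quick sanity check that the paths referenced by flume-env.sh actually exist can prevent a confusing startup failure. A sketch; the function and its parsing are this article's own, tailored to the two export lines above:

```shell
# Source a flume-env.sh and verify that the jaas.conf named in JAVA_OPTS
# and the last JAR on FLUME_CLASSPATH are readable.
check_flume_env() {
  envfile="$1"
  . "$envfile"
  # Extract the path after -Djava.security.auth.login.config=
  jaas="${JAVA_OPTS##*-Djava.security.auth.login.config=}"
  jaas="${jaas%% *}"
  [ -r "$jaas" ] || { echo "jaas.conf not readable: $jaas"; return 1; }
  # The login JAR is the last classpath entry appended above.
  jar="${FLUME_CLASSPATH##*:}"
  [ -r "$jar" ] || { echo "login JAR not readable: $jar"; return 1; }
  echo "flume-env.sh looks consistent"
}

# Example: check_flume_env ~/flume_messagehub/flume-env.sh
```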
- Create Flume agent configuration files with the same content as the agent configurations in the previous section.
Create a file named tier1 with the configuration for agent tier1, and a file named tier2 with the configuration for agent tier2.
In this case, change the spooling directory to the following: tier1.sources.source1.spoolDir = /home/<login_user_name>/flume_messagehub/test
- Test the agents as described in the previous section.
- Create a directory named test in the current working directory (that is, ~/flume_messagehub) and navigate to it.
$ mkdir test
$ cd test
- Create a test file named testfile1 within that directory that contains the following text:
Testing Message Hub hello messagehub
- Navigate to the flume_messagehub directory.
$ cd ~/flume_messagehub
- Start the Flume agent tier1.
$ flume-ng agent -f tier1 -n tier1 --conf . -Dflume.root.logger=INFO,console
- Open another session and start agent tier2.
$ flume-ng agent -f tier2 -n tier2 --conf . -Dflume.root.logger=INFO,console
- Verify that the data that was published by agent tier1 to Message Hub is fetched by agent tier2 and placed inside an HDFS path.
$ hadoop fs -cat /tmp/flume/mhub/*
Alternatively, you can log in to the Ambari web console and use the File Browser to verify the existence of files in the /tmp/flume/mhub/ directory.
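The HDFS output can take a few seconds to appear (this configuration rolls files every 5 seconds, and the HDFS sink names them FlumeData.<timestamp> by default, as seen above), so a small polling helper beats re-running the command by hand. As a design choice of this sketch, the listing command is passed in as a parameter: pass "hadoop fs -ls" for HDFS, or a plain "ls" to try it against a local directory.

```shell
# Poll until a listing command reports a FlumeData file under a path,
# or give up after a timeout (seconds). The listing command is a
# parameter so the same helper works for HDFS ("hadoop fs -ls") or a
# local directory ("ls").
wait_for_output() {
  listcmd="$1"; path="$2"; timeout="${3:-30}"
  waited=0
  while [ "$waited" -lt "$timeout" ]; do
    if $listcmd "$path" 2>/dev/null | grep -q FlumeData; then
      echo "output found under $path"
      return 0
    fi
    sleep 1
    waited=$((waited + 1))
  done
  echo "no output under $path after ${timeout}s"
  return 1
}

# Example: wait_for_output "hadoop fs -ls" /tmp/flume/mhub 60
```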