Mahmoud Galal co-author

Introduction

Seamless integration between different systems has become an essential need for many customers. IBM Integration Bus (IIB), as one of the core IBM products of the integration portfolio, already provides out of the box connectivity to many systems such as SAP, Siebel and PeopleSoft by using adapter nodes.

IBM MQ is the premier messaging solution from IBM and it is optimized for use with IBM Integration Bus. IBM Integration Bus customers have automatic entitlement to use IBM MQ Advanced as a supporting program. Additionally, IBM Integration Bus connects to virtually any messaging solution.

For example, for one-time integration, you can use the Java Message Service (JMS). For a recurring integration, you can use user-defined nodes. The new, open, platform-independent connector framework that was introduced with IIB V10 is used for some of the new nodes in IBM Integration Bus V10, such as the MQTT and MQ nodes. The connector framework is intended to replace user-defined nodes, offering a simpler way to build a reusable endpoint for an on-premises system once and then use it across many applications.

A connector can encapsulate complex interactions that you might otherwise achieve by using multiple message flow nodes. By using the connector framework in IBM Integration Bus, you can quickly create new connectors or source them from the IBM Business Partner community. This framework provides an easier way to implement user-defined nodes, especially where the nodes provide an interface to transport (input/output) operations.

In this article, we outline a high-level integration scenario and illustrate the steps required to create a customized module that integrates a system through IBM Integration Bus by using the IBM Integration Bus connector framework. For the purposes of this scenario, we’ve selected RabbitMQ, an open source messaging application, as an example of one of the many products that IBM Integration Bus V10 can connect to.

What you’ll need to build your application

·  Knowledge of and development skills for IBM Integration Bus

·  IBM Integration Bus V10 toolkit and runtime

·  Knowledge of RabbitMQ

·  RabbitMQ Java client API Java archive (JAR) file

Scenario overview

To interact with the RabbitMQ server and to send and receive messages, IBM Integration Bus must either consume or publish messages. The scenario in this tutorial demonstrates these two roles of the IBM Integration Bus connector:

·  IBM Integration Bus as a consumer. The message consumer is the receiver. It listens to the queue and consumes each message as it arrives on the queue, as shown in the following figure.

Figure 1. Consuming the RabbitMQ queue

·  IBM Integration Bus as a publisher. Rather than sending the message directly to many queues, the producer sends the message only to an exchange. The exchange receives messages from producers and pushes them to different queues. This process is known as publish/subscribe. An exchange routes a message into zero or more queues based on a routing mechanism (called a binding) that uses a routing key. The following figure shows a message that is published to a RabbitMQ exchange.

Figure 2. Publishing to RabbitMQ exchange
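The routing behavior described above can be pictured with a small in-memory model. This is only a sketch of one routing style (an exact match on the routing key, as a direct exchange does), written in plain Java. The class name, method names, queue names, and routing keys are all hypothetical, and nothing here calls the RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of an exchange with exact-match routing; not the RabbitMQ client API.
class DirectExchangeSketch {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Registers a binding between a queue and a routing key.
    void bind(String queueName, String routingKey) {
        queues.computeIfAbsent(queueName, q -> new ArrayList<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Copies the message into every queue bound with this key and
    // returns the number of queues reached (zero if nothing is bound).
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    // Returns the messages currently held by the named queue.
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("orders.audit", "order.created");
        exchange.bind("orders.billing", "order.created");

        System.out.println(exchange.publish("order.created", "order-1")); // 2
        System.out.println(exchange.publish("order.deleted", "order-2")); // 0
    }
}
```

The producer in this model never names a queue. It supplies only a routing key, which is why a message can reach several queues or none.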

The following figure shows how these two roles fit into the overall scenario that is used in this tutorial.

Figure 3. IBM Integration Bus and RabbitMQ server scenario

RabbitMQ connector for IBM Integration Bus

To achieve the required connectivity between IBM Integration Bus and the RabbitMQ server, the IBM Integration Bus V10 connector framework includes a RabbitMQ connector. The core connector interacts with the RabbitMQ server by using the Java API client for RabbitMQ.

RabbitMQ provides different clients to interact with its server. The connector implementation uses the RabbitMQ Java client APIs. These APIs provide the following key classes and interfaces that are necessary to establish the connection:

·  Channel

·  Connection

·  ConnectionFactory

·  Consumer

The RabbitMQ connector has two nodes:

·  An input node, which consumes messages from a RabbitMQ queue

·  An output node, which publishes messages to a RabbitMQ exchange by using a defined routing key. The RabbitMQ server then forwards each message to the queues that are bound to that exchange.

Step 1: Create user-defined nodes in IBM Integration Bus

You create user-defined nodes to perform an action that IBM Integration Bus does not provide built-in nodes for. You add these nodes to the IBM Integration Toolkit to use them in your business-specific message flows.

In this step, you build two RabbitMQ interaction nodes: a consumer node and a publisher node. You add both nodes to the IBM Integration Bus toolkit message flow palette under the “WebSphere Adapters” section. You select this option, as shown in the following figure, when you create the user-defined node project.

Figure 4. Creating a new user-defined node project

a. Create the RabbitMQ input node

An event-based input node is added at the start of the flow and triggers when a message is received on the defined RabbitMQ queue name.

·  The node name must be set to “ComIbmEventInput” to be identified as an event-triggered input node in the IBM Integration Bus toolkit.

·  The node implementation must be set to Java/C because the node calls RabbitMQ by using the Java client.

The following figure shows how to identify the input node properties upon creation.

Figure 5. Creating the connector input node

For the input node to communicate with RabbitMQ and start consuming messages, you must define the connection properties. The following table shows the required properties that you must define when you create the input node.

You must also define an additional property that is hidden from the node user in the IBM Integration Bus message flow. Its value is the connector’s implementation name, so that the connector can be referenced correctly at runtime.

Table 1. Node properties

The following figure shows the input node properties view after you create the IBM Integration Bus node.

Figure 6. Defining properties for the input node

Node terminals

The input node terminals that you must define are Out, Failure, and Catch. No In terminal is defined because this node is not expected to receive input from a calling message flow.

Figure 7. Defining terminals for the input node

b. Create the RabbitMQ publish node

The RabbitMQ publish node receives data from the IBM Integration Bus flow through its input terminal and publishes it to the identified exchange by using the identified routing key. The following figure shows how to identify the output node properties upon creation.

·  The node name must be set to “ComIbmOutput” to be identified as an output node in the IBM Integration Bus toolkit.

·  The node implementation must be set to Java/C because the node calls RabbitMQ by using the Java client.

Figure 8. Creating the connector’s output node

For the publish node to start publishing messages to RabbitMQ, a set of connection properties is required. The following table shows the required properties that you must define when you create the output node.

Again, you need to define an extra property that is hidden from the node user in the IBM Integration Bus message flow. Its value is the connector’s implementation name, so that the connector can be referenced correctly at runtime.

Table 2. Node properties

The following figure shows the output node properties view after you create the node.

Figure 9. Defining properties for the output node

Node terminals

The following figure shows the output node terminals that you must define. They are In, Out, Failure, and Catch.

Figure 10. Defining terminals for the output node

Step 2: Develop the connector code

To implement the nodes, you must call the Java client APIs of RabbitMQ. To begin, you develop a Java class for each node that extends the suitable connector framework class. Then, you package the Java classes into a JAR file for deployment to the IBM Integration Bus runtime. For more information, see Developing Connectors.

RabbitMQ Input Java

Construct a connection and a channel with the RabbitMQ server. Then, set the required properties for the input node.

inFactory = new ConnectionFactory();

inFactory.setHost(hostName);
inFactory.setPort(port);
inFactory.setUsername(userName);
inFactory.setPassword(password);
inFactory.setVirtualHost(virtualHost);

inConnection = inFactory.newConnection();
inChannel = inConnection.createChannel();

After constructing the connection, the core implementation of the input node depends on creating a queuing consumer, which subscribes to the RabbitMQ input queue. These types of consumers have messages that are delivered to them by using the push API of RabbitMQ:

consumer = new QueueingConsumer(inChannel);

The RabbitMQ input node continues to listen to the input queue and waits for the next message delivery to handle:

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

Upon receiving each message, the node passes the output to the IBM Integration Bus message flow control for processing:

getCallback().processInboundData(msg, props);
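The wait-then-hand-off loop described above can be sketched without the RabbitMQ client or the connector framework. In this plain-Java sketch, a BlockingQueue stands in for the queuing consumer and a small hypothetical interface, InboundCallback, stands in for the connector callback. The idle timeout exists only so that the example terminates. A real input node would keep waiting for the next delivery.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java model of an input node loop; not the connector framework API.
class ConsumerLoopSketch {

    // Hypothetical stand-in for the callback that passes data to the message flow.
    interface InboundCallback {
        void processInboundData(byte[] msg) throws Exception;
    }

    // Hands each delivery to the callback until the queue stays empty for
    // idleMillis. Returns the number of deliveries processed without error.
    static int consume(BlockingQueue<byte[]> deliveries, InboundCallback callback, long idleMillis) {
        int processed = 0;
        while (true) {
            byte[] body;
            try {
                body = deliveries.poll(idleMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return processed;
            }
            if (body == null) {
                return processed; // sketch only: stop when idle so the example ends
            }
            try {
                callback.processInboundData(body);
                processed++;
            } catch (Exception e) {
                // A failed delivery is reported, and the loop keeps listening.
                System.err.println("Delivery failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<byte[]> deliveries = new LinkedBlockingQueue<>();
        deliveries.add("first".getBytes(StandardCharsets.UTF_8));
        deliveries.add("second".getBytes(StandardCharsets.UTF_8));

        int count = consume(deliveries,
                msg -> System.out.println(new String(msg, StandardCharsets.UTF_8)), 100);
        System.out.println("processed " + count);
    }
}
```

Catching the callback error inside the loop is a design choice in this sketch: one bad message does not stop the consumer from listening to the queue.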

RabbitMQ Publish Java

For the publish node, you also need to construct a connection and a channel with the RabbitMQ server and set the required properties from the values that are defined on the publish node.

ConnectionFactory outFactory = new ConnectionFactory();
outFactory.setHost(hostName);
outFactory.setPort(port);
outFactory.setUsername(userName);
outFactory.setPassword(password);
outFactory.setVirtualHost(virtualHost);

outConnection = outFactory.newConnection();
outChannel = outConnection.createChannel();

After constructing the connection with the RabbitMQ server, the RabbitMQ Publish node sends the message payload, along with the header properties that are set by the IBM Integration Bus message flow, to the defined RabbitMQ exchange by using the defined routing key.

// get the content type and encoding, in case they were set in the local environment tree
contentType = props.getProperty("contentType");
contentEncoding = props.getProperty("contentEncoding");

BasicProperties publishProps = new BasicProperties.Builder()
        .contentType(contentType)
        .contentEncoding(contentEncoding)
        .build();
// publish the message to the exchange by using the routing key
outChannel.basicPublish(exchangeName, routingKey, publishProps, msg);

Set the parameters

If a node property must be set dynamically because its value is known only at runtime, you can override its design-time value by setting the equivalent local environment field in a compute node that precedes the RabbitMQ Publish node. Use the following local environment tree:

SET OutputLocalEnvironment.Destination.RabbitmqConnector.Output.propertyName

To publish on the RabbitMQ exchange, you can send the parameters as needed with the message payload. For example, you can set the message content type and content encoding in a compute node that outputs to the RabbitMQ Publish node.

In this case, you must set the properties under the same local environment tree in ESQL:

SET OutputLocalEnvironment.Destination.RabbitmqConnector.Output.contentType = 'application/soap+xml';
SET OutputLocalEnvironment.Destination.RabbitmqConnector.Output.contentEncoding = 'UTF-8';
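One way to picture the override behavior is as a two-level lookup in which a value from the local environment takes precedence over the value configured on the node. The following plain-Java sketch models that idea with two Properties objects. The class name, the resolve method, and the property names exchangeName and routingKey are assumptions for illustration. Treating an absent or empty override as "not set" is also a choice made for this sketch, and the connector described here may resolve values differently.

```java
import java.util.Properties;

// Illustrative only: models property lookup, not the connector framework API.
class PropertyOverrideSketch {

    // Returns the local environment value when one is set,
    // otherwise falls back to the value configured on the node.
    static String resolve(String name, Properties nodeProperties, Properties localEnvironment) {
        String override = localEnvironment.getProperty(name);
        if (override != null && !override.isEmpty()) {
            return override;
        }
        return nodeProperties.getProperty(name);
    }

    public static void main(String[] args) {
        // Values configured on the node at design time (hypothetical names).
        Properties node = new Properties();
        node.setProperty("exchangeName", "orders");
        node.setProperty("routingKey", "order.created");

        // Values set by a compute node at runtime.
        Properties localEnv = new Properties();
        localEnv.setProperty("routingKey", "order.updated");

        System.out.println(resolve("exchangeName", node, localEnv)); // orders
        System.out.println(resolve("routingKey", node, localEnv));   // order.updated
    }
}
```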

Recovery after connection errors between IBM Integration Bus and RabbitMQ

By using the logger that the connector framework provides, you can report key errors or exceptions in the log with a severity level, a customized message, and an error code.

Nodes that are based on the IBM Integration Bus connector framework can throw a connector exception. If an exception is thrown, the message is directed to the Failure terminal. Any unhandled exceptions are directed to the Catch terminal of the RabbitMQ node so that the calling IBM Integration Bus message flow can handle them.

The RabbitMQ Java client provides a connection factory recovery mechanism that you can use in both the input and publish nodes as follows:

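// call these on the connection factory (inFactory or outFactory) before creating the connection
inFactory.setAutomaticRecoveryEnabled(true);
inFactory.setTopologyRecoveryEnabled(true);
inFactory.setNetworkRecoveryInterval(10000); // wait 10 seconds between recovery attempts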
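The recovery interval means that reconnection is retried after a fixed wait. The following plain-Java sketch shows what a fixed-interval retry loop looks like in general. It is not the RabbitMQ client's internal implementation. The class name, the connectWithRetry method, and the attempt limit are hypothetical and exist only so that the example terminates.

```java
import java.util.concurrent.Callable;

// Generic fixed-interval retry; a sketch, not the RabbitMQ client's recovery code.
class FixedIntervalRetrySketch {

    // Calls connect until it succeeds or maxAttempts is reached,
    // sleeping intervalMillis between failed attempts.
    static <T> T connectWithRetry(Callable<T> connect, int maxAttempts, long intervalMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return connect.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again after the interval
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // stop retrying when interrupted
                    break;
                }
            }
        }
        throw new IllegalStateException("No connection after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated connection that fails twice before succeeding.
        String result = connectWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new java.io.IOException("server unavailable");
            }
            return "connected";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts"); // connected after 3 attempts
    }
}
```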

Manage errors and exceptions

You can handle errors and exceptions for many scenarios. Some of these scenarios are described in the following sections with suggested solutions.

·  Input node error handling. When an error happens in the input node, do not stop the input queue consumer from listening to the queue.

·  Server connection shutdown. If the server connection is shut down, you can create a new connection and channel as follows:

// add a shutdown listener to the connection; if a shutdown is detected, try to reconnect
inConnection.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause) {
        getLogger().throwing(className, null, cause);
        try {
            inConnection = inFactory.newConnection();
            inChannel = inConnection.createChannel();
        } catch (IOException | TimeoutException e) {
            getLogger().logp(Level.SEVERE, className, "Run", "", e);
        }
    }
});

·  Shutdown signal while consuming the queue. If a shutdown signal is received while the node is consuming the input queue, you can log the shutdown reason and pause the running thread before you retry consuming messages.

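catch (ShutdownSignalException e) {
    getLogger().logp(Level.SEVERE, className, "Run",
            RabbitmqConnectorExceptionCodes.SHUTDOWN_SIGNAL_ERROR.getDesc(), e);
    if (consumerThread != null) {
        // pause for 10 seconds before retrying; this assumes that the block runs on the consumer thread
        // (calling wait() on the thread object without holding its monitor would throw IllegalMonitorStateException)
        Thread.sleep(10000);
    }
}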

·  Cancelled consumer. If the RabbitMQ server cancels the subscribed consumer for any reason, you can create a new consumer and register it to the same input queue as shown in the following example:

consumer = new QueueingConsumer(inChannel) {
    @Override
    public void handleCancel(String consumerTag) throws IOException {
        // the consumer was cancelled unexpectedly, so create a new one
        if (inChannel != null) {
            consumer = new QueueingConsumer(inChannel);
        }
    }
};

·  Publish node error handling. For publish node error handling, there can be different types of expected errors as described below.

Empty Exchange Name or Routing Key. Although the Exchange Name and Routing Key properties are mandatory at the node level, you can still override them in the local environment with a null value. In this case, you can create a new connector exception and log it with a customized severity level, error message, and error code.

RabbitmqConnectorException rmqE = new RabbitmqConnectorException(
        RabbitmqConnectorExceptionCodes.MISSING_EXCHANGE_ROUTINGKEY);
getLogger().logp(Level.WARNING, className, "Send", "", rmqE);

Connection already closed. If the connection to the RabbitMQ server is already closed, you can log the resulting exception with a customized severity level, error message, and error code without breaking the flow.

Connection timeout or input/output errors. If you experience connection timeouts or input/output (I/O) errors, you can log them with the exception details without breaking the flow.
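The article does not show how the connector exception and its error codes are defined, so the following is only a guess at one possible shape, using the standard java.util.logging API. The names PublishValidationSketch, ErrorCode, ConnectorException, and canPublish are hypothetical. Rejecting an empty exchange name is a choice made for this sketch, because RabbitMQ itself uses the empty name for its default exchange.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical shape for a coded connector exception; not the article's actual classes.
class PublishValidationSketch {

    // Error codes that carry a readable description.
    enum ErrorCode {
        MISSING_EXCHANGE_ROUTINGKEY("Exchange name or routing key is missing");

        private final String desc;

        ErrorCode(String desc) {
            this.desc = desc;
        }

        String getDesc() {
            return desc;
        }
    }

    // Unchecked exception that keeps the error code alongside the message.
    static class ConnectorException extends RuntimeException {
        private final ErrorCode code;

        ConnectorException(ErrorCode code) {
            super(code.name() + ": " + code.getDesc());
            this.code = code;
        }

        ErrorCode getCode() {
            return code;
        }
    }

    private static final Logger LOGGER = Logger.getLogger(PublishValidationSketch.class.getName());

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    // Returns true when both values are usable. Otherwise logs a warning
    // with the coded exception and returns false, so the flow is not broken.
    static boolean canPublish(String exchangeName, String routingKey) {
        if (isBlank(exchangeName) || isBlank(routingKey)) {
            ConnectorException e = new ConnectorException(ErrorCode.MISSING_EXCHANGE_ROUTINGKEY);
            LOGGER.log(Level.WARNING, e.getMessage(), e);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(canPublish("orders", "order.created")); // true
        System.out.println(canPublish(null, "order.created"));     // false, after a logged warning
    }
}
```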

Step 3: Deploy the RabbitMQ connector

Deploy the RabbitMQ user-defined nodes:

1.  Close any running toolkit.

2.  Copy the RabbitMQNodes_xxxx.jar plug-in JAR file into the dropins folder under your toolkit installation path, in the following directory for your environment:

·  Windows, for example: C:\Program Files\IBM\IIB\10.0.1383.6\tools\dropins
·  Linux: install_dir/tools/dropins

3.  Start the toolkit by using the -clean option.

 

The plug-in nodes are displayed in the message flow node palette under the WebSphere Adapters section. You can use these nodes in any message flow.

Figure 11. RabbitMQ nodes in the IBM Integration Bus message flow palette

Deploy the RabbitMQ Connector JAR file:

1.  Stop the running integration node.

2.  Find the IBM Integration Bus working directory. You can find the work path by running the following command:
mqsireportbroker integrationNodeName
·  In Windows, it might be in a path similar to this example: C:\ProgramData\IBM\MQSI
·  In Linux, the path might differ. Therefore, after you run the mqsireportbroker command, go to the workpath/connectors directory.

3.  Locate the connectors folder, or create one if it doesn’t exist.

4.  Extract the connector Java classes JAR file, for example, the RabbitMQConnector.jar file, into the connectors folder.

5.  Add any dependency JAR file to the folder. In this case, you add the RabbitMQ-client.jar file, which has the RabbitMQ client interaction APIs. You should now have a directory similar to the following examples, depending on your environment:

·  Windows: C:\ProgramData\IBM\MQSI\connectors\RabbitMQConnector
·  Linux: workpath/connectors/RabbitMQConnector

6.  Remove the RabbitMQConnector.jar file from the parent directory.

7.  Start the integration node. To verify that the connector is installed on the IBM Integration Bus integration node, run the following command:
mqsireportproperties <Node> -e <server> -o Connectors -r

Conclusion

This article covered how to create a customized module that integrates a system such as RabbitMQ with any other system through IBM Integration Bus by using the IBM Integration Bus connector framework. You can create a very similar customization, based on your client's needs, for a wide variety of other systems once you know their interaction mechanism.

 

About the authors

Mahmoud Galal is an IBM Certified Executive IT Specialist with the Cairo Software Services team in Egypt and an Open Group Distinguished IT Specialist. He has worked in IT for 25 years, including 14 years with the IBM business integration portfolio. His role typically requires him to provide technical input at various stages of a project to ensure that development, implementation, and post-implementation are executed successfully. He specializes in application development, systems analysis, business integration, and IBM PureApplication.

 

 
