This article demonstrates how to use some of the features from the RxJava library with WebSphere Liberty, while taking advantage of Java EE 7 Concurrent APIs, WebSocket and Java SE 8 lambdas.

Reactive programming, one of today’s most popular programming models, focuses on data flows and the propagation of change. Java itself offers some reactive-style APIs, such as java.util.Observable (and the java.util.stream package in Java 8), but they lack the capabilities needed to efficiently manipulate highly concurrent streams. Enter ReactiveX, a library for composing asynchronous and event-based programs by using observable sequences, and RxJava, the Java implementation of ReactiveX.

The full source code referenced from this article is available from the RxJava sample’s GitHub repository.

The first step in this article is to download WebSphere Liberty. You’ll need the concurrent-1.0, cdi-1.2, and websocket-1.1 features. If your runtime does not have them, you can use the installUtility command to install them.

The second step is optional, but highly recommended: download the WebSphere Developer Tools (WDT) and use them to create a new Dynamic Web Project. You can create the project with another tool too but, as you’ll see, WDT is very helpful in linking the code to the appropriate Concurrent and WebSocket spec JARs from Liberty, and it lets you deploy the application into the runtime very easily from within the IDE.

As a preview, here’s a diagram showing the architecture of this application environment:


[Diagram: architecture overview]

We will create a single file, a WebSocket endpoint, which uses RxJava APIs. The first step is to add the RxJava JAR to the project as a dependency.

I named my application RxJavaApp, so the project initially looks like this:


[Screenshot: package explorer]

In this application, we’ll simulate a Vacation Destination database. Clients type in a list of destinations and the server-side will return information such as the average temperature, the last five reviewers, and the latest vacation ranking. The client and server communicate over a WebSocket, and the server-side accomplishes the task by using RxJava APIs, together with Java SE 8 Lambdas and Java EE 7 Concurrency support.

We’ll start the server-side file by declaring a simple WebSocket endpoint using the Java EE 7 javax.websocket.server.ServerEndpoint annotation. For this sample we want our WebSocket to be available at /RxJavaApp. Because that is already the application’s context root, we set the endpoint annotation value to /, which is resolved relative to the context root:

@ServerEndpoint("/")
public class RxJavaSocket {

Next, we need to get some concurrency resources injected, which we will use later with RxJava APIs. In this sample we’re going to use javax.enterprise.concurrent.ManagedScheduledExecutorService objects, but you could also use other Java EE 7 java.util.concurrent.Executor implementations instead. These executor services allow you to create and manage asynchronous tasks:

    @Resource
    private ManagedScheduledExecutorService subscriberExecutor;
    
    @Resource
    private ManagedScheduledExecutorService observerExecutor;
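
If you want to experiment with executor-backed scheduling outside Liberty, a plain java.util.concurrent.ScheduledExecutorService can stand in for the injected resource, since ManagedScheduledExecutorService extends that interface. The following is only a sketch under that assumption and is not part of the sample: the class name ExecutorStandIn, its methods, and the thread name are all hypothetical. It shows that a task handed to the executor runs on the executor’s thread rather than on the caller’s thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the injected ManagedScheduledExecutorService,
// intended only for experiments outside a Java EE container.
final class ExecutorStandIn {

    // Creates a single-threaded scheduled executor whose thread has a recognizable name.
    static ScheduledExecutorService newNamedExecutor(String threadName) {
        return Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true); // do not keep the JVM alive for the demo
            return thread;
        });
    }

    // Submits a task and returns the name of the thread that actually ran it.
    static String runAndReportThread(ScheduledExecutorService executor) {
        Callable<String> task = () -> Thread.currentThread().getName();
        Future<String> result = executor.submit(task);
        try {
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Task did not complete", e);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService observerExecutor = newNamedExecutor("observer-thread");
        try {
            System.out.println("Caller thread: " + Thread.currentThread().getName());
            System.out.println("Task ran on:   " + runAndReportThread(observerExecutor));
        } finally {
            observerExecutor.shutdown();
        }
    }
}
```

Inside Liberty you would keep the @Resource injection shown above, so that the tasks run on container-managed threads.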

For simplicity, we’ll assume just a single javax.websocket.Session, so we’ll keep a single observable field that tracks the ongoing list of destinations:

    private Observable<DestinationInfo> mainObservable;

If you wanted to extend this sample to support multiple sessions at a time, you could use a map such as:

    private Map<Session, Observable<DestinationInfo>> observables;

If you choose to keep a map of rx.Observable objects, you will need to populate it from the @OnOpen method. For our sample, we just assume the same session is calling us, so we don’t need to worry about keeping track of it.
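
A minimal sketch of such a per-session store follows. Everything in it is hypothetical (the class PerSessionStore and its open, update, and close methods are not in the sample), and it is written with generic types so it runs without the WebSocket or RxJava JARs. In the endpoint, K would be the Session (or its getId() string) and V would be Observable<DestinationInfo>. The sketch also assumes the store is shared across endpoint instances, for example through a static field.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical per-session store; K stands in for javax.websocket.Session (or its id)
// and V stands in for Observable<DestinationInfo>.
final class PerSessionStore<K, V> {

    private final Map<K, V> entries = new ConcurrentHashMap<>();

    // Would be called from @OnOpen: remember an initial value for the new session.
    // ConcurrentHashMap rejects null values, so start from an empty value
    // (for example Observable.empty()) rather than null.
    void open(K session, V initial) {
        entries.put(session, initial);
    }

    // Would be called from @OnMessage: replace the stored value with one derived from it.
    // Returns the new value, or null if the session was never opened.
    V update(K session, UnaryOperator<V> change) {
        return entries.computeIfPresent(session, (key, current) -> change.apply(current));
    }

    // Would be called from @OnClose: forget the session and return its last value.
    V close(K session) {
        return entries.remove(session);
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        PerSessionStore<String, String> store = new PerSessionStore<>();
        store.open("session-1", "Punta Cana");
        System.out.println(store.update("session-1", current -> current + ",Hawaii"));
        System.out.println(store.close("session-1"));
    }
}
```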

You’ll notice that the code referenced a POJO called DestinationInfo, which is a simple container of the information pertaining to a particular destination:

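public class DestinationInfo {
    private String destination;
    private int averageTemp;
    private String last5Reviews;
    private int ranking;

    // Constructor used later when mapping destination strings to POJOs
    public DestinationInfo(String destination) {
        this.destination = destination;
    }
    // ...getters and setters...
}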

With some of the boilerplate out of the way, let’s start to handle the request! We will accept a message string that contains either a single destination or multiple destinations separated by commas. So we start by splitting the message and setting up the rx.Scheduler objects. These rx.Scheduler instances use the Java EE 7 ManagedScheduledExecutorService objects that Liberty injected into our code to carry out the work from the RxJava APIs in separate threads. As you can guess, we will separate the reactive observer work from the reactive subscriber work:

    @OnMessage
    public void processDestinations(String message, Session session) {
    	String[] destinations = message.split(",");
    	Scheduler observeOnScheduler = Schedulers.from(observerExecutor);
    	Scheduler subscribeOnScheduler = Schedulers.from(subscriberExecutor);
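
Note that a bare split(",") keeps any whitespace around the commas and can produce empty entries. If clients might send input such as "Punta Cana, Rio de Janeiro,", a small helper along the following lines could tidy the names first. This is only a sketch; the class DestinationParser and the method parseDestinations are hypothetical and not part of the sample.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that splits a comma-separated message into clean destination names.
final class DestinationParser {

    // Splits on commas, trims each entry, and drops entries that are empty after trimming.
    static List<String> parseDestinations(String message) {
        List<String> destinations = new ArrayList<>();
        if (message == null) {
            return destinations;
        }
        for (String part : message.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                destinations.add(name);
            }
        }
        return destinations;
    }

    public static void main(String[] args) {
        // prints [Punta Cana, Rio de Janeiro]
        System.out.println(parseDestinations("Punta Cana, Rio de Janeiro,, "));
    }
}
```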

The next step is to create an rx.Observable object from our array of destinations and apply a transformation to that resulting observable. It is a common pattern in Reactive Programming to chain an observable’s result with other operations that will transform the data and create a new observable over the new transformed data.

That’s exactly what we’re going to do: we will transform each destination java.lang.String, which is emitted by the Observable<String> that we create from the array, into our DestinationInfo POJO using the map operator. We will then use the observeOn operator so that everything downstream of it receives the items on the observer’s custom scheduler, which delegates the asynchronous work to our Java EE 7 executor under the covers.

You’ll also notice that we are using Java 8’s lambda support as input to the map operator. Java 8 lambdas allow you to pass a single-method interface implementation without all the usual verbose code that would otherwise accompany it. They fit RxJava like a glove, keeping the code to a minimum while remaining easy to read and very similar to other reactive implementations.
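
To make the saving concrete, here is a small comparison of the same single-method implementation written as an anonymous inner class, as a lambda, and as a constructor reference. It is a sketch only: java.util.function.Function stands in for RxJava’s own function type so that the example runs with just the JDK, and the names LambdaComparison and Destination are hypothetical.

```java
import java.util.function.Function;

// Hypothetical comparison of three ways to implement a single-method interface.
final class LambdaComparison {

    // Minimal stand-in for the article's DestinationInfo POJO.
    static final class Destination {
        private final String name;

        Destination(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Pre-Java 8 style: an anonymous inner class with all of its ceremony.
    static final Function<String, Destination> ANONYMOUS = new Function<String, Destination>() {
        @Override
        public Destination apply(String name) {
            return new Destination(name);
        }
    };

    // Java 8 lambda: same behavior, one line.
    static final Function<String, Destination> LAMBDA = name -> new Destination(name);

    // Constructor reference: shorter still when the lambda only calls a constructor.
    static final Function<String, Destination> CONSTRUCTOR_REF = Destination::new;

    public static void main(String[] args) {
        System.out.println(ANONYMOUS.apply("Cancun").getName());
        System.out.println(LAMBDA.apply("Cancun").getName());
        System.out.println(CONSTRUCTOR_REF.apply("Cancun").getName());
    }
}
```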

The chained operations described above can be seen here:

        // Map incoming destination strings into DestinationInfo objects, which are observed on the specific scheduler
        Observable<DestinationInfo> newDestinations = Observable.from(destinations)
                .map(destination -> new DestinationInfo(destination))
                .observeOn(observeOnScheduler);

The result is a new Observable<DestinationInfo> which subscribers can use to work with each item, as a DestinationInfo POJO instead of a simple java.lang.String. This is perhaps one of the most used features in RxJava, since it lets you transform a stream of data into another type of stream.

Here’s a diagram demonstrating what just happened:


[Diagram: first phase]

Now that we have an observable that we can work with, we will combine it with any ongoing observable that we already have for this user. This is accomplished by using the concatWith operator, which appends the items of one observable after the items of another, preserving their order:

    	if (mainObservable == null) {
    		mainObservable = newDestinations;
    	} else {
    		mainObservable = mainObservable.concatWith(newDestinations);  
    	}
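
The ordering that concatenation gives you can be pictured with a synchronous analogue from the JDK. The sketch below uses Stream.concat from java.util.stream rather than RxJava, so it only illustrates the ordering and none of the asynchronous behavior; the class name ConcatOrdering is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical, synchronous illustration of concatenation order using JDK streams.
final class ConcatOrdering {

    // Returns all existing items followed by all added items, each group in its original order.
    static List<String> concat(List<String> existing, List<String> added) {
        return Stream.concat(existing.stream(), added.stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> first = Arrays.asList("Punta Cana", "Rio de Janeiro");
        List<String> second = Arrays.asList("Hawaii", "Varadero");
        // prints [Punta Cana, Rio de Janeiro, Hawaii, Varadero]
        System.out.println(concat(first, second));
    }
}
```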

Another useful feature in RxJava is the ability to sort a stream of data based on any criteria. In our case we will sort the stream of DestinationInfo objects using the destination name as the criteria for the toSortedList operator, so we create an alphabetically ordered list. You will notice that the output from toSortedList is an Observable<List<DestinationInfo>>, so we need to flatten that list back into a single-stream Observable<DestinationInfo> that we can work with. This is accomplished by using the flatMap operator, which maps each emitted item (here, the sorted list) to an observable and merges the resulting emissions into a single stream:

        // Sort in alphabetical order (ignoring case), then flatten the sorted list back into a stream
        mainObservable = mainObservable
                .toSortedList((destination1, destination2) ->
                        destination1.getDestination().compareToIgnoreCase(destination2.getDestination()))
                .flatMap(myDestinations -> Observable.from(myDestinations))
                .subscribeOn(subscribeOnScheduler);
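
The comparison passed to toSortedList is an ordinary two-argument comparison, so you can check the ordering it produces on a plain list without RxJava. The following sketch does that with the JDK only; the class DestinationOrdering and its methods are hypothetical, and plain strings stand in for DestinationInfo objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical check of the case-insensitive ordering used for the sorted list.
final class DestinationOrdering {

    // Same comparison as in the article's lambda, applied to plain names.
    static int compareNames(String first, String second) {
        return first.compareToIgnoreCase(second);
    }

    // Returns a sorted copy and leaves the input list untouched.
    static List<String> sortedCopy(List<String> names) {
        List<String> copy = new ArrayList<>(names);
        copy.sort(DestinationOrdering::compareNames);
        return copy;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Punta Cana", "Rio de Janeiro", "Hawaii", "Varadero");
        // prints [Hawaii, Punta Cana, Rio de Janeiro, Varadero]
        System.out.println(sortedCopy(names));
    }
}
```

Because the comparison ignores case, a lowercase entry such as cancun would still sort ahead of Hawaii.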

The last step of the chained statement above, subscribeOn, sets the custom scheduler on which each subscription starts, so the pipeline is kicked off on the subscriber executor instead of the thread that received the WebSocket message. The subscribers then fetch the destination results and send them asynchronously back to the client via the WebSocket. The code below calls methods to fetch the ranking, the average temperature, and the last 5 reviews of the destination. This is where your custom code could call out to a database (local or remote) to calculate the results, but for this sample we are leaving the implementations of these database-related methods abstract:

    	mainObservable.subscribe(destinationInfo -> sendText("Destination: " + destinationInfo.getDestination() + " | Ranking: " + destinationInfo.getRanking()));
    	mainObservable.subscribe(destinationInfo -> sendText("Destination: " + destinationInfo.getDestination() + " | Average Temp: " + destinationInfo.getAverageTemp()));
    	mainObservable.subscribe(destinationInfo -> sendText("Destination: " + destinationInfo.getDestination() + " | Last 5 Reviews: " + destinationInfo.getLast5Reviews()));

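The lambda in each subscriber uses the method sendText, which simply relays the results back to the client. It relies on a session field that is not shown in the snippets above; assume it holds the javax.websocket.Session of the current connection (for example, assigned at the start of processDestinations):

    // Assumes a javax.websocket.Session field named session (not shown above)
    private void sendText(String string) {
        session.getAsyncRemote().sendText(string);
    }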

The following diagram illustrates the steps above:


[Diagram: second phase]

Now that our server-side code is complete, let’s deploy this into WebSphere Liberty! If you chose to use WDT you can simply point to an existing Liberty installation and create a new server. As mentioned at the beginning of the article, the features you’ll need in your server.xml are:

    <featureManager>
        <feature>websocket-1.1</feature>
        <feature>concurrent-1.0</feature>
        <feature>cdi-1.2</feature>
    </featureManager>

With WDT you can simply right-click on the project and choose Run As > Run on Server. This automatically deploys the application into Liberty. Alternatively, if you’re not using WDT, you can simply package your application and copy it into the dropins folder of your Liberty server.

After starting the server, you will see a message in console.log similar to this:

[AUDIT ] CWWKT0016I: Web application available (default_host): http://localhost:9080/RxJavaApp/

You can now use any WebSocket client you prefer to make the invocations. For this article I am using Chrome’s WebSocket extension application, which I used to connect to the corresponding WebSocket address ws://localhost:9080/RxJavaApp. Here’s a screenshot of the initial state of the client:


[Screenshot: WebSocket client]

My first query string is Punta Cana,Rio de Janeiro, for which I received the response:

Destination: Punta Cana | Average Temp: 28
Destination: Punta Cana | Ranking: 3
Destination: Rio de Janeiro | Ranking: 1
Destination: Punta Cana | Last 5 Reviews: 
- Pool activities were ok. 
- Long ride from airport to the resort. 
- Nice clean beach. 
- Great couples trip. 
- Snorkeling was fun. 
Destination: Rio de Janeiro | Last 5 Reviews: 
- Great trip and food. 
- Very hot weather, drink lots of water. 
- I recommend staying close to the beach. 
- The hotel ABC could use an upgrade. 
- Boat trip was great.
Destination: Rio de Janeiro | Average Temp: 35

As you can see, I received the responses asynchronously, as each rx.Subscriber finished its processing at a different time.

Using the same open WebSocket session, I then sent another message to the server using the string Hawaii,Varadero and the response was:

Destination: Hawaii | Ranking: 5
Destination: Punta Cana | Ranking: 3
Destination: Rio de Janeiro | Ranking: 1
Destination: Varadero | Ranking: 4
Destination: Hawaii | Last 5 Reviews: 
- Food was very diversed! 
- I recommend hotel XYZ for families. 
- Airport is close to all resorts. 
- Great family trip. 
- Weather was very hot.
Destination: Punta Cana | Last 5 Reviews: 
- Pool activities were ok. 
- Long ride from airport to the resort. 
- Nice clean beach. 
- Great couples trip. 
- Snorkeling was fun. 
Destination: Rio de Janeiro | Last 5 Reviews: 
- Great trip and food. 
- Very hot weather, drink lots of water. 
- I recommend staying close to the beach. 
- The hotel ABC could use an upgrade. 
- Boat trip was great.
Destination: Varadero | Last 5 Reviews: 
- Food was ok. 
- Blue ocean and white sand! 
- Lost my wallet in the trip! 
- I recommend the local market. 
- Bus ride to airport was fun.
Destination: Hawaii | Average Temp: 27
Destination: Punta Cana | Average Temp: 28
Destination: Rio de Janeiro | Average Temp: 35
Destination: Varadero | Average Temp: 29

As you can see, the two rx.Observable sets were concatenated and the running result displayed. The final string I sent was Cancun, and I received the response:

Destination: Cancun | Average Temp: 30
Destination: Hawaii | Average Temp: 27
Destination: Punta Cana | Average Temp: 28
Destination: Rio de Janeiro | Average Temp: 35
Destination: Varadero | Average Temp: 29
Destination: Cancun | Last 5 Reviews: 
- Food was outstanding! 
- I recommend hotel ABC for a short stay. 
- Beach had a lot of seaweeds. 
- Great family trip. 
- Rained a few days, but activities were good.
Destination: Hawaii | Last 5 Reviews: 
- Food was very diversed! 
- I recommend hotel XYZ for families. 
- Airport is close to all resorts. 
- Great family trip. 
- Weather was very hot.
Destination: Punta Cana | Last 5 Reviews: 
- Pool activities were ok. 
- Long ride from airport to the resort. 
- Nice clean beach. 
- Great couples trip. 
- Snorkeling was fun. 
Destination: Rio de Janeiro | Last 5 Reviews: 
- Great trip and food. 
- Very hot weather, drink lots of water. 
- I recommend staying close to the beach. 
- The hotel ABC could use an upgrade. 
- Boat trip was great.
Destination: Varadero | Last 5 Reviews: 
- Food was ok. 
- Blue ocean and white sand! 
- Lost my wallet in the trip! 
- I recommend the local market. 
- Bus ride to airport was fun.
Destination: Cancun | Ranking: 2
Destination: Hawaii | Ranking: 5
Destination: Punta Cana | Ranking: 3
Destination: Rio de Janeiro | Ranking: 1
Destination: Varadero | Ranking: 4

The output above shows that I can keep adding destinations that I am considering and the server keeps aggregating the results, which are obtained asynchronously and manipulated as RxJava streams. As we have seen, the real work performed by the various RxJava operators is done on separate rx.Scheduler instances, which are backed by the Java EE 7 executors that were injected by Liberty!

This concludes our introduction to RxJava operations and how you can use them in your existing Java EE 7 applications. I hope it was also clear that Java 8 can simplify your reactive-based code by easily fitting into a chain of RxJava observables, and that WebSphere Liberty provides a powerful (yet easy-to-use) platform for building your reactive applications.

What other use cases do you see for RxJava? Can you fit this into Acme Air? Please comment below!
