
Develop reactive microservices with Reactive Messaging

Introduction

Many microservice-based applications have been developed using the popular RESTful approach for communication between these microservices, often referred to as imperative microservices. However, with the rise in popularity of reactive programming, many developers are transforming their applications, moving away from the imperative logic previously used to an asynchronous, non-blocking, functional style.

But it can be hard to know where to start to achieve this transformation and to enable greater responsiveness and reactivity from your applications. This article will take you through the journey from imperative to reactive and explain when you need to consider writing reactive microservices.

Why reactive?

To explore reactive’s growing popularity among developers and why so many applications are making the move to reactive, let’s take a look at a simple demo application.

This demo application consists of two microservices: service-a and service-b. Initially, they are wired together through RESTful calls, enabling one service-a endpoint to be exposed to the application’s clients, as depicted in the following figure.

Figure of two microservices connected through RESTful calls

So far, so good! However, one day service-b stops responding, blocking service-a. This leaves your application blocked and unresponsive. To counteract this, the invocation of the calls between service-a and service-b could be changed from synchronous to asynchronous, allowing service-a to perform other tasks while waiting for service-b to come back online.

From Synchronous to Asynchronous calls

Changing from synchronous to asynchronous calls is fairly straightforward: you can use the CompletionStage API introduced in Java 8. However, once you’ve entered an asynchronous world, new headaches emerge. For example, Java EE context now needs to be managed. New threads in a thread pool won’t inherit contexts from their parent thread. This is an issue because a security context, JNDI (Java Naming and Directory Interface), and CDI (Contexts and Dependency Injection) contexts often need to be associated with any new threads assigned to your method calls. So, how can this be achieved? Fortunately, Eclipse MicroProfile has an answer to this too: MicroProfile Context Propagation.
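As a minimal illustration in plain Java SE, a blocking call can be wrapped in a completion stage like this (fetchFromServiceB() is a hypothetical stand-in for the remote call to service-b):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AsyncCall {

    // Hypothetical stand-in for the remote call from service-a to service-b.
    static String fetchFromServiceB() {
        return "response-from-b";
    }

    public static void main(String[] args) {
        // supplyAsync hands the call off to a pooled thread, so the
        // calling thread is free to do other work in the meantime.
        CompletionStage<String> stage =
            CompletableFuture.supplyAsync(AsyncCall::fetchFromServiceB)
                             .thenApply(body -> "service-a processed: " + body);

        // Block only here, at the edge, to print the result.
        System.out.println(stage.toCompletableFuture().join());
        // prints: service-a processed: response-from-b
    }
}
```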

MicroProfile Context Propagation

MicroProfile Context Propagation introduces the ManagedExecutor and ThreadContext APIs to manage the contexts of threads dispatched from the thread pool that your application runtime manages. Managed executors in MicroProfile Context Propagation allow you to use completion stages that run with predictable thread contexts, regardless of which thread the action ends up running on. With MicroProfile Context Propagation, the thread context is completely deterministic because context is always captured from the thread that creates the completion stage and applied when running the action. The following code listing shows an example of using MicroProfile Context Propagation to propagate the security and application contexts.

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER })
public @interface SecurityAndAppContext {}

@Produces @ApplicationScoped @SecurityAndAppContext
ManagedExecutor createExecutor(
    @ConfigProperty(name="exec1.maxAsync", defaultValue="5") Integer a,
    @ConfigProperty(name="exec1.maxQueued", defaultValue="20") Integer q) {
    return ManagedExecutor.builder()
                          .maxAsync(a)
                          .maxQueued(q)
                          .propagated(ThreadContext.SECURITY, ThreadContext.APPLICATION)
                          .cleared(ThreadContext.ALL_REMAINING)
                          .build();
}
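Once produced, the qualified executor can be injected and used to create completion stages whose actions run with the propagated contexts. The following is a sketch; callServiceB() is a hypothetical remote invocation:

```java
@Inject
@SecurityAndAppContext
ManagedExecutor executor;

public CompletionStage<String> invokeServiceB() {
    // The security and application contexts are captured here and
    // re-applied on whichever pooled thread runs the supplier and
    // the dependent action.
    return executor.supplyAsync(() -> callServiceB())
                   .thenApply(response -> "processed: " + response);
}
```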

However, transforming your application to use asynchronous calls alone is only going to be helpful if the back end is reliable. If the back end is not reliable and microservices often fail, then the asynchronous threads will become unresponsive, remaining in a hanging state, waiting for the back end. To ensure the asynchronicity of an application’s communication is effective, the resiliency of the microservices involved needs to be improved. For this, MicroProfile Fault Tolerance can be utilized.

MicroProfile Fault Tolerance

MicroProfile Fault Tolerance provides the following capabilities to help ensure microservices remain resilient:

  • @Retry: Deal with temporary network glitches
  • @CircuitBreaker: Fail fast under repeatable failures
  • @Bulkhead: Prevent one microservice from bringing the whole system down
  • @Timeout: Set a time limitation on mission-critical tasks
  • @Fallback: Supply a backup plan

The following code listing demonstrates how to use @Retry and @Fallback to build a resilient microservice:

    @Retry(maxRetries = 2)
    @Fallback(applyOn = { ExceptionA.class, ExceptionB.class },
              skipOn = ExceptionBSub.class,
              fallbackMethod = "fallbackForServiceB")
    public String serviceB() {
        return nameService();
    }

    private String fallbackForServiceB() {
        return "myFallback";
    }
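The remaining policies from the list above can be stacked on a single method in the same way. A minimal sketch (method name and parameter values are illustrative):

```java
@Timeout(500) // fail if the call takes longer than 500 ms
@CircuitBreaker(requestVolumeThreshold = 4, failureRatio = 0.5, delay = 2000)
@Bulkhead(10) // allow at most 10 concurrent calls into this method
public String checkedServiceB() {
    return nameService();
}
```

With this combination, slow calls are cut off by the timeout, repeated failures open the circuit so subsequent calls fail fast, and the bulkhead caps concurrency so one struggling dependency cannot exhaust the thread pool.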

By implementing asynchronous programming, many developers assume they have now enabled their applications to be non-blocking. But, in the majority of cases, it’s unfortunately not this simple. Asynchronous programming alone does not address the issue of blocking threads of execution. When a microservice within an application takes a long time to respond, the thread on which it is performing that process is blocked, waiting for the response. The more threads that are blocked, the more unresponsive an application becomes. One way to try and solve this issue is to dispatch more threads to be able to deal with the additional processes, but threads are not infinite. What happens when all of the available threads are used up? The application grinds to a halt and becomes unresponsive to the user.

With the rise of open source, many applications also make use of third-party code that the application developers may not be familiar with or know if it is non-blocking or not. This can also cause potential blockages in the flow of an application’s processes.

So, how do we truly make our applications non-blocking? For this, we need to turn to reactive. If you’re unsure of what the term “reactive” really means, check out the article “Defining the term ‘reactive’” (IBM Developer, July 2020).

Create a reactive microservice

As laid out in The Reactive Manifesto, responsive, resilient, and elastic applications are underpinned by an asynchronous, message-driven backbone. MicroProfile Reactive Messaging enables asynchronous message-based communication between the components of an application, providing an easy way to create reactive microservices. It offers microservices the capability to asynchronously send, receive, and process messages that are received as continuous streams of events.

MicroProfile Reactive Messaging

MicroProfile Reactive Messaging makes use of and interoperates with two other specifications:

  • Reactive Streams, for asynchronous stream processing with back pressure. It defines a minimal set of interfaces to allow components that do this sort of stream processing to be connected together.
  • MicroProfile Reactive Streams Operators, which builds on Reactive Streams to provide a set of basic operators to link different reactive components together and to perform processing on the data that passes between them.

Using MicroProfile Reactive Messaging, you can annotate application beans’ methods using the provided @Incoming and @Outgoing annotations. A method with an @Incoming annotation consumes messages from a channel. A method with an @Outgoing annotation publishes messages to a channel. A method with both an @Incoming and an @Outgoing annotation is a message processor — it consumes messages from a channel, does some transformation to them, and publishes messages to another channel.

The following code listing is an example of the @Incoming annotation, where my-channel represents the channel, and the method is called for every message sent to my-channel.

@Incoming("my-channel") 
public CompletionStage<Void> consume(Message<String> message) { 
  return message.ack();
}

The following code listing is an example of an @Outgoing annotation, where my-channel is the targeted channel, and the method is called for every consumer request.

@Outgoing("my-channel")
public Message<String> publish() { 
    return Message.of("hello");  
}

You can create a plain org.eclipse.microprofile.reactive.messaging.Message using org.eclipse.microprofile.reactive.messaging.Message#of(T).
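A method annotated with both @Incoming and @Outgoing acts as a message processor, as described above. The following sketch (channel names are illustrative) consumes prices from one channel, transforms each one, and publishes the result to another channel:

```java
@Incoming("initial-prices")
@Outgoing("discounted-prices")
public double process(int price) {
    // Apply a hypothetical 10% discount to each incoming price
    // before publishing it downstream.
    return price * 0.9;
}
```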

These annotated methods are then transformed into reactive streams-compatible publishers, subscribers, and processors, and are connected together using channels. A channel is a name indicating which source or destination of messages is used. Channels are opaque strings.

The following figure shows the annotations @Outgoing and @Incoming assigned to methods A, B, and C, and how they are connected together using channels (in this case “order” and “status”).

Figure showing annotations connected together using channels

There are two types of channels:

  • Internal channels are local to the application. They enable multi-step processing, where several beans from the same application form a chain of processing (as shown in the above image).
  • External channels connect to remote brokers or message transport layers, such as Apache Kafka. External channels are managed by connectors using the Connector API.

Connectors are extensions that manage the communication with a specific transport technology. Most implementations of MicroProfile Reactive Messaging will include pre-configured connectors for some of the most popular and commonly used remote brokers, like Apache Kafka. However, you can also create your own connectors, as the Reactive Messaging specification provides an SPI to implement connectors. This way, MicroProfile Reactive Messaging does not limit which messaging brokers you use. Open Liberty supports Kafka-based messaging transmission.

Mapping a specific channel to a remote sink or source of messages is configured in the application configuration. Note that an implementation may provide various ways to configure the mapping, but support for MicroProfile Config as a configuration source is mandatory. In Open Liberty, the configuration properties can be set anywhere that’s read by MicroProfile Config. For example, as system properties in Open Liberty’s bootstrap.properties file or environment variables in Open Liberty’s server.env file, as well as other custom config sources.

Implementing MicroProfile Reactive Messaging in an application

To help show a realistic example of MicroProfile Reactive Messaging being used, we’ve created a small demo application consisting of two microservices — reactive-service-a and reactive-service-b — connected using Kafka.

In this example application, reactive-service-a acts as a publisher, publishing messages to the channel initial-prices. The following code listing is from the ProducerBean.java class in reactive-service-a, which shows the @Outgoing annotation, the channel this service publishes to (initial-prices), and how each message is generated.

@ApplicationScoped
public class ProducerBean {

    Random random = new Random();

    @Outgoing("initial-prices")
    public Flowable<Integer> generatePrices() {
        System.out.println("Calling generatePrices()");
        return Flowable.interval(5, TimeUnit.SECONDS)
            .map(tick -> {
                int price = random.nextInt(1000);
                System.out.println("Generating price: " + price);
                return price;
            });
    }

}

The channel initial-prices has been instantiated in the microprofile-config.properties file, as shown in the following code listing. This channel has been mapped to the Kafka topic topic1.

mp.messaging.outgoing.initial-prices.connector=liberty-kafka
mp.messaging.outgoing.initial-prices.topic=topic1

Reactive-service-b is a consumer and consumes the messages produced by reactive-service-a. In the following code listing you can see the use of the @Incoming annotation to consume messages from the channel prices.

@ApplicationScoped
public class KafkaConsumer {

    @Incoming("prices")
    public void consume(int price) {
        System.out.println("Consumer received: " + price + " @" + System.currentTimeMillis());
    }

}

Note, though, that in this KafkaConsumer.java class of reactive-service-b, the channel name is now prices, instead of the initial-prices used in reactive-service-a. This is because, in reactive-service-b’s microprofile-config.properties file, the same Kafka topic (topic1) is mapped to a channel called prices. This demonstrates how loosely coupled the two microservices are: because channel names are simply opaque strings, each service can map a completely different channel name to the same Kafka topic. This way, if different teams design, build, and manage the two microservices, they do not have to use the same channel names.
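The corresponding entries in reactive-service-b’s microprofile-config.properties might look like the following sketch, assuming the same liberty-kafka connector is used on the incoming side:

```properties
mp.messaging.incoming.prices.connector=liberty-kafka
mp.messaging.incoming.prices.topic=topic1
```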

Acknowledgement

This article is based on a conference presentation by Emily Jiang and Clement Escoffier, entitled “Reactive microservices in action.” You can download the presentation slides from the conference website.

Next steps

  • Learn more about reactive systems. Download the free ebook Reactive Systems explained (O’Reilly), or check out “Getting started with Reactive Systems” (IBM Developer, April 2020).

  • “Creating reactive Java microservices” (Open Liberty guide): Get hands-on and explore how to transform your own applications to be more reactive. This guide details how to use MicroProfile Reactive Messaging to create a reactive microservice.

  • Start building a reactive system using supported runtimes for your enterprise applications. There are several options, including reactive APIs from Open Liberty and MicroProfile, as well as Vert.x and Quarkus through Red Hat Runtimes. These are all offered as part of IBM Cloud Pak for Applications.

  • Architect event-driven applications with IBM Event Streams, which is a fully-supported event-streaming platform built on open-source Apache Kafka, designed to simplify the automation of mission-critical workloads. Using IBM Event Streams, organizations can quickly deploy enterprise-grade event-streaming technology.