Introduction

This unit covers reactivity in depth by exploring the overall goal of responsiveness and how reactive architectures are well-suited to build elastic, self-healing systems. Understanding concurrency and parallelism will help us to make sense of how Play and Lagom systems are coded, specifically, why CompletionStage<T> is a return type in almost all of our methods and how it works.

The concepts of time and space are at the heart of a reactive system. Strategically leveraging concurrency, parallelism, distribution, and location transparency are the keys to unlocking the full potential of our software. Asynchrony will be the focus of this unit, as once an asynchronous style of programming is well understood, we can then progress into more advanced topics through the rest of the series, such as command sourcing, event sourcing, and persistent entities.

Let’s begin by covering the concepts of concurrency and parallelism before we tie these concepts into Reactive Stock Trader integration and architectural patterns.

Concurrency and parallelism

Concurrency is when multiple processes make progress within overlapping periods of time. A single dealer shuffling a single deck of cards and dealing them into two separate piles is an example of concurrency. Both piles of cards grow larger in near real time, but there is still only one card being placed on one pile at a time. Concurrency can give us the illusion of parallelism as two process make reasonable progress in near real time, although only one operation is happening at any given time.

alt

Parallelism is when multiple processes progress simultaneously. To achieve parallelism in our card shuffling example above, we need to add a second dealer and a second deck of cards. Now, we can have two decks being shuffled by two dealers into two separate piles, completely in parallel.

Each pile of cards grows at a rate independent from each other. Any type of failure is isolated, such as one of the dealers dropping their deck or otherwise messing up the reshuffling process. The performance and success of one dealer does not affect the other dealer.

alt

This frames the discussion, which we should keep in mind as we progress through the material to come. Let’s now cover three different modes of concurrency, from “low level” to “higher levels” of abstraction:

  • Threads
  • Promises and futures
  • Actors

Our goal is to leverage these concepts to take advantage of system resources available to us, from multi-core CPUs, to thousands of processors spread across a cluster.

Threads

Multi-core computing in Java through the direct use of threads requires quite a bit of knowledge of the details of how concurrency works on the JVM. It’s easy to introduce race conditions and deadlocks with even the smallest mistake. Two threads can change the contents of the same location in memory, and if code is not “thread safe,” the state of our application can be corrupted.

Multithreading becomes even more complex on a multi-core computer. Parallel access to the same state in-memory by multiple threads must be strictly controlled by locking on that state and manipulating the state sequentially. This abstraction is referred to as thread safety. Thread safety is typically enforced by programmers in Java using the synchronized keyword.

Programming against threads should be isolated to library-level code and is not recommended for typical business application development.

Promises and futures

Java 7 introduced an easier way to implement concurrent applications in the form of promises and futures, which were introduced as part of the ForkJoin Framework (JSR-166). These abstractions were significantly refined in Java 8 and are now considered essential techniques for anyone tackling multi-core complexity on the JVM.

Many modern Java frameworks expose promises and futures through their own APIs; for example, Spring and EJB 3.1 have AsyncResult, while Play and Lagom use CompletionStage<T> for asynchronous computation in Java and Future[T] in Scala. All modern cloud-native frameworks encourage asynchronous computation to make the most of the resources available.

Actors

Actors are a concurrency primitive with a few simple properties. For now, think of an actor as a lightweight thread. However, in order to avoid locks, actors can only communicate with each other via messaging. This means that actors can’t directly access or mutate the content of each others’ memory. The only way an actor will ever change state is during the processing of a message.

With this in mind, an actor can only do a few things:

  • Send messages to other actors
  • Create new actors
  • Change behavior for how it responds to future messages
  • Change its own state

That’s it! Actors are a simple primitive for asynchronous message passing. Don’t let the simplicity fool you, though. Actors are the perfect model to implement reactive, cloud-native systems with.

The actor model itself was created in 1973 by Carl Hewitt, whereas implementations such as Erlang and Akka came later. Actor-based tools add additional functionality to actor model itself, which make it easier to build the modern systems of today. Actor-based technologies such as Erlang and Akka add their own twists to the actor model.

alt

Akka is a concurrency/parallelism toolkit that supports Java and Scala, and executes on the JVM, which makes it a little more accessible to developers who are already familiar with Java or Scala.

The main principle of Lagom is to bring a higher-level abstraction on top of Akka to make it more simple to develop reactive microservices with. Specifically, Lagom makes it much easier to implement systems based around event sourcing. Lagom provides an easy-to-use API purposefully built for event-sourced systems, while Akka handles all of the low-level considerations, like messaging, journalling, sharding, and clustering.

(We will dive much deeper into what all of this means in the next unit of this series; coming soon.)

Asynchronous boundaries

Let’s circle back to why we need to achieve asynchrony. In order to do this, we’ll revisit our original example of card dealers in Las Vegas.

Isolation between the card dealers is like an “invisible boundary” in both time and space. In distributed systems, this is sometimes referred to as an asynchronous boundary.

If you were paying particular attention to our example of parallelism, you’ll notice that one of the dealers paused to light up a cigarette in the middle of shuffling the deck of cards. This distraction did not affect the other dealer whose pile of cards continued to grow at a steady rate.

Imagine we have a single service with exactly ten operations to complete, and each operation is self-contained. A synchronous service would execute one operation after the other, with the total completion time being the total length of time of all operations. An asynchronous service can execute each operation simultaneously and combine the result for a total execution time, equalling only the longest running operations plus whatever amount of time it takes to combine the results.

alt

Building a reactive system “all the way down” using CompletionStage<T> (Java) or Future[T] (Scala) allows the runtime to parallelize operations wherever possible and to join the results when all operations have completed. (For more details about the inner workings of reactivity in Play, see the blog post “What Makes the Play Framework Fast?“)

The reason this is such an important topic is that asynchronous programming is at the core of a reactive system. Without a thorough grounding in the concepts we’ve outlined above, it’s easy to get lost in syntactic sugar and assume that what’s happening under the hood is magic. Quite the opposite! Models like the actor model are mature and have been proven in the industry for decades. It’s hardware and network capabilities, which have only recently caught up with the introduction of turnkey cloud platforms. Cloud platforms and related technologies have made a reactive style of development feasible on a much larger scale, which is helping to improve the performance and resilience of our systems. This will only continue to improve as more developers become familiar with this style of development.

Integration patterns

Let’s recap everything we’ve covered to this point:

  • We want to base our entire system around a few building blocks: the vocabulary of our business (commands, events), the state of entities (bank accounts, portfolios, customers), and interactions between all of these components.
  • Whenever possible and appropriate, we want to leverage concurrency and parallelism to use resources as efficiently as possible.

Now that we have an idea of reactive principles and in a generalized context, along with the conceptual importance of asynchrony, let’s revisit the design work we’ve accomplished in Units 1 through 3, and expand on the Reactive Stock Trader architecture, leveraging what we’ve learned so far in this unit.

Architectural decisions

In the Reactive Stock Trader, we have three bounded contexts:

  • Portfolio
  • Broker
  • Wire transfers

The first major architectural decision we had to make is how to package and deploy each bounded context. To make Reactive Stock Trader a little easier to work with, for the purposes of a proof-of-concept, we chose to separate each bounded context into a separate module within a single git repository, rather than maintain a separate repository for each microservice.

As you explore a new business domain, it’s often easier to work with a single service, rather than a collection of microservices. Once you understand the non-functional requirements, such as uptime and latency, team requirements, and other conditions, such as the number of developers on a project, you can begin to carve out a system to microservices if it makes sense.

Also, a key benchmark of a microservice architecture is that change to one service should not require the immediate change to another service, which makes it a distributed monolith. In the proof-of-concept phases of a project, it’s difficult to make this guarantee, so it’s often a big time-saver to keep services bundled together until requirements stabilize.

With this in mind, let’s explore our package structure and definition of each bounded context.

Package structure

Our project is currently contained in a single GitHub repository which includes the UI, the BFF, and the API and implementation modules for each of the services: broker-api, broker-impl, portfolio-api, portfolio-impl, wire-transfer-api, and wire-transfer-impl.

Bounded contexts

Each bounded context is represented by the following folder structure:

├── bounded-context-api
└── bounded-context-impl

For each service, the api module produces a JAR that we can use to help us integrate as a consumer of that service. It defines the endpoints that the service provides and the data objects that it exposes as part of its external interface. The impl module contains the implementation of that service and produces the deployable JAR for the service.

├── broker-api
├── broker-impl
├── portfolio-api
├── portfolio-impl
├── wire-transfer-api
└── wire-transfer-impl

Each bounded context pair is implemented with Lagom.

Aggregates

If we dive a little bit into the portfolio-impl folder, we’ll notice the core of our business domain, namely aggregates.

src/main
├── java
│   ├── Module.java
│   └── com
│       └── redelastic
│           └── stocktrader
│               └── portfolio
│                   └── impl
│                       ├── Holdings.java
│                       ├── InsufficientFunds.java
│                       ├── InsufficientShares.java
│                       ├── PortfolioAlreadyOpened.java
│                       ├── PortfolioCommand.java
│                       ├── PortfolioEntity.java
│                       ├── PortfolioEvent.java
│                       ├── PortfolioModel.java
│                       ├── PortfolioRepository.java
│                       ├── PortfolioRepositoryImpl.java
│                       ├── PortfolioServiceImpl.java
│                       ├── PortfolioState.java
│                       └── migrations
│                           └── PortfolioEventMigration.java

We can see a number of classes that help us to define our commands, events, and state. The aggregate root itself is represented by PortfolioEntity, while the PortfolioServiceImpl is the implementation of our outward-facing API. We will cover many of the other notable classes, such as PortfolioRepositoryImpl, within the rest of this unit and subsequent units.

Shared libraries

We also have a number of other packages:

├── bff
├── commonModels
├── integration-tests
├── project
└── utils

We’ll begin by exploring each one of these packages and how it relates to the overall architecture before we finish this unit with an end-to-end code example, from UI request through completion.

  • commonModels contains DTOs (Data Transfer Objects) shared across services. When refactoring to microservices, these will be duplicated to each microservice and serialized/deserialized when passing across microservice boundaries.
  • integration-tests contains all of our integration tests that are used to verify interactions work properly between bounded contexts.
  • project contains all Lagom- and Play-specific configuration code, which we will explore in later units before covering production deployments.
  • utils contains other shared libraries between bounded contexts.

Common advice in the development of microservices is to avoid shared libraries. However, the reality of large-scale microservice development (from dozens, to hundreds of services) requires a realistic set of guidelines for shared library use. “Reducing microservice overhead with shared libraries” (CircleCI Blog, October 2017) is a good article on the subject. If you have definitive plans to isolate each bounded context as a physically separate microservice in the future, you’ll need to either duplicate all shared library code between microservices, or create versioned shared libraries that you can deploy to an artifact repository. If you would prefer to avoid the use of commonModels and utils, copying/pasting this code to each bounded context is a reasonable approach as long as you’re aware of the tradeoffs.

Backend for Frontend integration

The BFF represents our Backend for Frontend, which is implemented with Play. It has a very simple package structure, and its primary responsibility is stateless transformations, such as response aggregation, authentication, authorization, and so forth.

In our example, we are not implementing auth, but, rather, showing it as an intermediary to decouple our microservices from the API required by our UI. For instance, we may need to make compromises to the public-facing API to satisfy the requirements of the UI. We want to contain those compromises within the BFF to avoid polluting our business domain.

src/main
├── java
│   ├── CustomHttpErrorHandler.java
│   ├── FormattersModule.java
│   ├── FormattersProvider.java
│   ├── JavaJsonCustomObjectMapper.java
│   ├── Module.java
│   └── controllers
│       ├── OpenPortfolioForm.java
│       ├── PlaceOrderForm.java
│       └── PortfolioController.java
└── resources
    ├── application.conf
    └── routes

The core of our public-facing API is represented by the routes file above, which is a Play construct. This lets us define a series of REST endpoints using a fairly straightforward DSL.

For our example in this unit, we’ll walk through how to integrate all of the pieces for opening a new portfolio, placing an order, and viewing the summary of our portfolio. If you recall in the previous unit, we walked through much of this functionality in Lagom, showing exactly how to implement these operations, based on the outcome of event storming and DDD. In this unit, we will integrate this functionality all the way from the UI, down to the Lagom services. The BFF sits in the middle of these two components.

POST /portfolio controllers.PortfolioController.openPortfolio()
POST /portfolio/:portfolioId/order controllers.PortfolioController.placeOrder(portfolioId)
GET /portfolio/:portfolioId controllers.PortfolioController.getPortfolio(portfolioId)

Portfolio controller

The building blocks of a Play application are its controllers, which assign an HTTP request that matches a URI endpoint to a controller method, which we can see in the routes file above.

Each controller will contain a number of references to dependencies, such as (Lagom) services, data structures representing forms, and so forth.

Note: Another key reason to put Play in front of downstream services in Lagom is that Play has a lot of convenience handlers for form processing and other RESTful things, whereas Lagom is tailor-made for reactive microservices without assuming HTML integration.

public class PortfolioController extends Controller {
    private final PortfolioService portfolioService;
    private final Form<PlaceOrderForm> placeOrderForm;
    private final Form<OpenPortfolioForm> openPortfolioForm;

    // ...
}

You’ll notice that the service itself has no constructors or other assignments, yet the service reference is all we need to interact with Lagom. That’s because Play leverages Guice, so we can inject a reference of PortfolioService into the controller using Module. Below is an example of dependency injection using Guice in Play.

public class Module extends AbstractModule implements ServiceClientGuiceSupport {
    @Override
    protected void configure() {
        // route all paths to go through this Play BFF
        bindServiceInfo(ServiceInfo.of("web-gateway-module", ServiceAcl.path(".*")));
        bindClient(PortfolioService.class);
        bindClient(BrokerService.class);
        bind(JavaJsonCustomObjectMapper.class).asEagerSingleton();
    }
}

With the configuration above, we signal to Guice that it should inject an instance of com.redelastic.stocktrader.portfolio.api.PortfolioService wherever PortfolioService is defined. By binding the service client in this way, we’re able to interact with the API provided by the service’s API module. Behind the scenes, the framework uses a service locator to direct the service calls we make across the network to the cluster implementing that service.

Finally, we can demonstrate how to service a request from the BFF to Service. It’s as simple as calling the public API, in our case openPortfolio, along with our request.

You’ll also notice that Play has fairly straightforward form-handling built in, with the ability to bind from the request into a DTO that we can pass along to the service.

public CompletionStage<Result> openPortfolio() {
        OpenPortfolioDetails openRequest = openPortfolioForm.bindFromRequest().get().toRequest();
        return portfolioService
                .openPortfolio()
                .invoke(openRequest)
                .thenApply(Results::ok);
    }

You’ll notice a few other things that relate to the asynchronous nature of Play and Lagom that we will cover, specifically, CompletionStage<T> and .thenApply(Results::ok). In order to cover the asynchronous nature of Play, we’ll dive back into the microservice itself and show how to join up a set of async responses using allOf.

Next, we will focus on how we can weave a completely reactive service all the way down from the BFF in Play, to the service implementation in Lagom, and back up to the UI.

Integrating with Lagom

Let’s begin by covering the deeper bits of our Lagom code to get a flavor of what functionality Lagom provides us out of the box. We’ll start by explaining service descriptors before diving into implementation details with PortfolioServiceImpl.

Service descriptors

The Descriptor provided by the service interface describes how the service calls and topics are exposed externally. Each service will have a descriptor that shares the details of its public interface. In essence, this descriptor defines how service calls are mapped to HTTP endpoints, and how topics are mapped to the message broker (in other words, Kafka).

Let’s look at the Portfolio service descriptor:

  String ORDERS_TOPIC_ID = "Portfolio-OrderPlaced";

  default Descriptor descriptor() {
        return named("portfolio").withCalls(
                restCall(Method.POST, "/api/portfolio", this::openPortfolio),
                restCall(Method.POST,"/api/portfolio/:portfolioId/close", this::closePortfolio),
                restCall(Method.GET,"/api/portfolio/:portfolioId", this::getPortfolio),
                restCall(Method.POST,"/api/portfolio/:portfolioId/placeOrder", this::placeOrder)
        ).withTopics(
            topic(ORDERS_TOPIC_ID, this::orderPlaced)
        );
    }

We can see that the service is named “portfolio” (named("portfolio")), which will enable other services to find the portfolio through a service discovery mechanism.

We’ve exposed three POST operations and defined the URL path for those calls — one to open a portfolio, one to close a portfolio (in other words, “liquitate” a portfolio), and one to place an order for a stock trade.

We’ve also exposed a GET operation for the portfolio view query.

When we’re consuming Lagom services on the JVM, we can use the interface methods to consume services. This makes integration between Play and Lagom so seamless, as we witnessed in the BFF example (portfolioService.openPortfolio().invoke(openRequest).thenApply(Results::ok);).

If we want to interact with Lagom services from non-JVM-based code, we’ll need the RESTful paths above to properly construct our requests.

Now that we understand service descriptors, let’s dive into service calls, using openPortfolio() as an example.

Portfolio service

To begin, here is the code that is responsible for opening a new portfolio:

@Override
public ServiceCall<OpenPortfolioDetails, PortfolioId> openPortfolio() {
    return portfolioRepository::open;
}

The code above accepts a request of type OpenPortfolioDetails and returns a response as a promise. Think of a promise in two parts:

  • The promise itself: “I will provide a value at some point in the future.”
  • The type associated with the promise: “I will provide a PortfolioId value at some point in the future.”

Moving towards a reactive style of development can be confusing at first. We’re often not working with values directly, but rather promises of a value in the future. This is actually a very common style in many purely functional languages, such as Haskell, and is even a common practice in less-than-pure functional languages, such as JavaScript.

Rather than attempt to control operations one step at a time, instead, we’re wiring together a series of transformations that take some input and eventually produce some output. Think of an assembly line, where our code operates on data as it passes through these composed functions. The goal of this unit is to ground readers in the concept and importance of concurrency, while the rest of this series dives deeper and deeper into a reactive and functional style of programming. As you work through these examples, consider that we’re coding the instructions while letting the runtime take care of the details, such as order of operations.

Now, we need to create the portfolio and return an ID! Let’s turn our attention to the PortfolioRespository and PortfolioRepositoryImpl.

Portfolio repository

Looking at the implementation details gives us the final piece of the entire chain of interactions.

public CompletionStage<PortfolioId> open(OpenPortfolioDetails request) {
    UUID uuid = UUID.randomUUID(); // 1
    String portfolioId = uuid.toString();
    PersistentEntityRef<PortfolioCommand> ref =
        persistentEntities.refFor(PortfolioEntity.class, portfolioId); // 2
    return ref.ask(new PortfolioCommand.Open(request.getName()))
        .thenApply(done -> portfolioId); // 3
}

There are a few things happening in this piece of code, so we’ll step through each operation one by one:

  1. Generate a new random UUID. This will be the ID of the new portfolio.
  2. Get a singleton writer for a PortfolioEntity with the ID of portfolioId.
  3. Apply the command open portfolio to the persistent entity with the UUID we generated, and return a promise for the ID (CompletionStage<PortfolioId>).

Let’s walk through steps 2 and 3 in more detail, as they are the beginnings of a deeper dive into event sourcing.

Event sourcing and persistent entities

Up until now, we haven’t covered persistence, such as integrating with a database or any other mechanism of durably storing data. Lagom is based around the principles of event sourcing ,which we are going to cover in depth in the next unit (coming soon). This will explain the functionality of line 2.

PersistentEntityRef<PortfolioCommand> ref =
    persistentEntities.refFor(PortfolioEntity.class, portfolioId);

Persistent entities in Lagom represent the aggregate roots in a bounded context. The PortfolioEntity above maps directly to the Portfolio aggregate that we identified during event storming.

Summary & next steps

To recap, we’ve covered the basics of asynchrony, including concurrency and parallelism. This grounds us in the fundamentals of asynchronous programming principles, which will be necessary as we dive deeper and deeper into asynchronous flows implementing with commands and event sourcing.

To support an asynchronous style of programming, we demonstrated how to leverage UML sequence diagrams in the previous unit to provide detailed insight into a business process flow, which clearly denotes synchronous and asynchronous steps. Now that we have a more complete overview of asynchronous boundaries, we should be able to revisit those diagrams and point out the asynchronous boundaries in that flow. This will become more important in later units as we leverage those boundaries for the reasons of resilience and performance.

Finally, we covered an end-to-end implementation of the open portfolio service, all the way from the BFF layer, through the service descriptors and calls, finally pausing at persistent entities.

In the next unit of the series, we will dive deep into event sourcing with persistent entities. For now, think of the PortfolioEntity as an implementation of an aggregate root. It can be referenced by ID in order to obtain its current state, it can be issued commands, and it can be subscribed to for events. At this level of detail, we can think purely in terms of interactions, rather than implementation, namely what commands and events does the entity support.

Previous: Translate the domain model to service APIsNext: Event sourcing