This unit explores some fundamental concepts of streaming data. After completion, you will have the knowledge required to implement a full-stack reactive system, as well as a greater understanding of the deeper levels of engineering within a truly reactive system.

In the previous unit, we demonstrated reactive integration patterns with the Message Broker API. This is sufficient for integrating the components of our back-end system, such as two microservices, each of which represents a unique bounded context. However, in a complete reactive system, we need to integrate more than back-end services. We also need to enable a high degree of responsiveness in our user interfaces.

The PubSub API is a powerful tool that gives us a mechanism to interact with the “final mile of delivery,” which in this case is our UI. The PubSub API allows us to publish and subscribe to events within a single service cluster. The Message Broker API, on the other hand, is for integrating services across clusters.

You may be asking yourself, “What’s a cluster?” And perhaps, “What’s the difference between a service and a cluster, anyway?” These are fair questions and topics that we’ll cover as we move closer to the packaging and deployment of a reactive system to the cloud.

We’ll begin by discussing the basics of services, messaging, and clusters within Lagom. Once we have this broad understanding, we can revisit the Lagom Service API and look at how it supports streaming data under the hood. We’ll conclude by connecting a live stream of events all the way from a Lagom persistent entity, to a Lagom service, to a Play service, and finally to our Vue UI over a WebSocket connection.

This unit provides a more complete and nuanced picture of the full functionality of the Lightbend Reactive Platform and showcases how all of the tools in the platform blend together to make full-stack reactive programming possible at an enterprise scale.

Overview of messaging in Lagom

There are two main APIs for messaging in Lagom:

  • The Message Broker API, which is used for service-to-service durable messaging with at-least-once semantics
  • The PubSub API, which is used for intra-service broadcast messaging with at-most-once semantics

Let’s start off by covering what these terms mean so you can design and implement Lagom services with confidence.

Message Broker API recap

The Message Broker API supports at-least-once delivery semantics, which ensures that each message is delivered to each consumer at least once, but possibly more than once. This is the most reliable form of delivery available in Lagom, which makes it perfectly suited to sharing domain events between services.

For instance, if we need to make sure that an order event makes it from the brokerage bounded context to the order fulfillment bounded context, we can use the Message Broker API to reliably facilitate this thanks to its at-least-once delivery semantics. The underlying Kafka topics will become a part of your system’s public API, and Kafka enables at-least-once delivery. In this context, a Kafka topic becomes another endpoint within your system. It’s also helpful to remember that pub-sub is simply another integration approach like REST, and systems may use a combination of both for integration purposes.

Delivery guarantees are different from processing guarantees. A tool can promise to deliver a message to a consumer at least once, but a tool can’t promise that the message is actually processed at least once. Be careful of software that conflates delivery guarantees with processing guarantees. When delivery is guaranteed and processing is highly likely (within tolerance of your SLA, such as 99.99%), we call this effectively once semantics. When both delivery and processing are guaranteed, we call this exactly once semantics. Exactly once is virtually impossible to achieve; be very cautious around this term.
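
To make this distinction concrete, here is a minimal sketch of an idempotent consumer, entirely illustrative and not part of Reactive Stock Trader: by deduplicating on a unique event ID, at-least-once delivery can be turned into effectively once processing, assuming the set of processed IDs is stored durably.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentConsumer {
    // Assumption: in production this set would live in a durable store,
    // not in-process memory, so deduplication survives restarts.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    public void onMessage(String eventId, String payload) {
        // add() returns false if this ID was already seen,
        // so redelivered messages are silently skipped
        if (processedIds.add(eventId)) {
            process(payload);
        }
    }

    private void process(String payload) {
        System.out.println("Processing: " + payload);
    }
}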

With this in mind, and the understanding that the Message Broker API uses Kafka under the hood, we can think of Kafka as a large durable buffer that enables producers and consumers to proceed at different paces. When used with partitioning, this enables the quick addition of consumers in order to scale out and process messages more quickly whenever needed. In essence, the combination of Lagom’s Message Broker API on top of Kafka brings durability and scalability to service-to-service integrations.
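
As a refresher, consuming a Kafka-backed topic with the Message Broker API looks roughly like the following sketch. The wireTransferService client and its transferRequest() topic are taken from the service interface shown later in this unit; the processing logic itself is illustrative.

// Sketch: inside a consuming service's constructor, subscribe to the
// wire transfer service's Kafka-backed topic with at-least-once semantics.
// wireTransferService is assumed to be an injected Lagom service client.
wireTransferService
    .transferRequest()  // Topic<TransferRequest> declared on the service descriptor
    .subscribe()        // returns a Subscriber<TransferRequest>
    .atLeastOnce(Flow.fromFunction(request -> {
        System.out.println("Processing transfer request: " + request);
        return Done.getInstance(); // returning Done commits the offset
    }));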

Technically speaking, the Message Broker API can be used when producers and consumers are all within the same service boundary. However, it is much more suitable to use something lighter. In Lagom, there is a better option for publish-subscribe messaging within the same service boundary: the PubSub API.

PubSub API introduction

Unlike the Message Broker API, the PubSub API does not make use of Kafka. Instead, the PubSub API broadcasts messages to all subscribers within a single service cluster using Akka messaging. While both APIs are used for messaging, the underlying implementation is quite different.

The PubSub API is built on top of Akka Distributed Publish Subscribe in Cluster. Akka Distributed Publish Subscribe provides at-most-once messaging semantics (compared with the at-least-once guarantee Kafka provides).

At-most-once delivery means that a message will be delivered no more than one time, but there’s a possibility that some messages will be dropped. However, there’s no possibility that a message will be delivered twice. This is because messages are not buffered, like in Kafka; they are delivered directly to an actor’s mailbox using Akka actor-to-actor messaging.

Let’s discuss how PubSub works under the hood.

The first step is for subscribers to register their interest in a topic. Under the hood, Lagom represents subscribers, publishers, and mediators as Akka actors. Routing is handled by a mediator actor, which keeps track of all channels (topics) and the subscribers for each topic; the mediator is created and managed automatically by the underlying Akka runtime.

Figure: Subscribe

Once a new message is published to the mediator, it will broadcast the message to all registered subscribers in real time. Regardless of whether the publisher is an entity or service, everything is backed by Akka actors under the hood.

Figure: Broadcast
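
Lagom hides these actors behind its PubSubRegistry and PubSubRef types, but for the curious, a minimal sketch of the underlying Akka Distributed Publish Subscribe API might look like this (the topic name and message handling are illustrative):

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;

public class TransferSubscriber extends AbstractActor {

    // The mediator is the cluster-wide registry of topics and their subscribers
    private final ActorRef mediator =
        DistributedPubSub.get(getContext().getSystem()).mediator();

    @Override
    public void preStart() {
        // Register this actor as a subscriber of the "transfer" topic
        mediator.tell(
            new DistributedPubSubMediator.Subscribe("transfer", getSelf()),
            getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(DistributedPubSubMediator.SubscribeAck.class,
                ack -> System.out.println("Subscribed to 'transfer'"))
            .match(String.class,
                msg -> System.out.println("Received: " + msg))
            .build();
    }
}

// Publishing from anywhere in the cluster looks like:
//   mediator.tell(new DistributedPubSubMediator.Publish("transfer", json), getSelf());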

This all raises the question: Why would you use an API that may drop messages (PubSub) rather than an API that guarantees the delivery of messages (Message Broker)? It turns out that at-most-once delivery is the appropriate approach to messaging when consumers care more about recency and velocity than durability. That is, keeping up with the most recent messages, even if some messages get dropped along the way, should be the most important consideration when using PubSub.

In Reactive Stock Trader, there’s one use case that requires velocity above durability: pushing updates to the UI.

Imagine that our system has 1 million users connected at the same time. The velocity of updates to users becomes much more important than effectively once delivery because data won’t actually be lost even if a message is dropped. When pushing updates to a UI, the worst-case scenario is that a user may simply not receive an alert in real time. However, on the next page refresh, the most up-to-date information will be displayed through the standard request/response channel. Therefore, there is no long-term harm if a real-time message over a WebSocket connection is dropped. In this situation, reactivity is maintained by dropping messages under heavy load, rather than allowing the latency of our system to increase exponentially (or the system to crash outright).

The goal of this unit is to demonstrate how to stream updates to the UI in near-real time. Before we demonstrate the code involved, we’ll cover some fundamentals so that you gain a deep understanding of why we will work with the PubSub API rather than the Message Broker API. This will help to clarify what we mean by messaging within a single service cluster instead of messaging between service clusters, and how the UI fits into this picture.

The rest of this unit will cover:

  • Akka Cluster and Lagom service clusters
  • Streaming events
  • Choosing the right messaging approach

Akka Cluster and Lagom service clusters

Akka is a distributed computing toolkit from Lightbend based on the actor model, which was formulated by Carl Hewitt in the 1970s and later popularized by Erlang.

If you’re interested in exploring the full gloriousness of the actor model and declarative programming in general, there’s nothing better than watching this epic video, circa 1990, about how and why the actor model was used by Ericsson as the basis of their declarative language for building real-time systems, Erlang. In the distributed systems world, this video is legendary, so I highly recommend watching it.

While Erlang is a unique language with its own VM, Akka is essentially the actor model brought to the JVM. This gives Java and Scala developers the ability to leverage a powerful approach of message-driven concurrency without having to learn a brand new language and runtime environment.

What makes Akka so special is that it’s basically object-oriented programming done right. It brings an exceptionally elegant concurrency model to developers that makes it easy to develop hyper-scalable applications. Actors communicate with each other by message passing; the cool thing is that actors don’t need to live on the same thread, or even on the same JVM. This concept of location transparency is what the PubSub API relies on.

Location transparency

Location transparency essentially means that the physical location of a service or actor is irrelevant to the caller. Individual actors can move around a cluster at runtime as needed, without impacting the functioning of the system. This is because the reference to an actor is stable, while the actual physical location may change. The Akka runtime handles routing of messages for us using those stable references, so you can continue to send messages to dead or dying actors without interruption. Once the actors are restarted on a healthy node, Akka will handle routing of the in-flight messages.
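
A tiny sketch of what this looks like in code, with WorkerActor as a hypothetical actor: the caller only ever holds an ActorRef, and the Akka runtime decides how each message is routed.

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

// Minimal sketch: the caller only ever holds a stable ActorRef.
// Whether the target actor lives on this JVM or on another node
// in the cluster is irrelevant to the caller.
ActorSystem system = ActorSystem.create("trading");
ActorRef worker = system.actorOf(WorkerActor.props(), "worker"); // WorkerActor is hypothetical
worker.tell("do-work", ActorRef.noSender()); // delivery is routed by the Akka runtime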

This does not directly affect our code, but it’s helpful to know about, especially as we move on to the next unit and cover how to deploy Reactive Stock Trader to the cloud with Kubernetes. Location transparency is one of the key design and engineering principles that brings self-healing properties to an Akka-based system, and understanding it helps to explain the design decisions behind Reactive Stock Trader.

Imagine we have a service that is messaging an actor on Server 2. Suddenly, the actor dies because the server crashes. This does not impact the health of the system, however. Under the hood, Akka will spin up the actor on a healthy server and then reroute in-flight messages to it after it starts up. The references to each mailbox stay stable regardless of the underlying conditions at runtime.

Figure: Location transparency

As you’re working with Lagom, it’s helpful to visualize that each persistent entity is backed by an Akka actor. Lagom brings the sophistication of Akka to as many developers as possible: we gain many of the benefits of Akka at runtime without the steep up-front learning curve. Learn more about location transparency in Akka.

It’s worth learning about the inner workings of Akka, as it’s such a fundamental part of the design and implementation of modern distributed systems. The deeper you dive into Lagom, the more you may want to tap into the power of Akka under the hood. Learning about Akka will pay dividends over the years to come as more and more software moves to a functional, declarative, distributed style.

A complete dive into Akka is well beyond the scope of this series, but we’ll cover enough about Akka Cluster so that the architectural decisions we make in Lagom feel even more reasonable and justified.

What’s an Akka Cluster?

The Lagom PubSub API uses clustered actors.

“Akka Cluster provides a fault-tolerant decentralized peer-to-peer based cluster membership service with no single point of failure or single point of bottleneck. It does this using gossip protocols and an automatic failure detector.” (Akka documentation)

What this means is that actor systems can connect with each other to form a cluster of actor systems. Imagine a scenario in which we deploy Reactive Stock Trader to the cloud. We don’t want to run it only on a single VM; we want to launch it on a number of VMs on a number of different systems across various network boundaries. This way, if a single VM or a single machine fails, the system as a whole will continue to function. As previously described, location transparency is a very valuable property of a reactive system. Akka Cluster is how location transparency happens in Lagom. As the system grows in usage, persistent entities will be sharded across nodes in the cluster.

By keeping entities in memory, we retain velocity in our system. And by sharding entities across a cluster, we can grow our pool of available memory as much as is needed over time. Without distribution, this architecture isn’t possible. The following figure shows the conceptual view of a single microservice backed by cluster sharding.

Figure: Cluster sharding

In a microservice environment, we will create one Lagom service per bounded context in our system. Each bounded context will be represented by a Lagom service cluster, which uses Akka Cluster under the hood. All components within a single cluster can then communicate with each other using the PubSub API.

Integrating across a second bounded context means that we’ll need to add a completely separate Lagom microservice, which means a completely separate Akka Cluster will exist at runtime. From an infrastructure and operational perspective, you can consider these as two separate systems within the same zone of trust. In essence, two clusters cannot directly message each other using Akka, which is why we need to use the Lagom Message Broker API, rather than the PubSub API, across services. But within a service, PubSub is a highly effective choice, thanks to the power of Akka under the hood.

Figure: Cluster

In Reactive Stock Trader, we have a number of bounded contexts, including Portfolio, Orders, and Wire Transfers. Each one of these is represented under the hood by a cluster. Hopefully, this diagram makes it easy to understand the integration choices we’ve made so far.

Now that we have a broader understanding of how Lagom works under the hood, let’s learn how to leverage the PubSub API for streaming data from Lagom all the way through our stack to the UI.

Streaming events

Our goal is to wire together Reactive Stock Trader so that we can push wire transfer domain events from persistent entities all the way to our UI. The basic steps for streaming events to the UI are as follows:

  1. Publish events from persistent entities to services using the PubSub API.
  2. Subscribe to those events in a Lagom service and expose the events as a stream.
  3. Subscribe to the Lagom stream from Play.
  4. Push the events from Play to Vue over a WebSocket connection in real time.

The first decision we need to make is which aggregate events we wish to translate into domain events for streaming. We won’t directly share aggregate events; instead, we’ll create new domain event types.

When creating domain events, an important best practice is to only include the minimum amount of information required for a variety of consumers to work with the event. If a consumer requires more information than is available in the event itself, it can query a service endpoint for additional data. It’s an anti-pattern to add all available data to a domain event. This leads to bloated events and coordination bottlenecks between teams, as each team adds new fields to shared event schemas over time.
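
For example, a lean domain event for transfer status updates might carry little more than identifiers, a status, and a timestamp. The following shape is inferred from the fields our UI consumes later in this unit; the actual class in Reactive Stock Trader may differ.

import lombok.Builder;
import lombok.Value;

// A minimal domain event: just enough for consumers to render an update.
// Consumers that need more detail can query a service endpoint by id.
@Value
@Builder
public class TransferUpdated {
    String id;            // the transfer being updated
    String status;        // e.g. "Transfer Initiated"
    String dateTime;      // when the status changed
    String sourceId;      // source account reference
    String destinationId; // destination account reference
    String amount;        // transfer amount
}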

In the case of Reactive Stock Trader, it would be nice to keep users up to date with the status of their transfers in real time without requiring page refreshes. Our real-time interactions will be as follows:

  1. Once a transfer is requested, we will provide live updates of its status by pushing critical state changes from the transfer persistent entity to our transaction history log in the UI.
  2. Whenever the status changes for a given transfer, we’ll update it in the transfer log in real time without requiring a page refresh.
  3. Finally, when a transfer is complete, we’ll update the available funds for the current portfolio.

We don’t need to publish every single status update that’s tracked by the transfer persistent entity. The three most important changes for the UI to know about are:

  • Transaction Initiated
  • Delivery Completed
  • Delivery Failed

Once someone clicks on the Submit Transfer button, we will begin updating them on the progress of the transfer. This is even more important in critical domains like banking. Transferring thousands of dollars from one account to another is a nerve-wracking experience for most people. We need to ensure that users of our software are kept up to date.

Part of building reactive systems is being reactive to the needs of our users on a human level. When users trust that updates are immediate and accurate, they will more deeply engage with our software. The next time you’re building a system, think carefully about the domain. In the case of Reactive Stock Trader, try to put yourself in the shoes of someone waiting for a wire transfer to complete, which may be the next step to paying off a long-standing debt, finalizing a mortgage, or moving another important life goal forward.

Step 1. Publish events from an entity

Let’s start by revisiting TransferEntity, a persistent entity that functions in the role of process manager and keeps track of the status of transfers across bounded contexts. We can consider this entity the source of truth for the status of a transfer, which means we will generate status updates based on changes within this persistent entity.

TransferEntity.java

private Behavior empty() { // 1
    BehaviorBuilder builder = newBehaviorBuilder(Optional.empty());

    builder.setCommandHandler(
        TransferCommand.TransferFunds.class,
        (cmd, ctx) -> { // 2
            TransferDetails transferDetails = TransferDetails.builder()
                .source(cmd.getSource())
                .destination(cmd.getDestination())
                .amount(cmd.getAmount())
                .build();

            ObjectMapper mapper = new ObjectMapper();

            TransferUpdated tc = buildTransferUpdated(
                transferDetails, "Transfer Initiated"); // 3

            publishedTopic.publish(
                mapper.valueToTree(tc).toString()); // 4

            return ctx.thenPersist(
                new TransferEvent.TransferInitiated(getTransferId(), transferDetails),
                evt -> ctx.reply(Done.getInstance()));
        });

    builder.setEventHandlerChangingBehavior(
        TransferEvent.TransferInitiated.class,
        this::fundsRequested);

    return builder.build();
}

We start by revisiting the various behaviors of our persistent entity, beginning with the empty() behavior (1). The command handler for TransferFunds (2) is invoked when a transfer is initiated. When the transfer command is received, we generate a TransferUpdated event (3) with a status of "Transfer Initiated". Finally, we publish this using publishedTopic.publish(...), which broadcasts the message to all subscribers within the service cluster (4).
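
Note that publishedTopic is a PubSubRef<String> held by the entity. As a sketch, it might be initialized like this, assuming a PubSubRegistry is made available to the entity (the exact wiring in Reactive Stock Trader may differ):

// Inside TransferEntity: obtain a reference to the "transfer" channel.
// pubSubRegistry is assumed to be injected via the entity's constructor.
private final PubSubRef<String> publishedTopic =
    pubSubRegistry.refFor(TopicId.of(String.class, "transfer"));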

Step 2. Subscribe to broadcast messages in a Lagom service

In order to subscribe to broadcast messages using the PubSub API, we need to inject a PubSubRegistry in our service. Let’s look at a condensed version of WireTransferServiceImpl to see how subscribing works.

WireTransferServiceImpl.java

public class WireTransferServiceImpl implements WireTransferService {

    private final TransferRepositoryImpl transferRepository;
    private final CassandraSession db;
    private final PubSubRegistry pubSub; // 1

    @Inject
    WireTransferServiceImpl(
            TransferRepositoryImpl transferRepository,
            ReadSide readSide,
            CassandraSession db,
            PubSubRegistry pubSub) { // 2
        this.transferRepository = transferRepository;
        this.db = db;
        this.pubSub = pubSub; // 3
        readSide.register(TransferProcess.class);
        readSide.register(TransferEventProcessor.class);
    }

    // ...
}

All of our Lagom services are fronted by Play, which serves the role of back end for front end (BFF), or API gateway. Essentially, there will be no way for a WebSocket connection to be made directly with Lagom. The public API will be represented only by Play services and Kafka topics. This means that we need to make the event stream available from Lagom for consumption by Play, so that we can expose it through Play’s public API.

The first step is to make our PubSubRegistry available (1) by injecting it into the service constructor (2, 3). Now that we have the PubSubRegistry reference available in our service, we’ll create a new Lagom service endpoint for subscribing to events from the PubSub API and streaming them out to Play.

@Override
public ServiceCall<NotUsed, Source<String, ?>> transferStream() { // 1
    // ...
}

Lagom has a ServiceCall signature that supports streaming data: a parameterized ServiceCall method (1). The signature is ServiceCall<Request, Response>. We can plug in a streaming source as the response type.

In our case, we’re only streaming one way: status updates from our persistent entity to the UI. This means we’re defining a ‘unidirectional flow.’ (Bidirectional flows are also supported but won’t be covered, as we do not need to work with live streaming data from the UI.) Streaming in Lagom (and Play) is backed by Akka Streams.

Let’s now take a closer look at the type signatures we’ve defined above and how they relate to Akka Streams.

Akka Streams 101

The method transferStream() is a Lagom service call with NotUsed as the input type and Source<String, ?> as the output type. We’ve seen other simple types as output for a ServiceCall, such as TransferId, but we haven’t come across a Source type yet.

In Akka Streams, a Source is literally just that: a source of streaming data. A Source has an output type and a materialized value type, Source<Out, Mat>.

For the transferStream() method, the type signature of our source is Source<String, ?> (1). This means that we’re providing a service call that ignores all input (represented by NotUsed) and returns a streaming source of strings with a wildcard materialized value type (represented by Source<String, ?>).

Remember that in Java generics, ? is a wildcard type, and, in this position, the wildcard type is an ‘unknown type’.
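
To make these types concrete, here is a tiny, self-contained Akka Streams example, independent of Lagom, that builds a Source, transforms it, and runs it into a Sink (using the ActorMaterializer API from the Akka 2.5 era of this series):

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class StreamsDemo {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo");
        Materializer mat = ActorMaterializer.create(system);

        // A Source of integers; NotUsed is the materialized value type (the ? above)
        Source<Integer, NotUsed> numbers = Source.range(1, 5);

        numbers
            .map(n -> "element " + n)  // transform each element as it flows through
            .runWith(Sink.foreach(System.out::println), mat) // attach a sink and run
            .thenRun(system::terminate); // shut down once the stream completes
    }
}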

Reactive Streams 101

Both Lagom and Play support Akka Streams, which is an implementation of the Reactive Streams protocol. This enables toolmakers to implement their own Reactive Streams-compliant libraries, which developers can then use to create a complete streaming system from components built with multiple tools! It’s really an incredible feat of software engineering.

If you’re interested in learning more about streaming in general, I highly recommend taking a deeper look into Reactive Streams. For now, it’s sufficient to understand that when we provide a Source type as a service call parameter, we’re signaling our intention to materialize a source of streaming data.

Streaming all the way down

With this context and understanding, let’s view our complete ServiceCall method, which is actually very light considering the rich functionality under the hood.

@Override
public ServiceCall<NotUsed, Source<String, ?>> transferStream() {
    return request -> {
        final PubSubRef<String> topic =
            pubSub.refFor(
                TopicId.of(
                    String.class,
                    "transfer")); // 2
        return CompletableFuture
            .completedFuture(
                topic.subscriber()); // 3
    };
}

First, we obtain a reference to the PubSub ‘topic,’ which is a named channel (2). If you recall, in our persistent entity we are sending TransferUpdated messages encoded as String values and published to the "transfer" topic. Next, we subscribe to the messages using topic.subscriber() (3). The above code lets Akka Distributed PubSub know that this service call is now a subscriber. Akka will then take care of routing all messages appropriately.

According to the PubSubRef documentation, subscriber() consumes “messages from the topic via a stream. You can return this Source as a response in a ServiceCall and the elements will be streamed to the client. Otherwise, you have to connect a Sink that consumes the messages from this Source and then run the stream.” This means that subscriber() provides a source of streaming data, and you can return that source directly from a Lagom service call. Lagom will handle wiring up and materializing the stream for you.

Akka Streams requires that a source of data be connected to a sink of data before the stream can be materialized and executed. If you grab a reference to the source without passing it to a service call, you’ll need to handle the wiring and materialization of the stream yourself. There are some advantages to this; for instance, you may wish to construct more complex streaming logic to route messages through before they reach the UI.
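
As a purely hypothetical variation, transferStream() could insert extra stages between the PubSub source and the response before Lagom materializes the stream; the filter and map stages below are illustrative only:

@Override
public ServiceCall<NotUsed, Source<String, ?>> transferStream() {
    return request -> {
        PubSubRef<String> topic =
            pubSub.refFor(TopicId.of(String.class, "transfer"));
        // Insert additional stream stages before handing the source to Lagom
        Source<String, ?> shaped = topic.subscriber()
            .filter(msg -> !msg.isEmpty()) // drop empty payloads (illustrative)
            .map(String::trim);            // light normalization (illustrative)
        return CompletableFuture.completedFuture(shaped);
    };
}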

Take a deeper dive into Akka Streams.

Now that we’ve wired up our source of data to our service, it’s time to consume this source in Play and push real-time events to Vue!

Step 3. Connect the stream from Lagom to Play

Ultimately, this stream will be consumed over a WebSocket connection in Vue. To achieve this, we first need to expose a publicly accessible service endpoint. As with the rest of our system, Play is the solution we’ll use for this, continuing to follow the BFF pattern. Let’s explore the streaming WebSocket endpoint in our BFF service controller.

WireTransferController.java

public WebSocket ws() { // 1
    return WebSocket.Text.acceptOrResult(req -> { // 2
        return wireTransferService
            .transferStream() // 3
            .invoke() // 4
            .thenApply(source -> { // 5
                return F.Either.Right( // 6
                    Flow.fromSinkAndSource( // 7
                        Sink.ignore(),
                        source)); // 8
            });
    });
}

Every line of the above method is important, so we’ll step through it carefully.

Our type signature specifies that we’ll return a WebSocket handler (1), which is a convenience type provided by Play. If we look at the API spec, we can see the signature required to create the handler is as follows:

public abstract java.util.concurrent.CompletionStage<
        F.Either<Result,
                 akka.stream.javadsl.Flow<Message,Message,?>>>
    apply(Http.RequestHeader request)

This is quite the method signature for those without exposure to monadic data structures, but it’s helpful to keep in mind that Play and Lagom are built with Scala, and even the Java APIs share some Scala flavor. Let’s cover what’s happening in detail. Based on our high-level understanding of Akka Streams, it should be straightforward to parse through this code.

To begin, we need to provide either a future of a Flow to use for streaming data, or a Result to reject the connection. The shape of this return type is specified by acceptOrResult (2). Basically, this is where we either place logic to connect the source (from Lagom) to the sink (the WebSocket handler) and start to stream data, or reject the connection, which we may do for any number of reasons, such as an authentication/authorization failure.

Either is a monadic data type commonly used in functional programming. Similar to option types, which help us avoid null values by passing either Some(value) or None, F.Either is a convenience type in Play that gives us the ability to return a success (called a right value) or a failure (a left value). Instead of reading this as “right” or “left,” some people read it as “right” or “wrong.” Regardless of how you read it, either is used to return successful types or failures without having to rely on unchecked exceptions. Think of this pattern as making error handling explicit in the method’s type signature.

Explore an old but good introduction to monadic concepts by Neal Ford. I highly recommend this article for Java™ developers who wish to increase their functional and type-level comprehension. If you wish to learn more about monads, specifically in Scala, check out this simple introduction, which explains monads in the context of Scala’s collection library.

We will return a success in the form of a Flow wrapped in an F.Either.Right, or a failure Result wrapped in an F.Either.Left. Again, this may be slightly unfamiliar syntax in the Java world, but it’s a great primer to the programming style of functional languages such as Scala and Haskell.
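
To see the left side in action, here is a hedged sketch of rejecting a WebSocket handshake before any streaming begins; isAuthorized is a hypothetical helper, not part of Reactive Stock Trader:

public WebSocket ws() {
    return WebSocket.Text.acceptOrResult(req -> {
        if (!isAuthorized(req)) { // hypothetical authorization check
            // Left: reject the handshake with a standard HTTP result
            return CompletableFuture.completedFuture(
                F.Either.Left(Results.forbidden("Connection rejected")));
        }
        // Right: accept and wire up the stream, as shown above
        return wireTransferService.transferStream().invoke()
            .thenApply(source -> F.Either.Right(
                Flow.fromSinkAndSource(Sink.ignore(), source)));
    });
}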

Now that we understand what we need to return, let’s revisit the ws() method. We obtain a reference to transferStream() from Lagom (3), on which we call invoke() in order to work with the underlying java.util.concurrent.CompletionStage (4). Once we have access to the completion stage, we can map over it with thenApply (5), which is a standard asynchronous programming method in Java 8 (see the CompletionStage documentation for more details). This is how we declare what we wish to happen with the Source: we return a Flow.fromSinkAndSource(in, out) (7) wrapped in F.Either.Right (6), which fits the type signature required by WebSocket (8).

There’s arguably a lot going on here, which may take some effort to fully understand. In order to feel completely comfortable with this method, you’ll need to have a basic understanding of:

  • Asynchronous programming in Java 8
  • Asynchronous programming in Play
  • Akka Streams and Reactive Streams
  • Monadic data types

The great news is that everything you need to understand to build reactive systems is basically contained in this single method! So don’t rush your comprehension of it. Take your time, and take comfort in knowing that there’s a learning curve that every Java developer experiences before this style of code feels intuitive. Lagom makes it easy to be rapidly productive before understanding every nuance of what’s happening behind the scenes, but I’m here to encourage you to gain that deeper understanding, which will open you up to a whole new world of declarative, functional, reactive programming.

Finally, to complete the Play portion of streaming, we simply need to:

  • Declare the new endpoint in our service interface
  • Declare the new endpoint in our routes file
  • Add the PubSub dependency to our build file

First, let’s add the endpoint to our service.

WireTransferService.java

public interface WireTransferService extends Service {

    String TRANSFER_REQUEST_TOPIC_ID = "WireTransfer-TransferRequest";

    ServiceCall<Transfer, TransferId> transferFunds();

    ServiceCall<NotUsed, PSequence<TransactionSummary>> getAllTransactionsFor(String portfolioId);

    ServiceCall<NotUsed, Source<String, ?>>
        transferStream(); // 1

    Topic<TransferRequest> transferRequest();

    @Override
    default Descriptor descriptor() {
        // @formatter:off
        return named("wire-transfer").withCalls(
            call(this::transferFunds),
            call(this::transferStream), // 2
            restCall(Method.GET, "/api/transfer/:portfolioId", this::getAllTransactionsFor)            
        )
        .withTopics(
            topic(TRANSFER_REQUEST_TOPIC_ID, this::transferRequest)
        );
        // @formatter:on
    }
}

We need to add our service endpoint (1) and then simply declare it as a call (2). This enables Lagom to determine which protocol to use, which will be ws:// (or wss://) rather than http:// (or https://).

Finally, we’ll need to add a route, which is necessary for the initial WebSocket handshake before Play opens up a persistent connection.

routes

GET     /api/transfer/stream                    controllers.WireTransferController.ws()

Last but not least, we need to make a few small changes to the build file in order to bring in the required dependencies for the PubSub API.

build.sbt

libraryDependencies ++= Seq(
      lagomJavadslPersistenceCassandra,
      lagomJavadslTestKit,
      lagomJavadslKafkaBroker, // 1
      lagomJavadslPubSub // 2
    )

As you can see from the code, we need to explicitly import the Kafka Broker plugin for the Message Broker API (1), and the PubSub plugin for the PubSub API (2).

Step 4. Connect to Play over a WebSocket from Vue

Now that we have the stream wired together in Play and Lagom, we can finally create a WebSocket connection in Vue with a few small changes. All of our UI changes are self-contained within the wire transfer component page.

transfers/New.vue

connect() {
  this.socket = new WebSocket("ws://localhost:9000/api/transfer/stream"); // 1
  this.socket.onopen = () => { // 2
    this.socket.onmessage = (e) => { // 3
      let event = JSON.parse(e.data);
      let index = -1;

      // determine if we're updating a row (initiated) or adding a new row (completed)
      for (let i = 0; i < this.transfers.length; i++) {
        if (this.transfers[i].id === event.id) {
          index = i;
          break;
        }
      } // 4

      if (index === -1) {
        // unshift is similar to push, but prepends
        this.transfers.unshift({
          id: event.id,
          status: event.status,
          dateTime: event.dateTime,
          source: event.sourceId,
          destination: event.destinationId,
          amount: event.amount
        });
      } else {
        let t = {
          id: event.id,
          status: event.status,
          dateTime: event.dateTime,
          source: event.sourceId,
          destination: event.destinationId,
          amount: event.amount
        };
        this.transfers.splice(index, 1, t);
        this.updateCashOnHand();
      } // 5
    };
  };
}

We’ll create a new connect() method that we call from mounted() when Vue first renders the page.

To start, we need to establish the WebSocket connection (1, 2) and then specify the callback that will be executed with each new transfer event (3).

The main UI feature we will add is updating the transfer history table in real time. This gives users insight into the status of all their transfers. In the callback, we need to look up the transfer ID to determine whether it’s already in our table: a missing ID means a newly initiated transfer, while an existing ID means a transfer that has completed or failed (4). Our table is backed by an array, so we need to determine the index of the transfer ID, or its absence.

Once we have the index (or lack thereof) determined, we will either prepend the new event in the array, or splice the array to remove the previous transfer status and replace it with the latest status (5).

We also have an updateCashOnHand() method, which will be called each time a transfer completes.

updateCashOnHand() {
  getDetails().then(details => {
    this.cashOnHand = details.funds;
  });
}

This is wired up to the cash on hand card and will update in real time whenever the portfolio’s cash balance is updated.

<div class="card">
  <div class="card-body">          
    <h4>Active Portfolio</h4>          
    <div class="row">
      <div class="col">
        Cash on hand
      </div>
      <div id="cashOnHand" class="col">
        {{ cashOnHand | toCurrency }}
      </div>
    </div>
  </div>
</div>

Finally, we’ll add a small CSS effect that is triggered whenever the cash balance is updated. The card will flash yellow to get the user’s attention. It’s a nice little touch that keeps the UI feeling responsive.

<style scoped>
  @keyframes yellowfade {
      from { background: yellow; }
      to { background: transparent; }
  }

  .item-highlight {
      animation-name: yellowfade;
      animation-duration: 1.5s;
  }
</style>

Choosing the right messaging approach

Lagom provides a wealth of options for messaging, whether you’re integrating between services, or messaging within a service boundary. We’ve covered a lot of these options in this series, from read-side processors, to the Message Broker API, to the PubSub API. Let’s conclude this unit with a few general pieces of advice. A special thanks to James Roper (@jroper) from Lightbend, the original creator of Lagom, for his help with curating the following advice.

If the messages you are sending are persistent entity events, and the consumers are part of the same service, then a persistent read-side processor might be a good option. This also provides at-least-once delivery. And if the only effects of processing messages are database updates, then the built-in support for Cassandra read-side databases and relational read-side databases provide effectively once semantics, where the database updates are run transactionally to ensure that failures that occur during event processing cannot result in partial updates.

If the messages you are sending are general events from anywhere within a service boundary, and the consumers are part of the same service, plus you want to process the events as a stream, you can access a raw stream of events with the PubSub API. This is one of the most flexible messaging APIs in Lagom. It provides at-most-once delivery, which means that individual messages may be dropped but overall velocity will be maintained. This is a great option for pushing events to UI subscribers.

If the messages you are sending are persistent entity events, and the consumers are each within a different service, the Message Broker API is your best option. This provides at-least-once delivery. Unlike the persistent read-side processor, we can’t quite call this “effectively once”, because there’s no guarantee an issue won’t prevent a consumer from handling the message. However, it’s still a fairly strong level of consistency, and more reliable than RESTful integration.

Finally, you always have the ability to integrate with point-to-point REST. However, this does not provide you with event stream semantics. You’ll either accept commands or provide results backed by raw data or a read-side model.

If your use case does not fit into one of the use cases that Lagom supports explicitly, you can use lower-level Akka APIs, including distributed publish-subscribe, to implement something more tailored to your needs. The best choice will depend on the specifics of your use case. The more you use Lagom, the more intuitive this choice will become.

By now, it’s probably easier to appreciate the complexity of building fully event-sourced, CQRS, real-time streaming systems from scratch using generic tools that are not specifically designed for these patterns. Most tools similar to Lagom rely on databases under the hood rather than a foundation of genuine real-time pub-sub and streaming. This makes the learning curve with Lagom slightly steeper than alternative microservice frameworks for the JVM, but the long-term dividends are enormous. Learning Lagom positions you to be ready for the shift toward true real-time computing.