
Reactive in practice, Unit 9: Reactive integration patterns

This unit covers how to integrate across various bounded contexts within our system. Unit 4 covered the basics of concurrency and parallelism, which we referred to as asynchrony. This unit expands on these basic building blocks to demonstrate how asynchronous programming affects our architectural decisions at a systems level.

Let’s begin by covering the basics of integration within a reactive system before diving into Lagom’s Message Broker API.

Integration patterns distilled

Unit 4 covered three modes of concurrency:

  • Threads
  • Promises and futures
  • Actors

All of those modes of concurrency are sufficient for processing data on a single VM. But what happens when we need to integrate components of an enterprise system that will be distributed across various virtual machines, nodes, and clusters? We need to come up with an integration approach that scales.

Lagom is a framework built on top of actors for implementing reactive microservices. In order to properly understand what this means, let’s cover some popular options for systems integration:

When evaluating different frameworks, it helps to know how they’re implemented. Principles matter. This will give you insight into how some of the most popular reactive microservice frameworks (including Lagom) are designed and intended to be used.

Database integration

Database integration is arguably the most common form of integration between distributed components of a system.

In the case of Reactive Stock Trader, we could represent persistent entities as database tables and then create a collection of services that reads from and writes to those tables. However, shared databases are a major cause of unnecessary complexity in enterprise systems.

Imagine we have three services: A, B, and C, each of which integrates with the same database. Let’s say that Service A releases a new version. As part of this release, there’s a change to the database schema that Service A requires. The change goes well and is verified to work flawlessly for Service A. However, almost immediately after the release, bugs start to pop up in Services B and C. What’s the cause? Services B and C haven’t had a release in weeks, but now their teams must scramble to trace the root cause back to the changes made to the shared tables as part of Service A’s update.

This is an example of strong coupling between components, which results in a strong coupling between teams. A system like this is monolithic in nature and brittle in practice. Any change to a service that requires a change to the database will likely involve triage between a vast swath of teams within an organization. It’s hard to quantify how many emergency conference calls happen every day because of the database integration pattern that we just outlined. However, as long as databases are not shared between components, database integration is not necessarily an anti-pattern. Some popular microservice frameworks, such as Axon and Spring, are internally implemented using database tables to handle the persistence of entities.

Point-to-point service integration

In a microservices architecture it’s standard practice to integrate components with a point-to-point style of integration over HTTP with RESTful APIs.

If most point-to-point service calls are in a single direction, such as a web application that grabs data from resources such as databases and third-party APIs, point-to-point integration is not only perfectly acceptable but possibly the only option. However, point-to-point integration leads to unnecessary complexity when it becomes bidirectional across many services. Once services begin to point at each other, we can easily end up with a tangled web of integrations that undermines our overall understanding of the system.

Tangled web

Let’s imagine an even more complex system with three services that all integrate with each other in both directions. This means we have six different point-to-point integrations: each of the three pairs of services is connected in both directions (3 pairs × 2 directions = 6). Now imagine that we have hundreds of services that all point at each other; the number of integrations grows roughly with the square of the number of services.

Complex, cyclical dependencies make everything from testing, debugging, scaling, and maintenance a challenge, especially if the outage of one service causes the outage of another service. In a system built on top of point-to-point calls, we can quickly experience cascading failures.

Not only do these issues cause outages, but they also force teams to heavily coordinate with each other. Imagine an instance where Team A needs to upgrade Service A. If the system is tightly coupled, other service teams in the organization become concerned with the upgrade, as it will force cascading changes to their services. Perhaps a steering committee is formed, and everyone collaborates to determine the optimal upgrade path of Service A. Team A finally gets permission from Team B, C, D, all the way to Z, simply to update Service A.

Let’s explore a better way to integrate complex systems.


Publish-subscribe

Publish-subscribe is a pattern based on indirection: messages are not directed to recipients, but rather published to a topic. In a reactive event-driven system, events are published without publishers knowing (or caring) how many subscribers there will be for an event. Likewise, subscribers can subscribe to specific types of events without caring who the publishers are or even how many publishers there are. Therefore, there are no unnecessary coordination costs. If events are published by the teams who naturally steward a particular business unit, it’s very easy to subscribe to those events with confidence. For instance, subscribing to customer address updates on a topic owned by the shipping team makes sense: it naturally follows the map of our bounded contexts, which already represent the natural structure of our business.

Ideally, relevant events have already been decided during event storming sessions and mapped to the bounded contexts identified in those sessions.

Before covering publish-subscribe in more detail, we must disambiguate two terms: events and messages:

  • Messages are published to one or more topics. Subscribers can subscribe to messages on a particular topic. A topic is a sequence of related messages.
  • Events are stored within a message. An event is ‘something interesting that happened.’ Events represent the business context within your application.

In Kafka, we create a message based on an event, then publish the message to a topic. A subscriber subscribes to a topic, reads messages from the topic, extracts the events within, and reacts to each event. Throughout the rest of the series, we’ll use both terms. When we speak about messages, we’re typically speaking about a serialized event, including all relevant metadata.
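To make the event/message/topic vocabulary concrete, here is a toy, in-memory sketch in plain Java. Nothing here is Kafka: the OrderPlaced event, the delimited serialization format (a stand-in for Avro), and the topic-as-list are all illustrative inventions that only mirror the flow described above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TopicDemo {

    // A domain event: 'something interesting that happened'.
    record OrderPlaced(String portfolioId, String symbol, int shares) {}

    // A message wraps a serialized event plus metadata (here, just a key).
    record Message(String key, byte[] payload) {}

    // A topic is a sequence of related messages.
    static final List<Message> trades = new ArrayList<>();

    // Serialize the event (a delimited string stands in for Avro).
    static Message toMessage(OrderPlaced evt) {
        String serialized = evt.portfolioId() + "|" + evt.symbol() + "|" + evt.shares();
        return new Message(evt.portfolioId(),
                serialized.getBytes(StandardCharsets.UTF_8));
    }

    // Publishing appends the message to the topic.
    static void publish(List<Message> topic, Message msg) {
        topic.add(msg);
    }

    // A subscriber reads a message, extracts the event, and reacts to it.
    static OrderPlaced extract(Message msg) {
        String[] parts = new String(msg.payload(), StandardCharsets.UTF_8).split("\\|");
        return new OrderPlaced(parts[0], parts[1], Integer.parseInt(parts[2]));
    }

    public static void main(String[] args) {
        publish(trades, toMessage(new OrderPlaced("p1", "IBM", 100)));
        OrderPlaced evt = extract(trades.get(0));
        System.out.println(evt.symbol() + " x " + evt.shares());
    }
}
```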

Now that we have a few basic terms out of the way, let’s discuss the benefits of publish-subscribe.

Benefits of publish-subscribe

A major benefit of publish-subscribe is that events are published to a log (such as Kafka or Cassandra) and serve as an audit trail of what led to the state of a component. In Lagom, not only can we determine what the state of a persistent entity was at a given point in time, we can also determine exactly how the entity arrived at that state by stepping through domain events in Kafka and aggregate events in Cassandra.

Another technical benefit of the publish-subscribe pattern is location transparency, which helps to achieve greater network scalability and reliability. Location transparency means that publishers and subscribers are not expected to reside on the same thread, VM, node, cluster, or data center. Imagine a publisher that’s publishing a stream of events, but mid-way through publishing, the VM crashes. In a reactive system, the service can restart on a healthy node and resume publishing by attaching to the source of data that it was acting on and picking up processing where it left off. Any subscribers to these events may notice a slight delay in the event flow as the crash/restart happens, but otherwise the system is back to a healthy state. This is known as self-healing, which is possible (in part) due to location transparency.

Drawbacks of publish-subscribe

There are a few drawbacks of the publish-subscribe pattern that developers should be aware of. The most noticeable form of complexity comes from schemas, serialization, and immutability.

Messages are very difficult to change once published. Modifications to existing messages (and the events within) should be avoided unless absolutely necessary. Publishers can update the schema of their messages, but such changes should affect only new events. This means that subscribers will need to handle events with multiple schema versions. Existing events should never be deleted or modified unless required by government or industry regulations such as the EU General Data Protection Regulation (GDPR).
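To illustrate what handling multiple schema versions can look like on the subscriber side, here is a minimal sketch. The schemaVersion tag and the qty/shares field names are invented for the example; real systems would typically rely on Avro's schema-resolution rules instead of hand-rolled checks.

```java
import java.util.Map;

public class VersionedEvents {

    // In this sketch, v1 of the event used a 'qty' field; v2 renamed it to
    // 'shares'. The subscriber must understand both, because old events are
    // never rewritten.
    static int sharesOf(Map<String, String> raw) {
        int version = Integer.parseInt(raw.getOrDefault("schemaVersion", "1"));
        switch (version) {
            case 1:
                return Integer.parseInt(raw.get("qty"));    // old field name
            case 2:
                return Integer.parseInt(raw.get("shares")); // renamed in v2
            default:
                throw new IllegalArgumentException(
                        "Unknown schema version " + version);
        }
    }
}
```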

Lagom is based on Akka, which has a host of commercial add-ons, including GDPR for Akka Persistence. This makes the removal of data for GDPR reasons trivial.

Another source of complexity is deciding how compact messages should be. Imagine we have a publisher of trading events and dozens of subscribers, and a single subscriber requires additional data about trades. How do we handle this? We don’t. It’s up to the subscriber that requires additional data to find that data without altering the schema for existing messages. This is in sharp contrast to database integration, where tables often wind up overly complex because they include all the data that every possible reader or writer might require. Messages, by comparison, should be kept very lean, including only the bare minimum of data necessary. Schemas should therefore change infrequently.

In publish-subscribe systems, it’s common to produce and consume very small message payloads, and then perform RESTful requests to query additional data as needed. For instance, if we’re subscribing to trade events and require additional information about a company, we don’t include all company information in the event itself. We simply request company data from a third-party API like IEX. If we require even more data, such as all historical trades for the customer, we can request that data directly from a service that publishes it. As long as our service doesn’t fail when an external API call fails, we can still ensure a high level of reliability in our system.

Pro-tip: Lagom’s Message Broker API goes hand in hand with ReadSide Processors. Lagom favors compact events, which trigger some sort of behavior, such as updating a read-side view. We should strive to avoid performing IO or other complex processing in real-time during a request.

Communicating domain events across bounded contexts is handled by Kafka out of the box in Lagom. Therefore, we need to cover the basics of Kafka at a high level and how Kafka fits into the architecture of a system built with Lagom.

Kafka 101

Kafka was originally developed at LinkedIn as a way to integrate various systems together across their entire organization. This helped LinkedIn scale without brittle approaches like point-to-point service integration, backdoors, or shared databases.

“Kafka was developed at LinkedIn back in 2010, and it currently handles more than 1.4 trillion messages per day across over 1400 brokers. Kafka’s strong durability and low latency have enabled us to use Kafka to power a number of newer mission-critical use cases at LinkedIn.” — Joel Koshy, LinkedIn’s Engineering Blog

Think of Kafka as the way LinkedIn created a simplicity stack for the entire organization. At volumes of over a trillion daily messages, handling messages in a predictable, reliable, and performant manner is a critical undertaking. It’s these strenuous requirements that gave birth to Kafka. Now Kafka is an open source technology and commercially supported by Confluent.

In Lagom, the Message Broker API integrates with Kafka out of the box, which is why some of the basic concepts and motivations behind Kafka are important to understand.

Flow control

Complex Event Processing (CEP) systems have existed for years, but have always suffered from the possibility of cascading failure under heavy load. If publishers and subscribers are coupled together, a subscriber can quickly become overwhelmed by a fast producer of data.

Imagine a water faucet and a drain. If the water flows faster than the drain’s capacity, the water will spill over the sink, causing a flood. This is exactly what happens in some messaging systems. But rather than a flood of water, we may experience a flood of messages, which can lead to a spike in memory usage and OOM (out of memory) errors. And instead of water damage, the result is a cascading failure throughout our entire system.

Kafka is basically an enterprise-grade durable buffer that enables flow control. Kafka enables flow control by buffering messages to disk and, based on its design, can ensure that writes are never dropped. This in turn ensures that messages are delivered at least once.
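The faucet-and-drain idea can be sketched with a bounded in-memory buffer. Here the JDK's ArrayBlockingQueue stands in for Kafka's durable buffer: when the buffer is full, put() blocks the producer instead of letting messages flood memory. This is only an analogy (Kafka buffers to disk rather than blocking in memory), with invented names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FlowControlDemo {

    // Moves 'total' messages from a producer thread to the caller through a
    // buffer of the given capacity, and returns how many were consumed.
    static int transfer(int total, int capacity) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < total; i++) {
                try {
                    buffer.put(i); // back-pressure: blocks while buffer is full
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        producer.start();
        int consumed = 0;
        try {
            while (consumed < total) {
                buffer.take(); // the consumer drains at its own pace
                consumed++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```

No matter how fast the producer runs, memory usage is bounded by the buffer's capacity; nothing is dropped and nothing overflows.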

We’ll talk about messaging semantics such as ‘at least once’ delivery more in the next unit.


Messages

The unit of data in Kafka is a message. A message is simply an array of bytes, so it must be serialized using a serialization technique such as Avro or Protobuf. A message can contain any type of data, but in our case we will only be publishing domain events.

Messages can have a key, which is also a byte array. Keys determine which partition a message is written to (more on partitions shortly). For example, for transactions, a key can ensure all operations in a transactional boundary write to the same partition. This is known as ‘message affinity’. Lagom follows suit and allows us to specify a partition key strategy.
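A rough sketch of key-based partition selection follows. Kafka's default partitioner actually uses a murmur2 hash of the key bytes; String.hashCode() here is only a stand-in for the essential property: an identical key always maps to the same partition, which is what gives us message affinity.

```java
public class KeyPartitioner {

    // Maps a key to a partition index. Deterministic: the same key always
    // yields the same partition for a fixed partition count.
    static int partitionFor(String key, int numPartitions) {
        // mask off the sign bit so the index is non-negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

For example, every message keyed by the same portfolio ID lands in the same partition, so all operations within that transactional boundary are stored (and read) in order.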

Affinity is an important concept in distributed systems. For performance reasons, we may want messages in the same partition. For similar reasons, we may want related microservices running on the same node in a cluster. We’ll cover this in much more detail in Unit 11.

Messages are stored durably based on a configurable retention period. By default, messages are retained for seven days. However, in lower-volume systems, we recommend configuring Kafka to store messages indefinitely. This gives your team access to historical messages for analytics or debugging purposes. If storage becomes a challenge, you can then lower the retention period.


Schemas

Additional structure must be imposed on messages; otherwise, it’s difficult for subscribers to know how to process them. It’s possible to use loosely structured formats like JSON or XML; however, most Kafka developers use Avro. Thrift and Protobuf are two other options. Schemas should be well defined and stored in a common repository; only then can Kafka be used without coordination issues. Confluent provides a Schema Registry for exactly this reason. Read the Schema management documentation (on Confluent) for more details.

Topics and partitions

Messages are arranged in topics. Topics broadly categorize messages, such as trades and wire transfers. Topics can have one or more partitions. A message is appended to a single partition within a topic. Partitions are the mechanism used for scale and resilience.

One caveat is that the number of partitions per topic can be increased but never decreased. We recommend starting with a low number of partitions and then increasing only as needed.

Messages are stored in order and can be read deterministically within a partition. This means that order is maintained within a partition, but not across a topic as a whole. For this reason, it’s simplest to start with one partition per topic and only increase the partition count when scalability concerns arise.


During an event storming session, whenever domain events are being discussed, it can help to hold a technical breakout to decide on details such as the number of topics and the number of partitions per topic, based on the predicted volume of messages.


Batches

Messages are written to Kafka in batches. A batch is a collection of messages written to the same topic and partition.

Correctly sizing batches is a critical part of tuning Kafka. Furthermore, batches can be compressed — with Gzip or Snappy, for example. This increases data transfer capacity and storage capacity at the cost of extra required CPU cycles for compression/decompression.
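The CPU-for-bandwidth trade-off can be demonstrated with the JDK's built-in Gzip support. This is not Kafka's own batching code (in Kafka you would enable compression with the producer setting compression.type); it simply shows how well a batch of similar messages compresses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class BatchCompression {

    // Gzip-compress a batch payload, spending CPU to save bandwidth/storage.
    static byte[] compress(byte[] batch) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(batch);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Reverse the trade-off on the consuming side.
    static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream gzip =
                 new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gzip.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // a batch of similar messages compresses extremely well
        byte[] batch = "OrderPlaced|IBM|100\n".repeat(1000)
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(batch.length + " bytes -> "
                + compress(batch).length + " bytes compressed");
    }
}
```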

Pro tip: In Kafka, compression adds very little CPU overhead compared to the cost of network saturation, so compressing batches usually makes sense. Use Gzip or Snappy in a production system to reduce network saturation.


Consumers

Consumers read messages from a topic and keep track of which messages have already been processed by storing the offset of the last message read in each partition. Consumer offsets are stored either directly in Kafka (recommended) or externally in ZooKeeper (supported, but not recommended). Kafka is optimized for writing, so consumers have a bit of extra work to do; managing offsets is the principal responsibility of consumers and is what ensures “at-least-once” processing.
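Why committing the offset only after processing yields at-least-once semantics can be sketched in a few lines. Here a plain list stands in for a partition and the returned value for the offset commit; both are simplifications, not the Kafka consumer API.

```java
import java.util.List;

public class OffsetDemo {

    // Process every message from the committed offset onward. The side effect
    // (appending to processedLog) happens first; only then is the new offset
    // returned for the caller to "commit". If the consumer crashes before the
    // commit is saved, the same messages are processed again on restart.
    static int processFrom(List<String> partition, int committedOffset,
                           List<String> processedLog) {
        for (int offset = committedOffset; offset < partition.size(); offset++) {
            processedLog.add(partition.get(offset));
        }
        return partition.size(); // new offset to commit
    }
}
```

Replaying from a stale offset duplicates work but never loses a message, which is exactly why handlers must be idempotent.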

Consumers work as part of a consumer group to read from a topic. Within the group, individual consumers map to individual partitions, which is called ownership. This is how consumers can horizontally scale. As the partition count within a topic increases, the consumer count within a consumer group should increase as well.

Kafka terminology uses Producers and Consumers, rather than Publishers and Subscribers. Kafka is a lower level of abstraction than Lagom. Think about it like so: Lagom is a publish-subscribe-based framework built on top of Kafka, which uses producers-consumers under the hood.


Producers

Producers are more straightforward than consumers in Kafka. A producer simply routes each message to the proper partition within the specified topic using the provided message key. Kafka guarantees that any messages with an identical key will be written to the same partition.

Because the key is hashed to select a partition, it’s safe to base the key on an ID, for instance an account ID. All messages for a given account will then land in the same partition, and accounts will be spread across partitions. Whatever key strategy you decide on is fine, as long as you keep in mind that the key is used for partition selection, and that changing the partition count will change which partition a given key maps to.

Another responsibility of producers is to ensure that messages are durably persisted before they are read by consumers. Kafka provides this guarantee by requiring an acknowledgement of a successful write, which happens after the message is stored across a configurable number of replicas (partitions on different brokers). The acknowledgement of successful replication is eventually received by the producer. This is how Kafka helps to ensure that a message is never lost, and in particular never lost after it has already been read by a consumer!

Brokers and clusters

A single Kafka server is called a broker. Brokers operate together in a cluster. Within a cluster, a single broker acts as the cluster controller.

A single partition is owned by a single broker, and this broker is the leader of the partition. A partition may be assigned to multiple brokers, which results in the partition being replicated. One broker is always the leader of a partition, even if a partition is assigned to multiple brokers.

Replicas are critical for resilience. In the event that a single broker fails, the cluster will detect the failure and be able to elect another broker as the leader of the partition. One partition is only ever active at a given time on the broker that owns the partition. Other brokers simply act as cold storage in the event of a failure.

Broker replica

A single Kafka cluster typically runs inside a single datacenter, so deployments that span multiple datacenters commonly use multiple clusters. Kafka has no built-in location awareness; this is all handled through configuration.

Lagom Message Broker API

Now that we understand the core concepts of publish-subscribe and how it works in Kafka, let’s look at how we leverage all of these concepts within Reactive Stock Trader using Lagom. We’ll start by covering the brokerage component of Reactive Stock Trader.

Reactive Stock Trader models a portfolio management system, not a custodian service. In other words, Reactive Stock Trader can trigger buy and sell orders, but it never takes direct possession of any shares or cash. In the real world, a system like Reactive Stock Trader would integrate with a custodian service like Alpaca, Tradier, or another similar service.

When reviewing the brokerage code that we will now cover, think about how you would enhance it to integrate with a brokerage API like we mentioned above. Who knows, perhaps a future FinTech startup will clone Reactive Stock Trader and use it as an onramp to get started toward a future portfolio management system!

Up until now, we’ve worked primarily with aggregate events, which are contained within the boundary of a single persistent entity. The Lagom Message Broker API is the interface we will use to subscribe to and publish domain events.

The Lagom Message Broker API gives us:

  • A mechanism to transform some aggregate events into domain events
  • A mechanism to publish domain events to a topic
  • A mechanism for subscribers to consume domain events on various topics of interest

The Lagom Message Broker API is all about integrating between our various bounded contexts by producing and consuming domain events.

To demonstrate this functionality, we’ll look at (a short version of) the BrokerServiceImpl class, which loosely models the custody of stocks within our portfolio.

public class BrokerServiceImpl implements BrokerService {

    private final QuoteService quoteService;
    private final OrderRepository orderRepository;

    public BrokerServiceImpl(PersistentEntityRegistry persistentEntities,
                             QuoteService quoteService,
                             PortfolioService portfolioService,
                             OrderRepository orderRepository) {

        this.quoteService = quoteService;
        this.orderRepository = orderRepository;

        portfolioService.orderPlaced()
            .subscribe()
            .atLeastOnce(processPortfolioOrders()); // 1
    }

    private Flow<OrderPlaced, Done, NotUsed> processPortfolioOrders() {
        return Flow.<OrderPlaced>create()
            .mapAsync(10, this::processOrder); // 2
    }

    private CompletionStage<Done> processOrder(OrderPlaced order) {
        return orderRepository
            .placeOrder(order.getPortfolioId(), order.getOrderDetails()); // 3
    }
}
First, we subscribe to interesting events on a topic. Because we’re simulating trade orders, we subscribe to OrderPlaced events (1). You’ll also notice that we specify atLeastOnce. This means that we must process each order at least one time, but we may wind up processing an order twice (or more) in the event of an edge case, such as node failure and restart. This is also why commands should be idempotent.

Idempotence is an important characteristic of a well-designed API. It means that the same command or event can be processed multiple times without changing the meaning of the result. For instance, setQuantity(1) is idempotent. If you execute this multiple times, the quantity will remain 1. However, incrementQuantity(1) is not idempotent. If that command is executed multiple times, for example, after a Kafka restart, it will corrupt the state of our system.
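The setQuantity/incrementQuantity contrast from the paragraph above, written out as a minimal class so the effect of redelivery is visible:

```java
public class QuantityExample {
    private int quantity = 0;

    // Idempotent: applying it N times has the same effect as applying it once.
    void setQuantity(int q) {
        quantity = q;
    }

    // Not idempotent: every redelivery changes the result.
    void incrementQuantity(int by) {
        quantity += by;
    }

    int getQuantity() {
        return quantity;
    }
}
```

Under at-least-once delivery, a redelivered setQuantity(1) is harmless, while a redelivered incrementQuantity(1) silently corrupts state.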

Make sure you understand ‘at-least-once’ delivery and idempotence before building a distributed production system.

We process orders in parallel using mapAsync(10, ...), where 10 is the ‘parallelism factor’ (2). The factor of 10 caps the number of orders in flight at any one time, that is, orders placed but not yet acknowledged (which should happen essentially instantly). It is a bound on outstanding asynchronous calls rather than a thread count.
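What such a parallelism factor bounds can be simulated with plain JDK concurrency primitives. This is an analogy to mapAsync's in-flight limit, not Akka Streams code; all names here are invented for the sketch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedInFlight {

    // Submits 'totalOrders' tasks, never letting more than 'parallelism' be
    // in flight (submitted but not yet acknowledged) at once, and reports the
    // highest in-flight count observed.
    static int maxObservedInFlight(int totalOrders, int parallelism) {
        Semaphore slots = new Semaphore(parallelism);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            for (int i = 0; i < totalOrders; i++) {
                slots.acquire(); // blocks once 'parallelism' orders are pending
                pool.submit(() -> {
                    int now = inFlight.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    inFlight.decrementAndGet();
                    slots.release(); // the 'acknowledgement' frees a slot
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxSeen.get();
    }
}
```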

Finally, you’ll note that we pass this::processOrder to mapAsync in processPortfolioOrders() (3). Whenever an order is published, we’ll process it using processOrder along with an order event of type OrderPlaced. In a real-world system, we would integrate with a third-party API here to perform the actual trade. But in our case, we will handle orders internally by grabbing the persistent entity of the order and executing the placeOrder command on the OrderModel, which is returned by the get(...) method.

Order model

    public CompletionStage<Done> placeOrder(PortfolioId portfolioId,
                                            OrderDetails orderDetails) {
        CompletionStage<Order> placeOrder = orderEntity
            .ask(new OrderCommand.PlaceOrder(portfolioId, orderDetails)); // 1

        placeOrder.thenAccept(order -> tradeService
            .placeOrder(order)
            .exceptionally(ex -> { // 2
                log.error("Order placement failed", ex);
                return new OrderResult.Failed(portfolioId,
                        new OrderId(order.getOrderId()));
            }).thenAccept(orderResult -> { // 3
                orderEntity
                    .ask(new OrderCommand.CompleteOrder(orderResult));
            }));

        return placeOrder.thenApply(o -> Done.getInstance()); // 4
    }

First, we place the order by creating a new PlaceOrder command, submit it to a new order entity, then wait for the unique order ID, which will be generated by the order entity on creation (1).

We then place the order with the trade service and complete it (3). We do this by providing a lambda to be executed when the order result is available from the trade service. If we encounter an error along the way, we’ll want to log it and return a failure (2).

Finally, we immediately return Done, which is sufficient for a sample reference application (4). Note that our service call responds with Done after the PlaceOrder command is accepted; it does not wait for the order to be fulfilled (which, in general, may require some time).

If the service is interrupted before this completes, the order will not be reattempted. Consider the implications: working out how you would handle this in a production system would be an excellent exercise. As a further challenge, consider improving this in Reactive Stock Trader with a pull request.

Order entity

In OrderModelImpl, we invoke the ask method on orderEntity with two different commands: PlaceOrder and CompleteOrder. Let’s take a look at the OrderEntity class so that we understand how these commands are handled.

    private class UninitializedBehaviorBuilder { // 1
        private final Behavior behavior;

        UninitializedBehaviorBuilder() {
            BehaviorBuilder builder = newBehaviorBuilder(Optional.empty());

            builder.setCommandHandler(
                OrderCommand.PlaceOrder.class,
                this::placeOrder);  // 2

            builder.setEventHandlerChangingBehavior(
                OrderEvent.OrderReceived.class,
                this::processing); // 3

            builder.setReadOnlyCommandHandler(
                OrderCommand.GetStatus.class,
                (cmd, ctx) -> ctx.reply(Optional.empty()));

            this.behavior = builder.build();
        }

        Behavior getBehavior() { return behavior; }

        private Persist placeOrder(OrderCommand.PlaceOrder cmd,
                                   CommandContext<Order> ctx) {
            Order order = new Order(entityId(),
                    cmd.getPortfolioId(), cmd.getOrderDetails());

            return ctx.thenPersist(
                    new OrderEvent.OrderReceived(order),
                    evt -> ctx.reply(order)); // 4
        }

        private Behavior processing(OrderEvent.OrderReceived evt) {
            return new PendingBehaviorBuilder(evt.getOrder())
                .getBehavior(); // 5
        }
    }

Lagom persistent entities are essentially finite-state machines. This means that an entity transitions between states based on events and has different behavior depending on what state it’s currently in. The state transitions for an order are as follows:

  • Pending
  • Fulfilled
  • Failed

Persistent entities always start out uninitialized, which is captured with UninitializedBehaviorBuilder (1). In this state, we accept a command, PlaceOrder (2), which can emit a state-changing event, OrderReceived (3). When the OrderReceived event is persisted (4), we transition to the pending state by returning a PendingBehaviorBuilder (5).

You may be asking yourself where and how order events are published as domain events to Kafka. Only persistent entities can publish domain events to a Kafka topic. This is all handled in Lagom by defining a TopicProducer.

For orders, domain events are published by providing a TopicProducer in BrokerServiceImpl. Lagom takes care of the remaining details.

    public Topic<OrderResult> orderResult() {
        return TopicProducer
            .taggedStreamWithOffset( // 1
                OrderEvent.TAG.allTags(), // 2
                orderRepository::orderResults); // 3
    }

We return a TopicProducer that’s a sharded stream, meaning that it contains more than one partition (1). We can filter out events persisted by the entity, but in this case we will publish all events of type OrderEvent (2). Finally, we need to provide a method where we can perform filtering and transformation operations on the events themselves (3). In our case, we need to transform the aggregate events into domain events that are appropriate to be shared outside of the bounded context.

Understand the rationale behind Lagom’s approach to publishing.

You can only publish events from a persistent entity. There is no way to arbitrarily publish domain events outside of a persistent entity. In some cases you will find yourself creating persistent entities simply to publish events to Kafka. As you are working through the Reactive Stock Trader code, this is an important consideration, especially if you are wondering why a particular persistent entity exists. Remember that it may be specifically created to publish domain events.

Portfolio service

Finally, let’s look at (a very trimmed down version of) PortfolioServiceImpl to see what happens once an order is completed.

public class PortfolioServiceImpl implements PortfolioService {

    public PortfolioServiceImpl(PortfolioRepository portfolioRepository,
                                BrokerService brokerService,
                                ReadSide readSide,
                                CassandraSession db) {
        brokerService.orderResult()
            .subscribe() // 1
            .atLeastOnce(Flow.<OrderResult>create().mapAsync(
                1, this::handleOrderResult));
    }


In the constructor of PortfolioServiceImpl, we subscribe to all types of OrderResult (1), which will be published by the broker.

    private CompletionStage<Done> handleOrderResult(OrderResult orderResult) {

        PortfolioModel portfolio = portfolioRepository
            .get(orderResult.getPortfolioId());

        return orderResult.visit(
            new OrderResult.Visitor<CompletionStage<Done>>() {
                public CompletionStage<Done>
                    visit(OrderResult.Fulfilled orderFulfilled) { // 1

                    return portfolio.processTrade(orderFulfilled);
                }

                public CompletionStage<Done>
                    visit(OrderResult.Failed orderFailed) { // 2
                    return portfolio.orderFailed(orderFailed);
                }
            });
    }
On all successful orders we invoke processTrade on the PortfolioModel (1), and on all failed orders we invoke orderFailed, instead (2).

We encourage you to follow through the remaining logic of orders all the way through the portfolio. You now have an idea of how we leverage the Message Broker API to communicate between bounded contexts.


Summary

This unit has covered the basics of publish-subscribe in Lagom, and we hope you have enjoyed learning about an alternative to complex point-to-point, database, and shared-library integration.

By introducing a message broker, such as Kafka, into our architecture, we can avoid point-to-point calls that may spiral out of control into cyclical dependencies. This is especially important as a system grows: a few services may be manageable when they speak to each other directly, but dozens can quickly become unwieldy.

Another benefit of asynchronous messaging is that in a complex system, the publish-subscribe pattern makes it much easier to test components in isolation. Rather than mocking out APIs, you can inject events directly into topics on a test Kafka broker and observe the behavior of your services. You can simulate various behaviors by creating different patterns of messages and observing how they are handled. This is in stark contrast to the failures of the past, specifically SOA, which forced developers to set up complex ESBs and databases in order to verify the behavior of their systems. Imagine trying to simulate the behavior of a single service by perfectly mocking the state of a complex shared database.

The next unit will expand on the topics we’ve covered here to look at streaming data and how we can transform Reactive Stock Trader into a fully real-time system that streams updates to users as they happen.

Previous: CQRS, Part 3 – ReadSideProcessor for transactions
Next: Streaming data