Win $20,000. Help build the future of education. Answer the call. Learn more

Reactive in practice, Unit 6: CQRS, Part 1 – Write side


It can be hard to land on a general-purpose data model that serves both reads and writes. CQRS (Command Query Responsibility Segregation) is an optimization pattern based on the principle of having a separate model for reads and a separate model for writes.

At its core, CQRS is a design pattern based around the separation of reads and writes. With CQRS, we build unique data models for reads and writes, giving us the ability to treat them as separate concerns with separate requirements. This separation of concerns gives us the ability to optimize each side individually, which is mandatory in systems that deal with a high volume of transactions, queries, or both.

Picture a microservices system that has a high-throughput processing service and a separate reporting service. In a real-world stock trading system, we may process a huge volume of trades every minute. Optimizing for a high trading volume requires treating writes separately from the computation of queries because persistence has significantly different requirements than reporting. For example, in stock trading, not only do we need to support high throughput, but we also need to ensure that trades are durable and not dropped. Losing trades would cost a customer and our company significant amounts of money in opportunity cost and penalties, not to mention the loss of reputation of our company.

On the flipside, we’re not trying to diminish the importance of reporting. Querying a high volume trading system in real-time is very computationally expensive. Think of your credit card statement: the reason that the current balance of your credit card is not updated in real-time is because querying the raw transaction log with credit card transaction would be completely impractical. For example, VisaNet handles an average of 150 million transactions every day and is capable of handling more than 24,000 transactions per second. Imagine updating up to 24,000 account balances in a relational database every second. It’s not possible.

In order to create a positive user experience, most high-volume systems must perform all sorts of optimizations for queries, including precomputing a view for your credit card balance. This helps to significantly improve the responsiveness of the UI by returning query results in seconds, versus minutes. We also have less stringent requirements around the resilience of a query service, so we’re likely to prioritize performance over durability; if a query fails here and there, it’s not as critical as dropping a trade.

Some enterprise systems, such as online banking and credit card processing systems, are naturally built around these concepts.

In a CQRS-based system, neither the write or read side share resources directly; for instance, database tables or model classes. Instead, the two sides should have their own models, underlying persistence, and leverage a data pump to transform raw events (or tables) from the write side into a more query-friendly data structure for the read side. One of the most common antipatterns in microservices is to “reach into” another service for data. This is also an antipattern is CQRS, as you can consider the read side and the write side as two separate services. The solution to this antipattern is to adopt an event-based push model, also referred to as a data pump, a term coined by Sam Newman in his book Building Microservices. This helps us to avoid a tangled nest of dependencies between microservices. In Lagom, we can implement this pattern using a ReadSideProcessor, which we will cover in the next two units.


In the diagram above, we can see events being persisted to the journal, and a ReadSideProcessor, which can subscribe to those events and either create new commands, or update a read-side database for querying. In the next two units, we will cover ReadSideProcessor in Lagom and how it acts as our data pump, which is one of the most important patterns in event sourcing, CQRS, and microservices.

There’s an important clarification to make before we proceed. CQRS and event sourcing are not dependent on each other. CQRS can be used without event sourcing. Many systems such as online banking, stock trading, and ecommerce systems are already based around some facets of CQRS, as it’s a natural way to architect systems with high scalability and resilience requirements.

However, it’s our opinion that event sourcing requires CQRS. Raw events are impossible to query directly in any kind of efficient manner. We need to transform events into a separate model for queries and transaction processing, and store those models separately from the write side.

Note: If we were to gradually introduce these concepts into an existing enterprise system, we’d start by adopting CQRS first, in order to perform write/read optimizations, and then gradually adopt event sourcing if and when business flows benefit from asynchronous processing. More on this will be covered in future units.

Unit 5 effectively covered the details of event sourcing. Up to now, we have covered:

  • How commands flow through the BFF (‘backend for frontend’) tier into the service tier
  • How the service tier processes commands and directs them to persistent entities
  • How persistent entities process commands in order to change state and emit aggregate events
  • How this is accomplished based on an event-sourced architecture

What we haven’t covered is how CQRS fits into a complete system. Therefore, the next three units will cover the complete scope of CQRS in an event -ourced system and how these patterns complement each other.

We will break CQRS into three parts:

  • Unit 6: CQRS write-side — this unit
  • Unit 7: CQRS read side — read side for precomputed views and queries
  • Unit 8: CQRS read side — read side for transactions

Let’s begin by covering CQRS architecture in broad terms before diving into the write-side architecture.

CQRS architecture

CQRS is broken down into two paths: the command side and the query side. In this series, we’ll simply refer to these sides as the “read side” and “write side”.

Note: We chose this naming convention, rather than “command side” and “query side,” because there are many operations that we can perform on the read side, such as querying, complex event processing, and precomputing views. “Query” is a limiting term that doesn’t cover the full spectrum of reads and the processing of reads.

Commands flow through the write side of our system, which affects the state of our persistent entities. After a state change, an aggregate event is created, written to a journal, and emitted. In Lagom, there are two main paths that aggregate events will follow. Each event will be:

  • Written to the journal
  • Emitted to one or more subscribers using a ReadSideProcessor or a raw event stream

The architecture in the following diagram highlights the high-level flow of events through a CQRS system for transactions, which we will cover in this unit. We can see that a ReadSideProcessor subscribes to events in the journal and can direct new commands to a service. Recall event storming and the difference between user-initiated commands and reactions. In this unit we will clearly cover how to use a ReadSideProcessor to react to events and create new commands.


Of the following listed items, those that are not crossed out will be covered in this unit. The items that are crossed out will be covered in the next unit.

Write side:

  • Commands
  • Command handlers
  • Entities
  • Entity repository
  • Event store
  • Event bus

Read side (transactions):

  • Read side processors
  • Raw event streams

Read side (queries):

  • Read side processors
  • View handlers
  • View store
  • Views
  • Read handler
  • Queries

Once we complete this unit and the next unit, we will have a complete picture of an event-sourced CQRS architecture.

Then we can move on to advanced concepts such as streaming data (Unit 9), advanced integration patterns (Unit 10), and operational concerns (Unit 11). Finally, we will conclude the series with a complete architectural overview (Unit 12).

Write-side architecture

Commands represent our intent to change state, while events represent the actual state change that has already occurred. Our main goal for the write side of Reactive Stock Trader is to ensure that all state changes are represented by an appropriate event and to make sure that each event is stored durably.


Commands form the backbone of our intent for something to happen in our system. Commands have their own data model and should be intentionally named. In other words, the name of a command should specify the exact intent without ambiguity. For example, the command to ‘open a new portfolio’ is called Open. It only requires a name, which we encode as a String.

Open command (in PlaceOrder):

class Open implements PortfolioCommand, ReplyType<Done> {
    @NonNull String name;

More complicated commands may require additional parameters, but we’ll want to keep the model as specific as possible to the command itself. For example, ‘placing an order’ requires significantly more details than opening a portfolio, which we can decompose into a more nuanced class hierarchy. For instance, the PlaceOrder command in the ‘broker bounded context’ requires an OrderId and an OrderDetails type.

Place order command (in PlaceOrder):

class PlaceOrder implements PortfolioCommand, ReplyType<Done> {
    @NonNull OrderId orderId;
    @NonNull OrderDetails orderDetails;

Note: In order to organize our common types, we’ve included them in a common models folder. In a real-world system, we may opt to not use shared libraries, but rather create more robust ‘serde’ logic. (‘Serde’ stands for serialization/deserialization and is becoming a common abbreviation for such.)

Looking at the PlaceOrder command above, you’ll notice an OrderId type and an OrderDetails type. OrderId is a simple data type that helps us to avoid stringly typed ids and other values that can and should be turned into a more meaningful type to avoid confusion. We recommend you follow this pattern in your code as well!

You’ll also notice that PlaceOrder defines a ReplyType. Done is the standard reply type. We immediately return Done because this is a command, and commands don’t have return values. We accept the command, return Done, and then begin to process the command.

Note: If you require a response from a command, you use a query, which we will cover in next unit. We will cover asynchronous command-handling mechanisms in later units.


public class OrderId {
    @NonNull String id;
    public static PathParamSerializer<OrderId> pathParamSerializer =
        PathParamSerializers.required("OrderId", OrderId::new, OrderId::getId);
    public static OrderId newId() {
        return new OrderId(UUID.randomUUID().toString());


public class OrderDetails {
    @NonNull Symbol symbol;
    int shares;
    @NonNull TradeType tradeType;
    @NonNull OrderType orderType;

This class hierarchy continues with TradeType, OrderType, and so forth. We should do our best to avoid unnecessarily complex class hierarchies, as commands should be simple data types.

Now that we have discussed the basics of a command, we need to understand how commands are constructed and how commands are processed.

Command handler (BFF to service integration)

Commands are constructed by the BFF (‘backend for frontend’) tier and then issued to the service tier. The service tier handles interaction with individual persistent entities. (For a refresher of our architecture, please review Unit 3.)

The following code creates a new ‘place order’ command in the BFF here. We will refer to this functionality as a ‘command handler,’ which is common terminology in CQRS systems and agnostic to the framework being used.

Place order controller (from PortfolioController in the BFF tier):

public CompletionStage<Result> placeOrder(String portfolioId) {
    Form<PlaceOrderForm> form = placeOrderForm.bindFromRequest();
    if (form.hasErrors()) {
        return CompletableFuture.completedFuture(badRequest(form.errorsAsJson()));
    } else {
        PlaceOrderForm orderForm = form.get();
        OrderDetails order = OrderDetails.builder()
            .build(); // 1
        return portfolioService
            .placeOrder(new PortfolioId(portfolioId)) // 2
            .invoke(order) // 3
            .thenApply(orderId -> {
                val jsonResult = Json.newObject()
                        .put("orderId", orderId.getId());
                return Results.status(Http.Status.ACCEPTED, jsonResult); // 4

We convert user input from the PlaceOrderForm into an OrderDetails instance (1). We then need to place the order in the correct portfolio, which we look up from the ‘entity repository’ by ID (2) and then invoke the lambda, which is returned from placeOrder with our order object (3). Finally, once successfully placed, we signal back to the UI (4).

Placing the order (3) is where some ‘new’ features of Java introduced in Java 8 come into play, specifically lambda notation. placeOrder returns a higher order function using the ServiceCall\<t, u\> return type, which returns a function that expects an order. .invoke(order), above, is what passes the order to orderDetails in the code below, and allows us to execute the function.

Let’s now explore what happens when we invoke the place order ServiceCall with an order.

Entity repository (service to entity integration)

Note that we haven’t interacted with a traditional database once. So far we’ve covered how to place an order by looking up a portfolio, creating an order command, and sending the command to the portfolio. So, how does persistence actually work?

The following code will begin to fill in the missing details about how persistence is accomplished in Lagom. Let’s start with placing an order in the Portfolio service, which was invoked by the BFF in the previous section.

Place order call in the Portfolio service:

public ServiceCall<OrderDetails, OrderId> placeOrder(PortfolioId portfolioId) { // 1
    return orderDetails -> { // 2
        val orderId = OrderId.newId();
        return portfolioRepository
                .get(portfolioId) // 3
                .placeOrder(orderId, orderDetails)
                .thenApply(done -> orderId);

In the above example, we’re constructing a higher order function that accepts a PortfolioId (1) and is invoked with an order of type OrderDetails (2). The reason that we construct this as a higher order function rather than a direct method call is that we want to allow Lagom a chance to decide how and when to execute this code asynchronously. Imagine these higher order functions being constructed as fast as possible and added to a queue, at which point Lagom can decide exactly how they are executed. In the meantime, a simple reply has already been returned to the UI, as to not block the user experience.

The main purpose of the higher order function is to look up an entity (3) so that the command can be sent to it. In Lagom, each entity is unique, based on its identifier. The command execution is occuring inside of a Future. When the future completes, we take the result of the future, map it to an orderId, and return that. We return the Future immediately, but it doesn’t complete until the command completes.

This is one of the most complex aspects of reactive, asynchronous programming to adjust to in your designs. It may take a little bit of time before asynchronous programming feels natural, but practicing and learning these concepts will make your software far more robust and performant!

While we’re in development mode, it’s safe to assume that each entity is unique and that you can obtain a reference to an entity by ID. Once you have a reference, it’s possible to send commands to an entity.

If we’re deploying Lagom to a production environment, we’re most likely running on more than one node. An entity will still only live in memory on a single node. This is how a Lagom system can scale — the more entities in our system, the more nodes we can add to the system. The runtime itself takes care of locating a reference to the entity using Akka Cluster Sharding, which is what Lagom itself is built on top of.

The repository handles high-level logic around entities themselves; essentially, the repository is where to put operations that interact with multiple entities. The following code is our portfolio repository in most of its entirety.


public class PortfolioRepositoryImpl implements PortfolioRepository {
    private final Logger log = LoggerFactory.getLogger(PortfolioRepositoryImpl.class);
    private final PersistentEntityRegistry persistentEntities;

    public PortfolioRepositoryImpl(BrokerService brokerService,
                                   PersistentEntityRegistry persistentEntities) {
        this.persistentEntities = persistentEntities;

    public CompletionStage<PortfolioId> open(OpenPortfolioDetails request) {
        val portfolioId = PortfolioId.newId();

        PersistentEntityRef<PortfolioCommand> ref =
            persistentEntities.refFor(PortfolioEntity.class, portfolioId.getId());

        return ref.ask(new PortfolioCommand.Open(request.getName()))
                .thenApply(done -> portfolioId);

    public PortfolioModel get(PortfolioId portfolioId) {
        return new PortfolioModel(persistentEntities, portfolioId);

The repository provides two methods for the following operations: opening a new portfolio and getting a PortfolioModel, which is a convenience wrapper around an entity. The wrapper helps us to avoid coding directly to the framework, which is an architectural principle that can make refactoring and upgrading simpler in the future. You can see in our portfolio service that we call the get method to obtain a PortfolioModel wrapper, which makes interacting with the entity a little easier than polluting the service code with extra entity-related boilerplate.

Next, we will cover the final piece of the write side puzzle: issuing commands to persistent entities. Specifically, we’ll cover how entities reliably persist state without using a relational database.

Event store (journal)

In event sourcing, we directly interact with entities in-memory, whereas with traditional systems — even modern microservice systems, such as Spring Boot — eventually your application code will read to and write from a traditional OLTP database in real time. This also means that for the most part, if using a traditional RDBMS, you’re dependent on the database for many facets of scalability and resilience.

In event sourcing, there is a data store underneath all of these events. Each event will be persisted to the journal, but only read back in a few circumstances:

  • When Lagom needs to restart after a system failure, the latest snapshot of an entity will be recovered from the journal and events beyond that replayed so that the entity can return to its most recent state in-memory.

  • When Lagom ‘passivates’ an entity by removing it from memory to conserve resources; if subsequently a passivated entity is interacted with, its state will be recovered from the journal in the same way as recovering from a system failure.

Compared to OLTP, this is a very limited set of interactions with disk-based storage. This is also the core reason that a horizontally scalable and wide column store, such as Cassandra, is the preferred approach for journaling.

The takeaway here is that the event journal serves a limited purpose, and that’s to provide resilience to persistent entities. However, all interactions with persistent entities will happen in-memory. We already covered how to implement persistent entities in the previous unit; however, for completeness, we will complete the example of opening a portfolio and dive a little deeper into how the event is persisted to the journal once generated.


private PersistentEntity.Persist placeOrder(
        PortfolioCommand.PlaceOrder placeOrder,
        CommandContext<Done> ctx) { // 1"Placing order %s", placeOrder.toString()));
    OrderDetails orderDetails = placeOrder.getOrderDetails();
    switch (orderDetails.getTradeType()) {
        case SELL:
            int available =
            if (available >= orderDetails.getShares()) {
                return ctx.thenPersistAll(Arrays.asList( // 3
                        new PortfolioEvent.OrderPlaced(
                            placeOrder.getOrderDetails()), // 2
                        new PortfolioEvent.SharesDebited(
                            orderDetails.getShares())), // 2
                        () -> ctx.reply(Done.getInstance()));
            } else {
                ctx.commandFailed(new InsufficientShares(
                return ctx.done();
        case BUY:
            return ctx.thenPersist( // 3
                    new PortfolioEvent.OrderPlaced(
                        placeOrder.getOrderDetails()), // 2
                    evt -> ctx.reply(Done.getInstance()));
            throw new IllegalStateException();

In the previous unit, we showed how event sourcing works with the ‘open portfolio command’. If you recall, we covered three of the four key steps involved in event sourcing:

  • Receive commands (1)
  • Create events (2)
  • Apply events and change state (3)

Let’s cover the distinction between a portfolio entity as it exists in memory, and the events that back it up, which have been written to the journal.

State (memory) versus journal (persistence)

The final piece of the puzzle for a complete understanding of CQRS and event sourcing is to distinguish in-memory state from events stored to the journal.

Whenever an event is persisted, it causes the event to be written to the journal, and then the event handler is invoked, and a state change occurs in-memory. The relationship between in-memory state of an entity and the journal is the key to gaining a complete understanding of CQRS, event sourcing, and Lagom. State and journaled events are not one in the same as they would be in a relational database.

Recall that in an event-sourced system, an entity can always be reconstructed in-memory by replaying all events back from the beginning of time. Rather than manually mutating variables, which would break the whole concept of event sourcing, persistent entities are actually finite state machines. Each persistent entity must have a behaviur assigned per valid state that it can be in. In the case of a portfolio, it can only be ‘open’ or ‘closed’.

We covered persistent entities in detail in the previous unit, but didn’t dive into behaviors and state changes in depth. What we will cover now is what happens after an event is processed and written to the journal.

The following code shows the command and event handlers for an ‘open’ portfolio.


private class OpenPortfolioBehavior extends
        PortfolioBehaviorBuilder<PortfolioState.Open> {

    OpenPortfolioBehavior(PortfolioEvent.Opened evt) {

    OpenPortfolioBehavior(PortfolioState.Open initialState) {

        builder.setCommandHandler(PortfolioCommand.SendFunds.class, this::sendFunds);


        setEventHandler(PortfolioEvent.OrderPlaced.class, evt ->
            state().apply(evt)); // 1
        setEventHandler(PortfolioEvent.SharesCredited.class, evt ->
        setEventHandler(PortfolioEvent.FundsDebited.class, evt ->
        setEventHandler(PortfolioEvent.FundsCredited.class, evt ->
        setEventHandler(PortfolioEvent.SharesDebited.class, evt ->
        setEventHandler(PortfolioEvent.OrderFulfilled.class, evt ->
        setEventHandler(PortfolioEvent.OrderFailed.class, evt ->

Event handlers control how state is updated after an event is successfully written to the journal. This is accomplished by Lagom by executing the lambda provided (1). Portfolio state is controlled by the PortfolioState class. For each one of the event types above, we will update the state of the portfolio based on the event itself.

PortfolioState can be considered an immutable wrapper that contains the read model of an individual portfolio. Each time a value is changed, the wrapper will be recreated, rather than modified.


public interface PortfolioState extends Jsonable {

    <T> T visit(Visitor<T> visitor);

    enum Closed implements PortfolioState {

        public <T> T visit(Visitor<T> visitor) {
            return visitor.visit(INSTANCE);

    interface Visitor<T> {
        T visit(Open open);

        T visit(Closed closed);

    final class Open implements PortfolioState {
        @NonNull BigDecimal funds;
        @NonNull String name;
        @NonNull LoyaltyLevel loyaltyLevel;
        @NonNull Holdings holdings;
        @NonNull PMap<OrderId, PortfolioEvent.OrderPlaced> activeOrders;
        @NonNull PSet<OrderId> completedOrders;

        public static Open initialState(String name) {
            return Open.builder()

        Open apply(PortfolioEvent.FundsCredited evt) {
            return this.withFunds(getFunds().add(evt.getAmount()));

        Open apply(PortfolioEvent.FundsDebited evt) {
            return this.withFunds(getFunds().subtract(evt.getAmount()));

        Open apply(PortfolioEvent.SharesCredited evt) {
            return this.withHoldings(holdings.add(evt.getSymbol(), evt.getShares()));

        Open apply(PortfolioEvent.SharesDebited evt) {
            return this.withHoldings(holdings.remove(evt.getSymbol(), evt.getShares()));

        Open apply(PortfolioEvent.OrderPlaced evt) {
            return this.withActiveOrders(, evt));

        Open orderCompleted(OrderId orderId) {
            return this

        public <T> T visit(Visitor<T> visitor) {
            return visitor.visit(this);

We can now see exactly how a portfolio stays up to date based on the events being processed. Each event type possible will cause an associated reaction to the state of the portfolio, such as increasing the funds on hand, or changing the holdings. We effectively create a brand new version of the Open portfolio with the new values, which replaces the old version of the Open portfolio in-memory.

If our system were to crash and we needed to restart, we can recreate each portfolio in-memory by replaying all of the historical events. Each event would change state one at a time until the portfolio is back to the proper, up-to-date state. This is the core of event sourcing. CQRS simply formalizes it by separating reads from writes, which will become even more obvious in the next unit!

Event bus

The final piece of the write side puzzle is how to share aggregate events outside of the persistent entity boundary.

An event bus provides the ability for events to be published to subscribers so that subscribers can consume and process them. This is effectively how the write side integrates with the read side in a CQRS system.

We can assume that we will be ‘publishing’ events to multiple destinations:

  • Journal
  • ReadSideProcessors
  • Raw event streams

We have already covered the journal. In the next two units, we will cover ReadSideProcessors and raw event streams for both transactional process and creating views for queries.

Lagom comes with Kafka out of the box for the event bus in development mode. This radically simplifies getting started with Lagom for developing CQRS-based microservices! ReadSideProcessor and raw event streams are both backed by Kafka under the hood.


In this section, you have learned how a persistent entity guarantees durability, even after a server failure or recovering from disk after passivation. You also learned that events can be pumped from the write side (backed by persistent entities) to the read side for further processing, which we will now explore in greater depth.

Reactive Stock Trader is built purely around event sourcing and CQRS, however, we wouldn’t recommend a real-world system based purely around these patterns. For instance, if we add ‘user accounts’ to Reactive Stock Trader, we would be perfectly comfortable to create an ‘account-bounded context’ backed by an ‘account service’ and use a relational database to store account details, or a cloud-based identity service. Having an infinite journal of events such as password changes or address changes is (probably) of low business value, while also increasing the complexity of the service.

For this reason, you should consider CQRS to be a technique for optimization in high-throughput systems and services rather than the first pattern you turn to for a new system. Greg Young advocates using CQRS in cases where it will give you a competitive advantage. If you are working in a portion of your system that is really just plumbing, then it’s not providing a competitive advantage, and you’re better off to use an off-the-shelf component. In the context of Reactive Stock Trader, the portion that handles the high-frequency trades is a competitive advantage. The portion that handles user logins and password updates, not so much. Good enough is good enough, unless you can prove that the expense of optimizations will lead to a competitive advantage.

On a related note, event sourcing without CQRS is unrealistic. As we’ve seen in the previous unit, our write model is based on individual, atomic events. Extracting meaningful information from a system based purely on events is virtually impossible. A big tradeoff of event sourcing is giving up our ability to easily query a shared data model with SQL, so once write-side persistence is moved from CRUD to immutable events, we need a different approach to queries and other types of read-side processing. We will cover read-side processing in the depth in the next two units.


Rather than transform our code into a relational database representation of the business, we’ve kept our code directly aligned with the business processes. Commands flow into a service, state changes happen, and events flow out. By leveraging event sourcing and CQRS in Reactive Stock Trader, the application code is a true representation of our business domain, all the way from inception during event storming, to implementation details as we’ve built out our Lagom entities.

The next units of this series will cover the nitty-gritty of how to build a system around CQRS. There are many articles on event sourcing and CQRS, but few examples are backed up by a functioning reference architecture. We hope you find these patterns useful.

Previous: Event sourcingNext: CQRS, Part 2 – ReadSideProcessor for queries and views