Introduction

This unit picks up where Unit 4 left off, showcasing what the PortfolioEntity is and how to implement your own PersistentEntity in Lagom. We’ll begin by covering how to write to a PersistentEntity across various components of the Reactive Stock Trader. This will lead us into event sourcing.

If you recall from the previous unit, we obtain a reference to an entity in Lagom as follows:

PersistentEntityRef<PortfolioCommand> ref =
    persistentEntities.refFor(PortfolioEntity.class, portfolioId);

Let’s dive into what a persistent entity is in Lagom, and how it relates to event sourcing.

Event sourcing

Event sourcing is an approach to persistence that derives the current state of an entity from events, rather than storing the current state explicitly (for instance, by updating a row in a relational database using SQL) and reading that value back. In other words, in event sourcing, we derive current values from past events rather than setting current values and throwing away the events. Think of event sourcing as an alternative to CRUD. Whereas CRUD supports creating and reading records in a relational database while also updating and deleting specific records in place, event sourcing limits the vocabulary of persistence to create and read operations.

Before we go any further, it’s important to understand the difference between a physical operation and a logical operation. CRUD semantics are based on physical operations. For instance, a SQL UPDATE physically alters a record in a database.

Assume that we update a row in a table. In the future, if we want to know the latest value of that row, we query the table, find the row, and return the latest value. What’s lost in this style of development are the transitions from one value to another over time. In CRUD systems, the log of values over time is considered unimportant; only the most recent value is preserved. We could create a secondary table to record state transitions manually, but that table would be treated as secondary to the current values: if someone forgot to write the journal entries, the application could still be promoted to production.

In contrast, event sourcing is based on logical operations rather than physical ones, similar to bookkeeping in an accounting context. In accounting, information is never altered or destroyed. Similarly, in an event-sourced system we maintain state by collecting events about a specific entity, persisting those events to a journal, and then providing a read model of that entity that extracts the relevant knowledge from the events. The read model is simply a summation of the events about an aggregate root from the beginning of time until the moment the read is executed.

The easiest way to visualize this is to think of double-entry accounting and, more specifically, how your bank handles your checking account. If you go to the bank to deposit a cheque, the teller certainly doesn’t erase all other transactions and then update your account balance in a journal. The log of transactions is considered as important as, if not more important than, your current bank balance, which may even be out of date for a few minutes or hours after you visit a bank branch.


As developers, we’ve been conditioned to think that CRUD is a normal way to handle data in the software we build, which partially explains why so many errors and anomalies occur in enterprise software. What we consider “normal” in the programming world would look totally unprofessional in other contexts, like accounting and finance.

Note: Event sourcing will take some adjusting to before it feels like a natural approach to persistence for developers used to CRUD-based systems. However, for developers working on high-value systems, it’s important to know how something happened. Event sourcing is built from the ground up on this philosophy.

One of the core tenets of event sourcing is that data is never modified or deleted. New events are recorded in a journal (similar to a ledger in accounting), and the current values of entities can be read by summing all events. This is similar to obtaining your current bank balance from the sum of all deposits and withdrawals over a period of time. Physical updates and deletes are eliminated; only logical updates and deletes are permitted. Physical operations destroy previous values, which makes recovering the state at an earlier point in time almost impossible without complex restores from backups. In event sourcing, if you need to update a value or delete an entity altogether, that’s simply a new event.
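To make the bank-balance analogy concrete, here is a minimal sketch in plain Java (not Lagom). The `AccountEvent`, `Deposited`, `Withdrawn`, and `AccountLedger` names are hypothetical, and the code assumes Java 16+ for records and `instanceof` patterns. The balance is never stored; it is only derived by walking the journal from the beginning.

```java
import java.util.List;

// Hypothetical account events; plain Java, not part of Lagom's API.
interface AccountEvent {}

record Deposited(long cents) implements AccountEvent {}

record Withdrawn(long cents) implements AccountEvent {}

final class AccountLedger {
    private AccountLedger() {}

    // Derives the balance by folding over every event from the start of the journal.
    // Nothing is updated in place: a correction would be recorded as another event.
    static long balance(List<AccountEvent> journal) {
        long total = 0;
        for (AccountEvent event : journal) {
            if (event instanceof Deposited d) {
                total += d.cents();
            } else if (event instanceof Withdrawn w) {
                total -= w.cents();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<AccountEvent> journal = List.of(
            new Deposited(10_000), // $100.00
            new Withdrawn(3_000),  // $30.00
            new Deposited(500));   // $5.00
        System.out.println(balance(journal)); // prints 7500, i.e. $75.00
    }
}
```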

Event-driven semantics offer a huge advantage in distributed systems: they allow us to integrate components, namely aggregate roots (entities) and bounded contexts (microservices), using publish/subscribe semantics.

Not only do events guide us towards pub/sub; the events themselves are reflected all the way down into the persistence tier. In a fully event-sourced system, events are the conceptual building block of the system, from the user interface to application logic to storage. If we click a button in the UI that effectively means “user’s address updated”, that’s exactly the language of the event that we will persist to disk.

By using event sourcing we can eliminate a whole host of complexities related to object-relational mapping (ORM), but effectively architecting and developing event-sourced systems requires a cognitive shift. Event sourcing won’t solve every issue in your system, but for high-value services that need to scale, it removes many of the complexities and inefficiencies caused by the object-relational impedance mismatch.

Domain-driven design (DDD) terminology

The core abstractions we will work with are aggregates and aggregate roots. These are both terms from DDD, used to describe and model a business domain at a higher level of precision than the bounded context level (a bounded context contains aggregate roots, and aggregate roots contain aggregates).

Aggregate roots are the main abstraction that we will cover in this unit. Think of an aggregate root as the entity that will be retrieved based on a lookup ID, such as a bank account. Commands will be directed to the aggregate root and events emitted by the aggregate root. The aggregate root can also subscribe to events from other aggregate roots.

Aggregates are various pieces of data associated with an aggregate root, such as the individual deposits and withdrawals of a bank account. You never work with aggregates directly outside of the context of an aggregate root. Aggregate roots form a consistency and transactional boundary around aggregates.

All aggregate roots are persistent entities in Lagom. A PersistentEntity in Lagom is an aggregate root in DDD. With this in mind, we’ll now cover exactly how aggregate roots are represented in Lagom as persistent entities.

Persistent entities can do a few interesting things, such as:

  • Receive commands

  • Create events

  • Apply events and change state

  • Emit events to subscribers

Persistent entities can also be created, looked up from a repository, and logically deleted (although all events and data associated with a persistent entity will remain in storage — more on this later).

Note: We’ll explore the aggregate root and aggregate boundaries in increasing depth throughout the rest of this series. We recommend moving on from this unit only after these concepts feel intuitive.

Event sourcing in Reactive Stock Trader

Let’s consider two values that are interesting in a stock trading context: the current value of a portfolio and the estimated future performance of a portfolio.

A portfolio will likely hold a number of equities and perhaps some cash. Each trade will have a certain number of shares associated with it, along with the date and time that the trade was executed and the price at which it was executed. We will also have a number of wire transfers in and out of the portfolio as we move cash in and take profits out.

Because so many interactions affect a portfolio, we need to understand how its current value and rate of return were calculated, not simply what the current value is. To do this, we will derive the values from all past events. Each event (every trade and transfer) will be persisted to durable storage from the moment that a portfolio is created, and no state change can happen without an event as its cause. This is reflected in the fact that persistence in Lagom is centered on the thenPersist(...) method.

thenPersist
Lagom provides some very handy features out of the box, including automatic persistence in development for entities using Cassandra. For now, assume that entities store events to disk in a durable fashion when you see thenPersist. We’ll dive deeper into persistence mechanisms in a subsequent unit.

Let’s go over a partial example of event sourcing below. The open(...) method is within the portfolio’s entity class definition. This code enables us to open up a new portfolio with a portfolio name (as we showcased in Unit 2 when we prototyped the UI).

private PersistentEntity.Persist open(PortfolioCommand.Open cmd, // 1
                                      CommandContext<Done> ctx) {
    PortfolioEvent.Opened openEvent = PortfolioEvent.Opened.builder()
        .name(cmd.getName())
        .portfolioId(getPortfolioId())
        .build(); // 2
    return ctx.thenPersist(openEvent, (e) ->
                           ctx.reply(Done.getInstance())); // 3
}

In the example above, we have three of the four key steps involved in event sourcing:

  • Receive commands (1)
  • Create events (2)
  • Apply events and change state (3)

To expand on the above, we accept a command (1) and then build the event itself (2) using the builder pattern. We accomplish this by passing in a portfolio ID, which is system-generated, and the name of the portfolio, which the user provided with a form when they created the portfolio.

Note: In some cases we won’t be able to process the command (for example, a precondition fails), in which case we may build a different event. All of these details should be captured during event storming. If failure scenarios are missing during implementation or new questions arise, simply unroll your event storming output and do some additional storming!

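Next, we pass the ‘open event’ to ctx.thenPersist(...) along with a callback to invoke once the event has been persisted; here the callback replies Done to the sender of the command. thenPersist(...) returns a Persist object in Lagom, which the command handler returns as its value.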

If you’ve read this far and are still wondering where typical CRUD-style persistence is, there is none! All persistence will happen through the Lagom persistent entity API, which is backed by Cassandra under the hood by default.

Note: Lagom does support relational databases for those who wish to take a traditional approach to persistence. Relational databases are often used for precomputing queries, which we will cover in Unit 7.

This effectively shifts state out of relational databases and into the domain itself. As you can see, there is some additional code to write, but as we will demonstrate, this upfront investment pays dividends over time. Conceptually speaking, event sourcing is simpler than CRUD, although it may seem a little unusual at first.

Let’s repeat our mantra of event sourcing and cover the four steps in slightly more detail:

  1. A command is received by an entity.

  2. The entity checks to see if the command can be applied.

  3. If the command can be applied:

    1. The entity creates an event.

    2. The entity changes state based on the event details.

    3. The entity persists the event.

    4. The entity publishes the event to a topic.

  4. If the command cannot be applied:

    1. The entity creates a failure event.

    2. The entity publishes the failure event to a channel.
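The decision part of this list (checking whether a command can be applied, then creating either a normal event or a failure event) can be sketched as a pure function. This is a hypothetical plain-Java illustration, not Lagom’s API: `OpenPortfolio`, `PortfolioOpened`, `OpenRejected`, and `OpenDecision` are invented names, and persisting and publishing the resulting event are deliberately left out.

```java
// Hypothetical command and events, loosely modeled on the portfolio example (not Lagom API).
record OpenPortfolio(String name) {}

interface OpenOutcome {}

record PortfolioOpened(String portfolioId, String name) implements OpenOutcome {}

// A failure event records why the command was rejected, rather than dropping it silently.
record OpenRejected(String portfolioId, String reason) implements OpenOutcome {}

final class OpenDecision {
    private OpenDecision() {}

    // Check whether the command can be applied, then create either the success
    // event or a failure event. Persisting and publishing would happen afterwards.
    static OpenOutcome decide(String portfolioId, boolean alreadyOpen, OpenPortfolio cmd) {
        if (alreadyOpen) {
            return new OpenRejected(portfolioId, "portfolio is already open");
        }
        if (cmd.name() == null || cmd.name().isBlank()) {
            return new OpenRejected(portfolioId, "portfolio name is required");
        }
        return new PortfolioOpened(portfolioId, cmd.name());
    }
}
```

Keeping the decision free of side effects makes it easy to test every success and failure path before any persistence is involved.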

Diving into PersistentEntity

Let’s explore the PortfolioEntity in more detail, specifically covering three aspects of an entity in Lagom:

  • API

  • State

  • Behavior

The first two building blocks of an entity, API and state, can be visualized with the signature of the PortfolioEntity itself:

class PortfolioEntity extends PersistentEntity<PortfolioCommand,
                                               PortfolioEvent,
                                               Optional<PortfolioState>> {
    // implementation
}

PersistentEntity is a parameterized type, so we need to provide it with three type parameters:

  • Command type

  • Event type

  • State type

In the case above, we accept PortfolioCommand types, we emit PortfolioEvent types, and state is contained within an optional PortfolioState type.

State transitions

The behavior of an entity is based on its current state. PersistentEntity in Lagom is inspired by Akka FSM, Akka’s module for implementing finite state machines. To fully understand a persistent entity in Lagom, we should understand how finite state machines work.

From the Akka documentation, an FSM works as follows:

State(S) x Event(E) -> Actions (A), State(S’)

If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S’.
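To make the formula concrete, here is a minimal plain-Java sketch of a transition function for the portfolio lifecycle. It is not Akka FSM or Lagom code: the enums, the `Transition` record, and the action strings are hypothetical, chosen only to show the shape State(S) x Event(E) -> Actions(A), State(S’).

```java
import java.util.List;

// Hypothetical lifecycle states and events (plain Java, not Akka FSM or Lagom API).
enum LifecycleState { UNINITIALIZED, OPEN, CLOSED }

enum LifecycleEvent { OPENED, CLOSED }

// The outcome of one transition: the actions A to perform and the next state S'.
record Transition(List<String> actions, LifecycleState next) {}

final class PortfolioFsm {
    private PortfolioFsm() {}

    // State(S) x Event(E) -> Actions(A), State(S')
    static Transition apply(LifecycleState state, LifecycleEvent event) {
        if (state == LifecycleState.UNINITIALIZED && event == LifecycleEvent.OPENED) {
            return new Transition(List.of("notify subscribers: opened"), LifecycleState.OPEN);
        }
        if (state == LifecycleState.OPEN && event == LifecycleEvent.CLOSED) {
            return new Transition(List.of("notify subscribers: closed"), LifecycleState.CLOSED);
        }
        // Any other combination has no defined transition.
        throw new IllegalStateException("No transition for " + event + " in state " + state);
    }
}
```

For example, `PortfolioFsm.apply(LifecycleState.UNINITIALIZED, LifecycleEvent.OPENED)` yields a transition whose next state is `OPEN`.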

Let’s decode this. Think of state transitions like a recipe: in order to get a new state, we need to combine our current state along with an event. Once we ‘apply’ the event to the current state, we next perform an action, and then the transition to a new state is complete.

Note: We will cover the importance of recovery in much more detail during the operational units later in this series. In a cloud-native, distributed system, it’s absolutely critical to restore state in a rapid fashion, as entities may be relocated from one node in the cloud to another.

Let’s revisit the thenPersist(...) method:

return ctx.thenPersist(openEvent,
    (e) -> ctx.reply(Done.getInstance())
);

When we update the state of our portfolio, we’re not modifying any variables in place. Every time we modify even a single aspect of the portfolio, we create a new version of that portfolio in memory with the appropriate new values. The ‘old’ portfolio is effectively de-referenced by Lagom, and the reference now points to the new copy.


Persistence is a separate concern from state transitions, although they are closely related. Persistence occurs as each event is written to our journal, which by default is Cassandra. If we ever need to restart our system, each entity can recover to the correct state by playing back every event for that entity one at a time, going through each state change as described above until it recovers to the current state.

Event sourcing is like having superpowers. You can literally go back to a certain point in time, recover from the journal up to that point, and play totally different events on top of the old state. This is invaluable for testing, because you can inject events into a system and observe its behavior. But with superpowers comes great responsibility, as you will learn over the next four units.
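Recovery by replay, and going back to an earlier point in time, can both be sketched as folding an event handler over a prefix of the journal. This is a hypothetical plain-Java model (Java 16+), not how Lagom stores or replays its journal internally; `PortfolioView`, the `Ledger*` events, and `Replay` are invented names.

```java
import java.util.List;

// Hypothetical immutable portfolio state and events (plain Java, not Lagom API).
record PortfolioView(String name, long cashCents, boolean closed) {
    static final PortfolioView EMPTY = new PortfolioView(null, 0, false);
}

interface LedgerEvent {}

record LedgerOpened(String name) implements LedgerEvent {}

record LedgerCashDeposited(long cents) implements LedgerEvent {}

record LedgerClosed() implements LedgerEvent {}

final class Replay {
    private Replay() {}

    // Event handler: combines the current state with one event to produce the next state.
    static PortfolioView applyEvent(PortfolioView s, LedgerEvent event) {
        if (event instanceof LedgerOpened o) {
            return new PortfolioView(o.name(), s.cashCents(), false);
        }
        if (event instanceof LedgerCashDeposited d) {
            return new PortfolioView(s.name(), s.cashCents() + d.cents(), s.closed());
        }
        if (event instanceof LedgerClosed) {
            return new PortfolioView(s.name(), s.cashCents(), true);
        }
        throw new IllegalArgumentException("Unknown event: " + event);
    }

    // Replays the first `count` events one at a time. count == journal.size() recovers
    // the current state; a smaller count recovers the state at an earlier point in time.
    static PortfolioView replay(List<LedgerEvent> journal, int count) {
        PortfolioView state = PortfolioView.EMPTY;
        for (LedgerEvent event : journal.subList(0, count)) {
            state = applyEvent(state, event);
        }
        return state;
    }
}
```

Calling `applyEvent` with new events on the result of `replay(journal, n)` is one way to play different events on top of an old state without touching the stored journal.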

Once you’re comfortable with the concept of event sourcing, modeling and building event-driven reactive systems will become second nature. It’s perfectly okay if this seems unintuitive at first. In typical CRUD-based Java development, it’s standard practice to mutate an object’s fields one at a time. For example, if a customer updates their address, we simply change the address and leave all other values in a ‘customer object’ alone. However, Lagom works with immutable state. If an event changes only a single value within an aggregate boundary, such as ‘customer address updated’, Lagom creates a copy of the current state and applies the change on top of it. This results in the new state.
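The ‘customer address updated’ example can be sketched with an immutable Java record and a copy-with-one-change method. `CustomerState`, `AddressUpdated`, and `CustomerEvents` are hypothetical names used only for illustration (Java 16+ for records).

```java
// Hypothetical immutable customer state; record components cannot be reassigned.
record CustomerState(String name, String email, String address) {
    // Returns a new copy with only the address changed; `this` is left untouched.
    CustomerState withAddress(String newAddress) {
        return new CustomerState(name, email, newAddress);
    }
}

// Hypothetical event carrying only the value that changed.
record AddressUpdated(String newAddress) {}

final class CustomerEvents {
    private CustomerEvents() {}

    // Applying the event produces the new state; the old state is simply no longer referenced.
    static CustomerState apply(CustomerState current, AddressUpdated event) {
        return current.withAddress(event.newAddress());
    }
}
```

Because the previous `CustomerState` is never mutated, any code still holding a reference to it continues to see a consistent, complete snapshot.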

Valid states and changing behavior

Up until now, we’ve only focused on a single command, Open, which is handled by the open(...) method and opens a new portfolio. What’s interesting about this command is that it transitions a portfolio to the open state.

Currently, we only have three states that a portfolio can be in:

  • Uninitialized
  • Open
  • Closed

If we were modeling a real-world trading portfolio, there might be many more valid states that a portfolio can be in:

  • Uninitialized
  • Good standing
  • Suspended
  • Under review
  • Closed

In Lagom, the behavior of an entity is based on the current state of the entity (for example, open, closed), similar to runtime behavior swapping using the strategy pattern in traditional Java applications.

For instance, attempting to open an already open portfolio should result in an error, as you cannot open a portfolio twice. Another example is that if a portfolio has been closed, any trade-related commands to it should fail, as the portfolio is no longer in a valid ‘open’ state to process trades. However, if the portfolio is in an open state, trades should be accepted and executed.
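The strategy-pattern analogy can be sketched in plain Java: each state is its own behavior object, and handling a command returns the behavior to use next. This is a hypothetical illustration, not Lagom’s `Behavior` API; `TradeCommand`, `PortfolioBehavior`, and the three state classes are invented names.

```java
// Hypothetical commands for illustration.
enum TradeCommand { OPEN, PLACE_ORDER, CLOSE }

// Each state has its own behavior object (strategy pattern); swapping the
// behavior changes which commands are accepted.
interface PortfolioBehavior {
    // Returns the behavior to use for the next command, or throws if the
    // command is not valid in the current state.
    PortfolioBehavior handle(TradeCommand cmd);
}

final class UninitializedState implements PortfolioBehavior {
    public PortfolioBehavior handle(TradeCommand cmd) {
        if (cmd == TradeCommand.OPEN) {
            return new OpenState();
        }
        throw new IllegalStateException("Portfolio not opened yet; rejected " + cmd);
    }
}

final class OpenState implements PortfolioBehavior {
    public PortfolioBehavior handle(TradeCommand cmd) {
        switch (cmd) {
            case PLACE_ORDER:
                return this; // trades are accepted while the portfolio is open
            case CLOSE:
                return new ClosedState();
            default:
                throw new IllegalStateException("Portfolio already open; rejected " + cmd);
        }
    }
}

final class ClosedState implements PortfolioBehavior {
    public PortfolioBehavior handle(TradeCommand cmd) {
        // A closed portfolio accepts no further commands.
        throw new IllegalStateException("Portfolio is closed; rejected " + cmd);
    }
}
```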

Dynamic behavior based on state makes it possible to model entities based on various real-world situations. It also means that the valid state of an aggregate root should be considered during event storming.

Portfolio-state transitions

Let’s walk through the complete lifecycle of an entity to demonstrate these core concepts in action. We’ll start with our initial behavior.

When we get a reference for an ID, we may get:

  • An entity with no snapshot/journal (none) (1)

  • An entity whose current state is uninitialized (some(none)) (2)

  • An entity whose current state is ‘open’ (some(some(Open))) (3)

  • An entity whose current state is ‘closed’ (some(some(Closed))) (4)

From a business-logic perspective, states 1 and 2 are actually the same thing: a brand new uninitialized portfolio with no events. We are very unlikely to see 1; however, we can’t guarantee this through compile-time safety, so we need to assume we can receive a ‘none’ when requesting a portfolio by portfolio ID. We should expect to routinely see both 3 (an open portfolio) and 4 (a closed portfolio).

Working with a nested some(...) is awkward. Our first order of business is to flatten this down to None, Some(Open), or Some(Closed). Let’s walk through how we accomplish this.

Initial behavior

In order to determine which state the portfolio is in on creation, we need to define its initial behavior by overriding the initialBehavior hook in Lagom. (By ‘creation’, we mean object creation in Lagom and Java, not the opening of a portfolio or another business concept.)

@Override
public Behavior initialBehavior(Optional<Optional<PortfolioState>> snapshotState) { // 1
    return snapshotState
        .flatMap(Function.identity()) // 2
        .map(state -> // 3
            state.visit(new PortfolioState.Visitor<Behavior>() {
                @Override
                public Behavior visit(PortfolioState.Open open) {
                    return new OpenPortfolioBehavior(open).getBehavior();
                }

                @Override
                public Behavior visit(PortfolioState.Closed closed) {
                    return new ClosedPortfolioBehaviorBuilder().getBehavior();
                }
            })
        )
        .orElse(new UninitializedBehavior().getBehavior()); // 4
}

We begin with an ‘option of an option’ (1) as our initial state, which is admittedly confusing and not easy to work with. We need to ‘flatten’ it to remove the nested option. To do this, we leverage Function.identity(), a static method on the java.util.function.Function interface (introduced in Java 8) that returns a function which always returns its input unchanged (2). Function.identity() is commonly used in mapping operations where we want to pass each element of a stream (or collection, list, or container) through unaltered. In our case, passing it to flatMap unwraps the inner option, flattening snapshotState from an ‘option of an option’ into an ‘option’ of PortfolioState.
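The flattening step can be tried in isolation with only the JDK. `FlattenDemo` is a hypothetical helper that applies the same `flatMap(Function.identity())` call used in initialBehavior to each of the nested cases described earlier.

```java
import java.util.Optional;
import java.util.function.Function;

final class FlattenDemo {
    private FlattenDemo() {}

    // Collapses an "option of an option" into a single Optional, mirroring the
    // snapshotState.flatMap(Function.identity()) call in initialBehavior.
    static <T> Optional<T> flatten(Optional<Optional<T>> nested) {
        return nested.flatMap(Function.identity());
    }

    public static void main(String[] args) {
        System.out.println(flatten(Optional.<Optional<String>>empty()));     // Optional.empty
        System.out.println(flatten(Optional.of(Optional.<String>empty()))); // Optional.empty
        System.out.println(flatten(Optional.of(Optional.of("Open"))));      // Optional[Open]
    }
}
```

The first two cases (no snapshot, and an uninitialized snapshot) both collapse to an empty Optional, which is why initialBehavior can treat them identically.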

Once we have an Optional<PortfolioState> (2), we need to determine whether there is a previous state associated with this portfolio (3). In other words, we need to determine whether our portfolio is initialized (recovered from a snapshot) or uninitialized (brand new).

In the event that we’re working with an initialized portfolio, we return the proper behavior. As you may recall, we leveraged the visitor pattern in a previous unit to ensure that we cover all potential states. As you can see with the code above, we only have two valid states: open and closed. Depending on which state the portfolio is in, we’ll return the appropriate behavior.
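The PortfolioState visitor itself was defined in a previous unit and is not reproduced here. As a reminder of the pattern, here is a hypothetical plain-Java sketch: `PortfolioStatus`, its nested `Open` and `Closed` classes, and `BehaviorSelector` are stand-ins, and the strings returned take the place of real Lagom behaviors. Because the `Visitor` interface has one method per state, adding a state forces every visitor to handle it.

```java
// Hypothetical state hierarchy mirroring PortfolioState.Open and PortfolioState.Closed.
abstract class PortfolioStatus {
    // One visit method per state: a new state means every visitor must be updated.
    interface Visitor<R> {
        R visit(Open open);
        R visit(Closed closed);
    }

    abstract <R> R visit(Visitor<R> visitor);

    // Private constructor: only the nested Open and Closed classes can extend this type.
    private PortfolioStatus() {}

    static final class Open extends PortfolioStatus {
        final String name;

        Open(String name) { this.name = name; }

        @Override
        <R> R visit(Visitor<R> visitor) { return visitor.visit(this); }
    }

    static final class Closed extends PortfolioStatus {
        @Override
        <R> R visit(Visitor<R> visitor) { return visitor.visit(this); }
    }
}

final class BehaviorSelector {
    private BehaviorSelector() {}

    // Selects a (stand-in) behavior for each state, as initialBehavior does.
    static String describe(PortfolioStatus status) {
        return status.visit(new PortfolioStatus.Visitor<String>() {
            @Override
            public String visit(PortfolioStatus.Open open) { return "open behavior for " + open.name; }

            @Override
            public String visit(PortfolioStatus.Closed closed) { return "closed behavior"; }
        });
    }
}
```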

Uninitialized behavior

If we have no state, we can assume that the portfolio is brand new (4). In this case, we start with an uninitialized behavior, which means that we’ve just created our entity. When in this state, our portfolio can only process one command: open. So, when we start with a brand new portfolio that is completely uninitialized, we need to open it by providing a name for the portfolio (as you may recall from our event storming exercise).

class UninitializedBehavior {
    Behavior getBehavior() { // 1
        BehaviorBuilder builder = newBehaviorBuilder(Optional.empty());
        builder.setCommandHandler(PortfolioCommand.Open.class, this::open); // 2
        builder.setEventHandlerChangingBehavior(
            PortfolioEvent.Opened.class,
            this::opened
        ); // 3
        // GetState is read-only; an uninitialized portfolio has no state to return.
        builder.setReadOnlyCommandHandler(
            PortfolioCommand.GetState.class,
            (cmd, ctx) ->
                ctx.commandFailed(new NotFound(String.format(
                    "Portfolio %s not found.", entityId())))
        );
        return builder.build();
    }

    private PersistentEntity.Persist open(PortfolioCommand.Open cmd,
                                          CommandContext<Done> ctx) { // 2
        PortfolioEvent.Opened openEvent = PortfolioEvent.Opened.builder()
            .name(cmd.getName())
            .portfolioId(getPortfolioId())
            .build();
        return ctx.thenPersist(openEvent, (e) ->
            ctx.reply(Done.getInstance()));
    }

    private Behavior opened(PortfolioEvent.Opened evt) { // 3
        return new OpenPortfolioBehavior(evt).getBehavior();
    }
}

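In the event that there is no state associated with the entity, we build a new behavior that represents a new portfolio (1). We register our command (2) and event (3) handlers, which define that we only have one state-changing command, open, and one valid event, opened. The read-only GetState command is also handled, but in this state it always fails with NotFound, because an uninitialized portfolio has no state to return.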

Think back to event storming. A consideration for a successful event storming session is understanding the behavior of aggregate roots and how they shift behaviors as they move from one state to the other. The more event storming sessions you host, the more natural these nuances will feel!

As a general practice, event sourcing follows a few basic steps, regardless of framework:

  1. Receive a command.

  2. Test to verify that it can be applied.

  3. Generate an event for the applied command.

  4. Apply the event to the entity.

  5. Change state.

  6. Emit the event (to subscribers).
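The six steps can be walked through end to end in a small, framework-agnostic sketch. It is not Lagom’s API: `PortfolioCashEntity`, `DepositCash`, and `CashDeposited` are hypothetical, the journal is an in-memory list rather than durable storage, and subscribers are plain `Consumer` callbacks rather than a message broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, framework-agnostic entity (not Lagom API): the journal is an
// in-memory list and subscribers are plain callbacks.
final class PortfolioCashEntity {
    record DepositCash(long cents) {}   // command
    record CashDeposited(long cents) {} // event

    private final List<CashDeposited> journal = new ArrayList<>();
    private final List<Consumer<CashDeposited>> subscribers = new ArrayList<>();
    private long cashCents = 0;

    void subscribe(Consumer<CashDeposited> subscriber) {
        subscribers.add(subscriber);
    }

    // 1. Receive a command. Returns false if the command was rejected.
    boolean handle(DepositCash cmd) {
        // 2. Verify that it can be applied.
        if (cmd.cents() <= 0) {
            return false;
        }
        // 3. Generate an event for the applied command and record it in the journal.
        CashDeposited event = new CashDeposited(cmd.cents());
        journal.add(event);
        // 4 and 5. Apply the event to the entity, which changes its state.
        cashCents = apply(cashCents, event);
        // 6. Emit the event to subscribers.
        subscribers.forEach(s -> s.accept(event));
        return true;
    }

    private static long apply(long current, CashDeposited event) {
        return current + event.cents();
    }

    long cashCents() { return cashCents; }

    List<CashDeposited> journal() { return List.copyOf(journal); }
}
```

A rejected command leaves the journal, the state, and the subscribers untouched, which keeps every state change traceable to exactly one recorded event.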

In the next unit we’ll explore how CQRS (Command Query Responsibility Segregation) complements event sourcing and explain the full lifecycle of a “write” operation in more detail by walking through how to place an order for an open portfolio.

Conclusion

Event sourcing can be considered one of the most critical milestones towards embracing a completely reactive style of systems architecture and development. The next unit will cover CQRS (Command Query Responsibility Segregation) to showcase optimizing for reads and writes separately. CQRS and ES are complementary approaches that help to shift state out of relational databases and into the business domain itself, reducing ORM impedance mismatch that developers have struggled with for some time. Once implemented, this enables developers to treat events as the building blocks of their systems ‘all the way down,’ from the UI to persistence tier.
