Introduction

One of the most familiar aspects of modern enterprise development is the relational database. In many cases, an enterprise application is best described as functionality that sits on top of a relational database, acting as a facade over the relational database management system (RDBMS) with minimal functionality beyond persistence. When we move to alternate approaches to persistence, we lose the familiarity of the RDBMS and some of the power it brings, especially when writing SQL queries. To give up this convenience, we need to realize a significant benefit in return.

This unit will not only show how to query an event-sourced and CQRS-based system, but also how to move processing into its own asynchronous boundary. In other words, views will be precomputed before being requested by the UI, and the computation of views will happen completely asynchronously and not impact any other aspect of processing within our system.

What are the benefits of this? Consider a modern enterprise system that is query-focused. Data may be persisted once but queried hundreds of times. Why perform expensive computation to answer the same query over and over again when we can recompute the view whenever the underlying data is updated? The precomputation of views is especially important in machine learning systems with massive amounts of data to contend with, where even the most optimized SQL query may take tens of seconds to execute across vast quantities of data.

We’ve already seen how event sourcing and CQRS fit together, but up until now we’ve been missing the final, and arguably most important, piece of the puzzle. How do we query our system and obtain meaningful information from a collection of raw events?

This unit covers how event sourcing and CQRS complement each other by transforming raw events into precomputed views that can be queried. Let’s start by walking through what happens when we open a new portfolio.

Event-sourced persistence

When we move to event sourcing, we naturally embrace CQRS and the separation of reads and writes. Thus far, in Reactive Stock Trader, all persistence has been accomplished through event sourcing by storing raw aggregate events. In Lagom, this is handled by default with Cassandra. When we open a new portfolio, we emit an “open portfolio” command, which is eventually handled by a persistent entity, PortfolioEntity. Once the command is successfully applied, a “portfolio opened event” will be persisted and then Done is emitted to the caller.

PortfolioEntity.java:

private PersistentEntity.Persist open(PortfolioCommand.Open cmd,
                                      CommandContext<Done> ctx) {
    // Build the "portfolio opened" event from the command's payload
    PortfolioEvent.Opened openEvent = PortfolioEvent.Opened.builder()
        .name(cmd.getName())
        .portfolioId(getPortfolioId())
        .build();

    // Persist the event; once the write succeeds, reply Done to the caller
    return ctx.thenPersist(openEvent,
        (e) -> ctx.reply(Done.getInstance()));
}

This is interesting, but what’s actually happening on disk? To understand more, let’s examine how events are persisted to Cassandra once they are written.

As a refresher, recall that ctx.thenPersist(…) is a method in the Lagom framework that handles both persisting the event and emitting a response to the caller.

To see for ourselves, we can connect to Cassandra with a client application. On Windows or Mac, one option is the free TablePlus application, which supports Cassandra.
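On Linux, or wherever a GUI client isn’t handy, plain cqlsh works just as well. A minimal sketch, assuming cqlsh is installed locally and Lagom’s embedded Cassandra is running on its default port of 4000 (noted below):

cqlsh localhost 4000
DESCRIBE KEYSPACES;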

By default, when we start Lagom using sbt runAll, Cassandra is automatically launched on port 4000. Once connected, you should be able to choose from the keyspaces for our various bounded contexts, including ‘broker’ and ‘portfolio.’ Perform some trades and choose ‘broker,’ after which you’ll see something like the following:

[Screenshot: tables in the ‘broker’ keyspace, including the messages table]

You can see a few tables; the most interesting for us right now is the ‘messages’ table. Selecting this table reveals all the messages processed within the broker context: rows of events, along with any event metadata, in JSON format (depending on how we’ve actually persisted each event).

Let’s open the ‘portfolio’ keyspace now and take a look at some of the events in its messages table. Immediately, we’ll notice a few interesting columns, three of which we’ll discuss further: event, ser_manifest, and tag1.

[Screenshot: the messages table in the ‘portfolio’ keyspace, with the event, ser_manifest, and tag1 columns visible]

event captures the raw JSON data associated with the event, otherwise known as the event payload. (Tip: Some events may be so complex that we don’t actually store this data inside the event itself, but rather a pointer to this data in a separate document store.)
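To illustrate that tip, an event might carry only a reference to an externally stored payload. The following is a hypothetical sketch (neither the ReportGenerated event nor the documentStoreKey field exists in Reactive Stock Trader):

import lombok.Builder;
import lombok.Value;

// Hypothetical event whose payload is too large to embed: the event stores
// only a pointer (a key into a separate document store) rather than the data.
@Value
@Builder
public class ReportGenerated {
    PortfolioId portfolioId;
    String documentStoreKey; // e.g., a document ID in the external store
}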

ser_manifest is the event type itself. We can see that we have portfolio opened events, funds credited events, shares credited events, and so forth.

tag1 is a tag that we assign to the event, which helps us to control parallelism through sharding (more on this shortly). For now, we can consider a tag as a string that we provide to Lagom so that it can tag events in a way that’s separate from the event type itself. In our case, all events related to a portfolio share the same tag.

Over time, we will wind up with a tremendous volume of events as they form the core of our system!

However, this raises the question of how to query a system based almost purely on events. No matter how we tackle persistence, a core capability of any system is the ability to prepare concise and meaningful views for the UI.

Read-side support

Lagom provides this exact functionality in the form of read-side processors. For those who have read Sam Newman’s book Building Microservices, a read-side processor is the equivalent of a data pump. With read-side processors, you can subscribe to individual events and compute views with a totally different and independent schema. In the case of our ‘portfolio list’ that we would like to build, the read-side schema is incredibly simple and looks like the following:

[Table: the portfolio_summary view schema — portfolioId (primary key) and name]

These rows will be kept up to date in near real time as portfolios are opened and closed, with portfolioId serving as the primary key. Let’s now dive into how to build this view, and then how to query it to provide the information to the UI.

To accomplish all of the following tasks, we’ll be using Lagom Read-Side Support and, more specifically, a Lagom Read-Side Processor for Cassandra.

If you have specific requirements to use a database other than Cassandra for queries, Lagom has good support for both JDBC and JPA as an alternative to Cassandra for read-side processing. We’ll be covering Cassandra, as it has the best out-of-the-box support, but most of what we discuss here can easily be tailored to SQL with JDBC and JPA.

The first step is to understand what’s happening under the hood in Lagom. What we’re effectively doing is subscribing to events within an aggregate boundary based on a tag. And whenever we see a specific event type with a specific tag, we’ll read that event and use the event’s payload to update a read-side table.

In this example, we want to build and maintain a very simple list of portfolios in the system, which will be ready to query without any expensive computation done on the fly. In the UI, this will look like the following:

[Screenshot: the home screen, offering ‘create a new portfolio’ and ‘choose a portfolio’]

We have a very simple home screen that lets us either ‘create a new portfolio’ or ‘choose a portfolio’ (to be the active portfolio). The place to start is creating an event processor within the portfolio bounded context.

Let’s start with a completely empty read-side processor so that we can discuss all of the components involved at a high level. We will not fill in the implementation details yet so that you can get a complete overview of the structure of a ReadSideProcessor.

PortfolioEventProcessor.java:

public class PortfolioEventProcessor extends ReadSideProcessor<PortfolioEvent> { // 1
  private final CassandraSession session;
  private final CassandraReadSide readSide;

  private PreparedStatement writePortfolios = null; // 2

  @Inject
  public PortfolioEventProcessor(CassandraSession session,
                                 CassandraReadSide readSide) { // 3
    this.session = session;
    this.readSide = readSide;
  }

  @Override
  public PSequence<AggregateEventTag<PortfolioEvent>> aggregateTags() {} // 4

  @Override
  public ReadSideHandler<PortfolioEvent> buildHandler() {} // 5

  private CompletionStage<Done> prepareCreateTables() {}

  private CompletionStage<Done> prepareWritePortfolios() {}

  private CompletionStage<List<BoundStatement>> processPortfolioChanged(Opened event) {}
}

We start by extending ReadSideProcessor<T> in Lagom (1), where T is the type of event that we’ll subscribe to in order to create our view. In our case, we’ll be processing PortfolioEvent.

We also declare a PreparedStatement field (2), which will hold our prepared insert statement once the processor starts up. In our constructor (3), we’ll need to inject two classes: CassandraSession and CassandraReadSide.

  • CassandraSession handles all interactions with Cassandra, such as preparing and executing statements.
  • CassandraReadSide specifically handles a few nuances of event sourcing, like the storage of offsets so that an event isn’t processed twice by the processor (more about offsets shortly).

Once we have our helpers injected (3), we’ll need to override two methods: aggregateTags and buildHandler.

Aggregate tags (4) capture the tags that we’ll be subscribing to in the processor.

Finally, the build handler (5) stitches everything together and creates the necessary configuration for Lagom to know how to process events for the read side.

Let’s begin to build up the implementation details of this skeleton class.

Aggregate tags

PortfolioEventProcessor.java:

@Override
public PSequence<AggregateEventTag<PortfolioEvent>> aggregateTags() {
  return PortfolioEvent.TAG.allTags();
}

The tags are defined in the event class itself, as shown below. Here we can specify exactly which tags we are interested in; in this case, all PortfolioEvent tags.

PortfolioEvent.java:

int NUM_SHARDS = 5;

AggregateEventShards<PortfolioEvent> TAG =
      AggregateEventTag.sharded(PortfolioEvent.class, NUM_SHARDS);

@Override
default AggregateEventShards<PortfolioEvent> aggregateTag() {
    return TAG;
}

There are a few ways to define tags. The events that we process here are already in place from earlier units, which means they already have a tag associated with them.

Our event uses AggregateEventShards<T>, which makes the events available for sharded read-side processing. The NUM_SHARDS value we specify is something of a parallelization factor: we’re going to spread the events across five different shards, which means we can have five different streams processing these events in parallel. However, there’s a small catch. According to the documentation, this number should never be changed!

Note: numShards should be selected up front, and shouldn’t change. If it does change, events for the same entity will be produced by different event streams and handled by different shards in the read-side processor, leading to out-of-order event handling.
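To make the mechanics concrete, here is a conceptual sketch of how a sharded tag assigns an entity to a shard. It mirrors Lagom’s documented behavior (a hash of the entity ID modulo the shard count), but it is an illustration rather than Lagom’s actual implementation:

// Conceptual sketch only: all events for a given portfolio hash to the same
// shard, so per-entity ordering is preserved within a shard.
static String shardTagFor(String entityId) {
    int shardNo = Math.abs(entityId.hashCode() % PortfolioEvent.NUM_SHARDS);
    return "PortfolioEvent" + shardNo; // e.g., "PortfolioEvent3" in the tag1 column
}

Because the assignment is deterministic, changing NUM_SHARDS would reshuffle entities across shards, which is exactly why the documentation warns against changing it.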

We will cover sharding in significantly more depth in later units. For now, make a mental note that you will need to do a little bit of up-front capacity planning before deciding on the number of shards per event.

Build handler

Next we need to override the buildHandler() method, which contains the specific instructions to Lagom on which events we want to process and how to process them in order to update the read-side view in Cassandra.

@Override
public ReadSideHandler<PortfolioEvent> buildHandler() {
  return readSide.<PortfolioEvent>builder("portfolio_offset") // 1
    .setGlobalPrepare(this::prepareCreateTables) // 2
    .setPrepare(tag -> prepareWritePortfolios()) // 3
    .setEventHandler(Opened.class, this::processPortfolioChanged) // 4
    .build();
}

The build handler defines a few characteristics of this read-side processor. We’ll cover each point line by line.

Create offset (1)

The processor needs to maintain an ‘offset’, which tracks the events that have already been processed. This is essential so that the processor doesn’t process an event twice or completely skip an event.

Remember that the view will be constructed in near real time as events flow into the system, and offsets control how the view is rebuilt. When the ReadSideProcessor starts (for example, after a crash), it checks the stored offset and processes every event that it hasn’t yet seen. On a restart, we only want to process events that have happened since the processor went offline, and the database keeps track of this based on the ‘tag’ that we define.

In Cassandra, this looks like the following:

[Screenshot: the offset store table in Cassandra, including the portfolio_offset row]

You’ll notice that we now have a row identified by portfolio_offset, which keeps track of which events have been processed by our read-side processor. Keep in mind that you do not want to reuse the name of an offset between two different processors, or they will overwrite each other’s progress.
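If you prefer to inspect this by hand, a query along the following lines will show the stored offset. The table and column names here are Lagom’s defaults at the time of writing and may differ between versions:

SELECT * FROM offsetstore WHERE eventprocessorid = 'portfolio_offset';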

Create the view table (2)

We need to create the view table itself. Here, we provide a method containing the instructions used at startup to create the table if it doesn’t already exist. As we outlined at the beginning of this unit, we only need to track a few values: ‘portfolio ID’ and ‘portfolio name’.

private CompletionStage<Done> prepareCreateTables() {
  // @formatter:off
  return session.executeCreateTable(
      "CREATE TABLE IF NOT EXISTS portfolio_summary ("
        + "portfolioId text, name text, "
        + "PRIMARY KEY (portfolioId))");
  // @formatter:on
}

Provide a callback for a prepared write statement (3)

We need to provide a callback that returns a pre-prepared insert statement. This callback is executed once when the processor starts, after which every write reuses the optimized prepared statement. This is optional but highly recommended; the alternative is to prepare the insert statement directly inside the event handler, described in the following code (4).

private CompletionStage<Done> prepareWritePortfolios() {
  return session
    .prepare("INSERT INTO portfolio_summary (portfolioId, name) VALUES (?, ?)")
    .thenApply(ps -> {
        this.writePortfolios = ps;
        return Done.getInstance();
    });
}

Event handler per event type for this processor (4)

We’ll now want to chain together all event handlers, one handler method per specific event type. In our case, we’ll eventually process two event types on a portfolio: ‘open’ events and ‘close’ events. The handler for open events follows, with a sketch of a close handler after it.

private CompletionStage<List<BoundStatement>> processPortfolioChanged(Opened event) {
  // Bind the event's payload to the prepared insert statement
  BoundStatement bindWritePortfolios = writePortfolios.bind();
  bindWritePortfolios.setString("portfolioId", event.portfolioId.getId());
  bindWritePortfolios.setString("name", event.name);
  return completedStatements(Arrays.asList(bindWritePortfolios));
}
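A close handler would be chained in buildHandler() with another setEventHandler call, such as .setEventHandler(Closed.class, this::processPortfolioClosed). The following is a sketch only: the deletePortfolios prepared statement (DELETE FROM portfolio_summary WHERE portfolioId = ?) is an assumption prepared alongside writePortfolios, not code from this unit:

// Hypothetical handler for PortfolioEvent.Closed: remove the row from the
// view table so closed portfolios no longer appear in the list.
private CompletionStage<List<BoundStatement>> processPortfolioClosed(Closed event) {
    BoundStatement bindDeletePortfolios = deletePortfolios.bind();
    bindDeletePortfolios.setString("portfolioId", event.portfolioId.getId());
    return completedStatements(Arrays.asList(bindDeletePortfolios));
}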

With this complete, our event processor is done, and we’re ready to wire it up to the rest of our system.

Registering the event processor

In order for the read-side processor to do anything, we need to register it with our service. This is super straightforward.

PortfolioServiceImpl.java:

@Singleton
public class PortfolioServiceImpl implements PortfolioService {

    // code removed here for clarity ...

    private final CassandraSession db; // 3

    @Inject
    public PortfolioServiceImpl(PortfolioRepository portfolioRepository,
                                BrokerService brokerService,
                                ReadSide readSide, // 1
                                CassandraSession db) { // 4

        this.portfolioRepository = portfolioRepository;
        this.db = db; // 5

        brokerService.orderResult()
            .subscribe()
            .atLeastOnce(Flow.<OrderResult>create()
            .mapAsync(1, this::handleOrderResult));

        readSide.register(PortfolioEventProcessor.class); // 2
    }

   // code removed here for clarity ...
}

In the above example, we simply need to inject an instance of ReadSide (1), which gives us the ability to register our event processor with Lagom (2).

You may also notice some other additions. Up until now, we’ve only covered how to create and populate the read-side view, but that’s not very useful if we can’t query it! Before we can define a query, we’ll need a reference to the CassandraSession (3), which we obtain by injecting it in the constructor (4) and storing it in a field (5).

Now we have our read-side processor registered and a Cassandra session defined. The next step is to build a query and wire the read-side channel up to the BFF so that the UI can consume it.

Implementing the query

In order to query our view table, we’ll need to implement a new service method in our API. We’ll call it getAllPortfolios(...), an appropriate name for the functionality we want to implement (a ‘portfolio selection screen’).

First, let’s add this signature to the service interface.

PortfolioService.java

public interface PortfolioService extends Service {
    // ...
    ServiceCall<NotUsed, PSequence<PortfolioSummary>> getAllPortfolios();
    // ...
}

We can see that we’ll be returning a PSequence<T>, with our type being PortfolioSummary. It’s a best practice to create a view type that captures the schema of the view and return a sequence of these for queries that may result in multiple rows.

PSequence is part of PCollections, a library used to create immutable objects in Java. You should become familiar with this library when working with Lagom, as it’s used extensively in code and referenced in the Lagom documentation.
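The PortfolioSummary view type itself isn’t shown in this unit. A minimal sketch, assuming the project’s usual Lombok style (the exact class may differ), could look like this:

import lombok.Builder;
import lombok.Value;

// Immutable view type mirroring one row of the portfolio_summary table.
@Value
@Builder
public class PortfolioSummary {
    PortfolioId portfolioId;
    String name;
}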

The implementation of getAllPortfolios() in the PortfolioServiceImpl is provided in the following code. This code references the instance of CassandraSession that we injected into the service, which gives us the ability to work with CQL, a query dialect specific to Cassandra that is very similar to SQL.

@Override
public ServiceCall<NotUsed, PSequence<PortfolioSummary>> getAllPortfolios() {
    return request -> {
        CompletionStage<PSequence<PortfolioSummary>> result = db.selectAll(
            "SELECT portfolioId, name FROM portfolio_summary;").thenApply(rows -> {
                List<PortfolioSummary> summary = rows.stream().map(row ->
                    PortfolioSummary.builder()
                        .portfolioId(new PortfolioId(row.getString("portfolioId")))
                        .name(row.getString("name"))
                        .build())
                    .collect(Collectors.toList());
                return TreePVector.from(summary);
            });
        return result;
    };
}

Something to point out is that Cassandra does not support joins. To be successful with Cassandra, we need to think about data as denormalized.

If you need to aggregate data from a number of different bounded contexts, you can query a number of view tables independently here and merge them programmatically into the return type.
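As a sketch of that approach, the following merges two independent view queries with thenCombine once both complete. The portfolio_balance table and its funds column are hypothetical, purely for illustration:

// Hypothetical: query two read-side tables independently, then merge the
// results in application code (Cassandra itself cannot join them).
private CompletionStage<List<String>> getPortfolioNamesWithFunds() {
    CompletionStage<List<Row>> summaries =
        db.selectAll("SELECT portfolioId, name FROM portfolio_summary;");
    CompletionStage<List<Row>> balances =
        db.selectAll("SELECT portfolioId, funds FROM portfolio_balance;");
    return summaries.thenCombine(balances, (summaryRows, balanceRows) -> {
        Map<String, BigDecimal> fundsById = balanceRows.stream().collect(
            Collectors.toMap(r -> r.getString("portfolioId"), r -> r.getDecimal("funds")));
        return summaryRows.stream()
            .map(r -> r.getString("name") + ": " + fundsById.get(r.getString("portfolioId")))
            .collect(Collectors.toList());
    });
}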

The getAllPortfolios() implementation above builds a collection of PortfolioSummary objects, one representing each portfolio in our view table, and returns the collection as a future. The consumer will be the BFF, which we will implement in the next section.

Adding the endpoint to the BFF

We’ll need to expose this query in the BFF so that it can be consumed by the UI code.

First, we’ll add a single method to the controller, getAllPortfolios(), which will return the collection of PortfolioSummary objects as JSON.

PortfolioController.java:

public CompletionStage<Result> getAllPortfolios() {
    val portfolios = portfolioService
            .getAllPortfolios()
            .invoke();

    return portfolios
            .thenApply(Json::toJson)
            .thenApply(Results::ok);
}
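For Play’s router to reach this method, the BFF’s routes file also needs an entry. A sketch, assuming the controller lives in the default controllers package and using the /api/portfolio path that the UI code below expects:

GET     /api/portfolio        controllers.PortfolioController.getAllPortfolios()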

Consuming the query in the UI

Finally, in order to implement the full functionality of being able to select between multiple portfolios, we’ll want to update our Vue.js code to call the BFF endpoint above.

First, we’ll add the helper method that uses Axios to make the actual HTTP request and then return the JSON payload.

portfolio.js:

export function getAllPortfolios() {
  const url = new URL('/api/portfolio', baseUrl);
  const request = axios.get(url.toString());
  return request.then(response => response.data);
}

Next, we’ll call this helper from the mounted() lifecycle hook in Vue.js to grab all portfolios when Home.vue is loaded.

Home.vue:

mounted() {
  portfolioService.getAllPortfolios()
    .then(portfolios => {
      let p = portfolios.map(portfolio => ({
        id: portfolio.portfolioId.id,
        name: portfolio.name
      }));
      this.portfolios = p;
    });
},

Finally, in the same file we create a new div that displays the list of portfolios and allows users to choose between them, setting their selection to the active portfolio.

We have some subtle formatting logic within these buttons that could use some improvement, but we’ll consider Vue.js and UX best practices out of scope for this series.

<div class="col-6" v-if="portfolios.length > 0">
  <h2>Choose a portfolio</h2>
  <p v-for="portfolio in portfolios" :key="portfolio.id">
    <button
        v-on:click="setActivePortfolio(portfolio.id, portfolio.name)"
        v-if="getActivePortfolio() !== portfolio.id">
          Select
    </button>
    <button
        v-on:click="setActivePortfolio(portfolio.id, portfolio.name)"
        v-else disabled>
          Select
    </button>
     <b>{{ portfolio.name }}</b> ({{ portfolio.id }})
  </p>
</div>

Summary

In this unit, we demonstrated a single pattern “all the way down” for preparing and querying read-side views. This unit is critical for rounding out our real-world knowledge of event sourcing and CQRS, as it implements the ‘Q’ in CQRS. It also ties together some patterns from the microservices world, including the data pump pattern.

The most important takeaway from an operational perspective is how much faster queries can be performed. By precomputing views, we effectively remove all complex real-time SQL computation, including joins, by moving the heavy lifting into asynchronous workers.

Previous: CQRS, Part 1 – Write side
Next: CQRS, Part 3 – ReadSideProcessor for transactions