Introduction

This unit explores how to manage a long-running transaction flow. To demonstrate this, we’ll look at how the ‘wire transfer’ bounded context is modeled and implemented in Reactive Stock Trader. This will give you a flavor of how you would implement similar functionality in your own systems.

This unit also covers how to use a ReadSideProcessor to process wire transfer events. Wire transfers are a good example of a long-running business process flow because they are not a single atomic operation. Instead, they are a sequence of steps that can play out over days.

Note: This is an area of Lagom that is still in flux. The optimal solution would be a process manager type built into Lagom, but the final approach is still being contemplated by the Lagom team. For now, we recommend the approach outlined in this unit, as it works, is straightforward, and can be refactored if/when a process manager type is available in Lagom.

Read-side processor for transactions

Imagine that a customer requests a wire transfer to move $10,000 from an external savings account into your trading platform. Now, imagine that a few days later, the source bank sends your system an error message that the funds are not available. Finally, imagine that the customer has already performed $10,000 worth of trades in your system because you gave them access to the funds right away.

What impact would this have on your business? Would you need to undo this error, or would you compensate by extending the customer temporary credit? These business decisions determine the most efficient approach to the problem of consistency in your system.

There are a few approaches to handling this scenario, depending on your business requirements.

Optimistic approach

The optimistic approach is to write code as if all transfers will eventually complete successfully. As soon as the transfer request is placed, the $10,000 becomes available in the customer’s trading account. This is how many real-world checking accounts work. Say we deposit a check into our checking account. If we’re a good customer, our account may have ‘holds waived,’ which means we have access to the funds the moment we deposit the check, even though it may take days or weeks for the check to clear. As long as we only deposit checks that clear, “holds waived” is a positive thing: as customers we have immediate access to funds, our relationship with the bank stays strong, and we probably do even more business with them.

Pessimistic approach

The pessimistic approach is to assume that the check will not clear due to insufficient funds or fraud. This means that we will need multiple steps in our business process flow to ‘clear a check’ after it is deposited. The first step will involve the customer depositing the check, which is simply a fancy transfer request from one bank to another. The next step will involve the destination bank kicking off a behind-the-scenes check-clearing process with the source bank to verify that the funds exist and actually move the funds. For the deposit to complete, the cash involved must actually change hands from the source bank to the destination bank’s clearing account. Finally, the check isn’t ‘cleared’ and the funds available until the destination bank confirms that it has ‘custody’ of the cash. Once custody of the cash is verified, the check is cleared and the money will become available in the customer’s account.

A ‘clearing account’ is very different from your personal checking account. In a banking system, your bank doesn’t keep your funds isolated from the funds of other customers; the bank’s money is more like a pool. Your account is nothing more than a promise that the bank owes you a certain amount of money from that pool. Much of the bank’s money is invested and loaned out, which is how banks turn a profit. In fact, if all customers tried to withdraw their money at the same time, most banks would run out of cash! This is called a ‘run on the bank,’ and it happens during economic crises. Even in the real world, there isn’t much evidence of ‘strict consistency.’ Don’t try to engineer away edge cases in your systems that the real-world business processes themselves live with!
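To make the difference between the two approaches concrete, here is a minimal sketch of how each might compute the funds a customer can trade with. The names (FundsPolicy, Deposit, AvailableFunds) are hypothetical and not part of Reactive Stock Trader, and amounts are kept in whole cents to avoid floating-point rounding.

```java
import java.util.List;

// Hypothetical policy switch: OPTIMISTIC behaves like "holds waived",
// PESSIMISTIC only counts deposits whose cash is confirmed in our custody.
enum FundsPolicy { OPTIMISTIC, PESSIMISTIC }

// A pending or cleared deposit; the amount is in cents.
final class Deposit {
    final long amountCents;
    final boolean cleared;

    Deposit(long amountCents, boolean cleared) {
        this.amountCents = amountCents;
        this.cleared = cleared;
    }
}

final class AvailableFunds {
    private AvailableFunds() {}

    // Sums the deposits the customer may trade with under the given policy.
    static long availableFunds(List<Deposit> deposits, FundsPolicy policy) {
        long total = 0;
        for (Deposit d : deposits) {
            // Optimistic: every deposit counts as soon as it is requested.
            // Pessimistic: only deposits that have cleared count.
            if (policy == FundsPolicy.OPTIMISTIC || d.cleared) {
                total += d.amountCents;
            }
        }
        return total;
    }
}
```

With a $10,000 uncleared deposit and $2,500 of cleared cash, the optimistic policy reports 1,250,000 cents available, while the pessimistic policy reports only 250,000 until the larger deposit clears.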

Let’s skip the optimistic approach and model the pessimistic one, which clears the transfer before making the funds available. We want to demonstrate this approach because most event sourcing and microservice tutorials take the easy way out: they implement happy-path eventual consistency and skirt around transactions altogether. That sidesteps the toughest problem in a distributed systems architecture, which is finding the sweet spot between consistency, convenience, reliability, and performance.

To begin, we’ll revisit read-side processors and see how they can help us implement transactional ‘clearing’ functionality for wire transfers. First, let’s review the UI.

After creating a new portfolio, the first step you’ll need to take is to transfer money from a hypothetical savings account into your portfolio. Once the transfer is complete, the cash will be available in your portfolio and you can begin to place some trades.

Let’s walk through how this transfer flow is modeled and executed.

Business requirements

Our primary goal with this process is to avoid a few sad-path states. For instance, we don’t want an error midway through a wire transfer to essentially duplicate money, or even worse, erase money. We also don’t want to provide a customer with money that never enters our custody.

With this in mind, we’ll use Lagom’s ReadSideProcessor semantics to create a supervisor to handle the transfer flow.

The transfer process will:

  • Retrieve the funds from the source account
  • Send the funds to the destination account
  • Credit the customer’s portfolio with the funds

Let’s assume two critical business requirements: if the first step fails, the transfer should be canceled. If the second step fails, the transfer should be canceled and the funds should be returned to the sender.
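These two requirements amount to a small saga: each forward step has an explicit failure outcome, and only a failure after the funds are secured needs a compensating action. The sketch below captures just that decision logic in plain Java, collapsed into one synchronous method for readability, whereas the real flow is driven by events. The withdraw, deposit, and refund suppliers are hypothetical stand-ins for the service calls, steps two and three of the list are folded into a single deposit call, and the outcome names are illustrative rather than taken from Reactive Stock Trader.

```java
import java.util.function.BooleanSupplier;

// Illustrative end states of a transfer (not Reactive Stock Trader types).
enum TransferOutcome { COMPLETED, CANCELLED, REFUNDED, NEEDS_MANUAL_INTERVENTION }

final class TransferSaga {
    private TransferSaga() {}

    // Each supplier returns true on success; they stand in for remote calls.
    static TransferOutcome run(BooleanSupplier withdraw,
                               BooleanSupplier deposit,
                               BooleanSupplier refund) {
        // Step 1: retrieve funds from the source account.
        // No money has moved yet, so a failure simply cancels the transfer.
        if (!withdraw.getAsBoolean()) {
            return TransferOutcome.CANCELLED;
        }
        // Step 2: deliver the funds and credit the destination portfolio.
        if (deposit.getAsBoolean()) {
            return TransferOutcome.COMPLETED;
        }
        // Compensation: the funds are in our custody but undeliverable,
        // so try to send them back to the sender.
        return refund.getAsBoolean()
                ? TransferOutcome.REFUNDED
                : TransferOutcome.NEEDS_MANUAL_INTERVENTION;
    }
}
```

A refund is attempted only after a failed delivery; a failed withdrawal needs no compensation because nothing has moved yet.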

Revisiting read-side processors

As covered in the previous unit, a ReadSideProcessor works by subscribing to events in the journal (which is Cassandra by default) and processing each event with a specific tag.

Note: There are additional ways to perform similar processing, such as using the message broker API (which we will discuss in Unit 9). For now, we will focus on the ReadSideProcessor, as it’s the simplest way to implement this logic. We should already have a good understanding of how a ReadSideProcessor works from the previous unit.

First, we’ll need to define a new read-side processor by extending ReadSideProcessor, which “consumes events produced by PersistentEntity instances, and updates some read-side data store that is optimized for queries.” We will call it TransferProcess.

TransferEvent

In Reactive Stock Trader, we have six different types of transfer events:

  • TransferInitiated
  • FundsRetrieved
  • CouldNotSecureFunds
  • DeliveryConfirmed
  • DeliveryFailed
  • RefundDelivered

The following code uses the ‘visitor pattern,’ which provides a compile-time guarantee that we handle every transfer event type during processing.

TransferEvent.java:

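import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.lightbend.lagom.javadsl.persistence.AggregateEvent;
import com.lightbend.lagom.serialization.Jsonable;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = Void.class)
@JsonSubTypes({
    @JsonSubTypes.Type(TransferEvent.TransferInitiated.class),
    @JsonSubTypes.Type(TransferEvent.FundsRetrieved.class),
    @JsonSubTypes.Type(TransferEvent.CouldNotSecureFunds.class),
    @JsonSubTypes.Type(TransferEvent.DeliveryConfirmed.class),
    @JsonSubTypes.Type(TransferEvent.DeliveryFailed.class),
    @JsonSubTypes.Type(TransferEvent.RefundDelivered.class)
})
public abstract class TransferEvent implements AggregateEvent<TransferEvent>, Jsonable {

    // Double dispatch: each concrete event calls the visit(...) overload for its own type.
    public abstract <T> T visit(Visitor<T> visitor);

    // One method per event type; a visitor that omits one will not compile.
    public interface Visitor<T> {
        T visit(TransferInitiated transferInitiated);

        T visit(FundsRetrieved fundsRetrieved);

        T visit(CouldNotSecureFunds couldNotSecureFunds);

        T visit(DeliveryConfirmed deliveryConfirmed);

        T visit(DeliveryFailed deliveryFailed);

        T visit(RefundDelivered refundDelivered);
    }

    // The nested event classes (TransferInitiated, FundsRetrieved, ...) and the
    // aggregateTag() implementation required by AggregateEvent are omitted here.
}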
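The compile-time guarantee comes from two places: every concrete event must implement the abstract visit(Visitor<T>) method by calling the matching overload, and every Visitor implementation must supply all of the overloads. Adding a seventh event type therefore breaks compilation until each visitor handles it. The self-contained sketch below reproduces the pattern with two hypothetical events (MiniTransferEvent.Initiated and MiniTransferEvent.Failed) and no Lagom or Jackson dependencies.

```java
// Simplified, dependency-free version of the TransferEvent visitor pattern.
abstract class MiniTransferEvent {
    abstract <T> T visit(Visitor<T> visitor);

    interface Visitor<T> {
        T visit(Initiated initiated);

        T visit(Failed failed);
    }

    static final class Initiated extends MiniTransferEvent {
        final String transferId;

        Initiated(String transferId) { this.transferId = transferId; }

        // The static type of `this` selects the visit(Initiated) overload.
        @Override
        <T> T visit(Visitor<T> visitor) { return visitor.visit(this); }
    }

    static final class Failed extends MiniTransferEvent {
        final String transferId;

        Failed(String transferId) { this.transferId = transferId; }

        @Override
        <T> T visit(Visitor<T> visitor) { return visitor.visit(this); }
    }
}

// This class only compiles because it handles every event type.
final class DescribeVisitor implements MiniTransferEvent.Visitor<String> {
    @Override
    public String visit(MiniTransferEvent.Initiated e) { return "initiated " + e.transferId; }

    @Override
    public String visit(MiniTransferEvent.Failed e) { return "failed " + e.transferId; }
}
```

On Java 21 and later, a sealed interface combined with an exhaustive switch offers a similar compile-time check without the double-dispatch boilerplate.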

Aggregate tags

We covered tags in the previous unit, but we’ll revisit them in this context as well. There are four main flows that we need to consider:

  • Success
  • Insufficient funds
  • Failed transfer with refund
  • Cannot deliver refund

Success

transfer initiated → funds retrieved → delivery confirmed → transfer complete

This is the main flow that we expect when transferring money into a portfolio. Everything succeeds: the money is withdrawn from the user’s bank account and added to their portfolio.

Insufficient funds

transfer initiated → could not secure funds → transfer complete

This flow occurs when the user does not have enough money to complete the transfer. We simply end the transfer and display an error message to the user.

Failed transfer with refund

transfer initiated → funds retrieved → delivery failed → refund delivered → transfer complete

In this situation, we’re able to retrieve money from the user’s bank account, but for some reason we fail to deposit it into their portfolio. In a real-world system, we would likely attempt a manual fix before refunding the money.

Cannot deliver refund

transfer initiated → funds retrieved → delivery failed → ???

This is purgatory.

Similar to the previous flow, we cannot deliver the funds to the portfolio, but in this case we also either cannot refund the money within a reasonable amount of time or experience a failure during refund delivery. A ‘delivery failed’ event that is not sealed by a ‘transfer complete’ event within a reasonable amount of time requires manual intervention.
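The four flows can be written down as a table of allowed transitions, which also makes the purgatory case mechanically detectable: a transfer whose latest event is ‘delivery failed’ and that has not moved on within some timeout. The sketch below is hypothetical. The state names mirror the six transfer events, ‘transfer complete’ is modeled as reaching a state with no successors, and the timeout is an arbitrary parameter rather than something Lagom or Reactive Stock Trader defines.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical states named after the six transfer events.
enum TransferState {
    INITIATED, FUNDS_RETRIEVED, COULD_NOT_SECURE_FUNDS,
    DELIVERY_CONFIRMED, DELIVERY_FAILED, REFUND_DELIVERED
}

final class TransferFlows {
    private TransferFlows() {}

    // Allowed next states; an empty set means the transfer is complete.
    static final Map<TransferState, Set<TransferState>> NEXT = Map.of(
        TransferState.INITIATED,
            EnumSet.of(TransferState.FUNDS_RETRIEVED, TransferState.COULD_NOT_SECURE_FUNDS),
        TransferState.FUNDS_RETRIEVED,
            EnumSet.of(TransferState.DELIVERY_CONFIRMED, TransferState.DELIVERY_FAILED),
        TransferState.DELIVERY_FAILED,
            EnumSet.of(TransferState.REFUND_DELIVERED),
        TransferState.COULD_NOT_SECURE_FUNDS, EnumSet.noneOf(TransferState.class),
        TransferState.DELIVERY_CONFIRMED, EnumSet.noneOf(TransferState.class),
        TransferState.REFUND_DELIVERED, EnumSet.noneOf(TransferState.class));

    static boolean canTransition(TransferState from, TransferState to) {
        return NEXT.get(from).contains(to);
    }

    static boolean isComplete(TransferState state) {
        return NEXT.get(state).isEmpty();
    }

    // Purgatory: delivery failed and no refund sealed the flow within the timeout.
    static boolean needsManualIntervention(TransferState state, Instant lastEventAt,
                                           Instant now, Duration timeout) {
        return state == TransferState.DELIVERY_FAILED
            && Duration.between(lastEventAt, now).compareTo(timeout) > 0;
    }
}
```

A periodic job could apply needsManualIntervention to each open transfer and raise an alert; how such a job reads transfer state is left open here.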

Tagging events

To tag events that will be handled by this processor, we need to make sure that each event type extends AggregateEvent and implements public AggregateEventTagger<TransferEvent> aggregateTag(). As we covered in the previous unit, a tag is metadata written to each event in Cassandra, which gives Lagom the ability to treat all events with the same tag as a stream of events that can be processed.

In the four main flows above, we can see a number of events between ‘transfer initiated’ and ‘transfer complete’. We want to process only the events that belong to this business process. The good news is that all relevant events are subclasses of TransferEvent (which is also what lets a single visitor cover them all), so a single aggregateTag() implementation can give every transfer event the same tag, and our processor will therefore handle them all.

Keep in mind that tagging events is (effectively) permanent. Each event is stored with its tag as a raw string, so renaming classes in your application will not change the tags of events already in the journal. When refactoring, you’ll either need to leave tag names alone regardless of naming changes in your codebase, or migrate the contents of the journal.
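A tag is ultimately just a string stored next to each event, and a processor asks the journal for every event carrying that string. The dependency-free sketch below uses a hypothetical in-memory journal rather than Lagom’s API to show why every subclass of one base class lands in the same stream, and why the tag string is pinned as a constant. In Lagom, AggregateEventTag.of(Class) uses the class name as the tag string, so the same caution applies to renaming the event base class.

```java
import java.util.ArrayList;
import java.util.List;

// Base class: every subclass inherits one explicitly pinned tag string.
// Hard-coding the string (rather than deriving it from a class name)
// keeps the tag stable if the classes are renamed later.
abstract class TaggedEvent {
    static final String TAG = "TransferEvent";

    String tag() {
        return TAG;
    }
}

final class WireInitiated extends TaggedEvent {}

final class WireDelivered extends TaggedEvent {}

// Hypothetical in-memory stand-in for the journal; not Lagom's API.
final class InMemoryJournal {
    private final List<String> tags = new ArrayList<>();
    private final List<TaggedEvent> events = new ArrayList<>();

    void append(TaggedEvent event) {
        // The tag is resolved once, at write time, and stored as a plain string.
        tags.add(event.tag());
        events.add(event);
    }

    // Roughly what a read-side processor subscribes to: all events with a tag.
    List<TaggedEvent> eventsByTag(String tag) {
        List<TaggedEvent> result = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (tags.get(i).equals(tag)) {
                result.add(events.get(i));
            }
        }
        return result;
    }
}
```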

buildHandler()

To begin, we’ll create a new processor that extends ReadSideProcessor<TransferEvent>, because we need to process all events of type TransferEvent.

We won’t cover the building blocks of the ReadSideProcessor<T> again, as we already covered them in depth in the previous unit. Instead, we will focus on the specific buildHandler() functionality required to handle wire transfers. However, just to orient ourselves, here is the TransferProcess class definition that we will work with:

TransferProcess.java:

public class TransferProcess extends ReadSideProcessor<TransferEvent> {
    // implementation goes here
}

Next, we’ll create our own ReadSideHandler<T> implementation. This gives us the flexibility we need to process transfer events, which must be visited because they are implemented using the visitor pattern. The buildHandler() method then returns an instance of our custom HandleEvent class rather than relying on out-of-the-box functionality.

@Override
public ReadSideHandler<TransferEvent> buildHandler() {
    return new HandleEvent();
}

class HandleEvent extends ReadSideHandler<TransferEvent> {
    @Override
    public Flow<Pair<TransferEvent, Offset>, Done, ?> handle() { // 1
        return Flow.<Pair<TransferEvent, Offset>>create()
            .log("transferEvent")
            .withAttributes(
                Attributes.createLogLevels(
                        Attributes.logLevelInfo(),
                        Attributes.logLevelInfo(),
                        Attributes.logLevelInfo()
                )
            )
            // concurrentSteps and transferEventVisitor are defined on TransferProcess (not shown)
            .mapAsyncUnordered(concurrentSteps,
                e -> e.first().visit(transferEventVisitor)); // 2
    }
}

The ReadSideHandler<TransferEvent> streams TransferEvent instances, each paired with an Offset (1). We ‘map over’ the live stream of transfer events with mapAsyncUnordered (2), ‘visiting’ each event. Because the Visitor interface declares a method for every event type, the compiler ensures that we never fail to handle a transfer event type.

Some of this language may be new to those who haven’t worked with streams, and specifically with stream processing in Akka. mapAsyncUnordered runs the handler for several events at once, completely asynchronously, which lets us run multiple ‘visitors’ in parallel. The downside is that results are passed downstream in the order the handlers finish, not the order in which the events arrived. If arrival order matters, Akka Streams also provides mapAsync, which keeps the same parallelism but emits results in arrival order, at the cost of waiting on slow elements. The same trade-off between parallelism and ordering appears in other technologies, such as Kafka. concurrentSteps (above) is a configurable value that sets how many asynchronous handlers may run simultaneously.
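The difference between arrival order and completion order can be seen without Akka. The sketch below uses only the JDK’s CompletableFuture as an analogy for the emission semantics, not as an implementation of Akka Streams: one future is started per event, the futures are completed in reverse order, and results are recorded both as they complete (what mapAsyncUnordered passes downstream) and in arrival order (what mapAsync would emit). OrderingDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class OrderingDemo {
    private OrderingDemo() {}

    // Returns [completionOrder, arrivalOrder] for handlers that finish in reverse.
    static List<List<String>> run(List<String> events) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        List<String> completionOrder = new ArrayList<>();

        // "Start" one handler per event; each records its result when it finishes.
        for (String event : events) {
            CompletableFuture<String> f = new CompletableFuture<>();
            // Registered before completion, so it runs in the completing thread.
            f.thenAccept(completionOrder::add);
            inFlight.add(f);
        }

        // Simulate the handlers finishing in reverse arrival order.
        for (int i = inFlight.size() - 1; i >= 0; i--) {
            inFlight.get(i).complete(events.get(i));
        }

        // Ordered emission: take results strictly in arrival order.
        List<String> arrivalOrder = new ArrayList<>();
        for (CompletableFuture<String> f : inFlight) {
            arrivalOrder.add(f.join());
        }
        return List.of(completionOrder, arrivalOrder);
    }
}
```

For the events a, b, and c, the completion order is c, b, a, while the arrival order stays a, b, c.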

Next, we’ll implement the visitor itself, referenced as transferEventVisitor (2). Its visit methods are callbacks executed for each event processed.

TransferEventVisitor

Each transfer event has an associated ‘visit’ behavior, which will be executed when handled by the read-side processor.

There’s a lot of code in each visit(...) method, so we’ll walk through the following code step by step.

We’ll start with the ‘transfer initiated’ reaction, which kicks off the wire transfer processing flow. First, we check that the transfer’s source account is a portfolio before we begin processing (1). Next, we build the withdrawal command, FundsTransfer.Withdrawl (2), obtain the portfolio ID (3), and invoke the withdrawal on the portfolio service, then tell the transfer entity whether the funds request succeeded or failed (4). For any other source account type (5), we simply accept the transfer, as this is a demo system. In a real-world production system, you obviously don’t want a source of free money!

Inner class of TransferProcess.java:

class TransferEventVisitor implements TransferEvent.Visitor<CompletionStage<Done>> {
    // ...
    @Override
    public CompletionStage<Done> visit(TransferEvent.TransferInitiated transferInitiated) {
        val transferEntity = transferRepository.get(transferInitiated.getTransferId());
        if (transferInitiated
                .getTransferDetails()
                .getSource() instanceof Account.Portfolio) { // 1
            val transfer = FundsTransfer.Withdrawl.builder()
                .transferId(transferInitiated.getTransferId())
                .funds(transferInitiated.getTransferDetails().getAmount())
                .build(); // 2
            val portfolioId =
                ((Account.Portfolio) transferInitiated.getTransferDetails()
                    .getSource())
                    .getPortfolioId(); // 3
            return portfolioService
                .processTransfer(portfolioId)
                .invoke(transfer)
                .thenApply(done -> transferEntity.ask(
                    TransferCommand.RequestFundsSuccessful.INSTANCE))
                .exceptionally(ex -> transferEntity.ask(
                    TransferCommand.RequestFundsFailed.INSTANCE))
                .thenCompose(Function.identity()); // 4
        } else {
            // Any other accounts are out of scope. This means they will
            // freely accept and transfer money.
            // You don't actually want sources of free money in a production system!
            return transferEntity
                .ask(TransferCommand.RequestFundsSuccessful.INSTANCE); // 5
        }
    }
    // ...
}
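The chain thenApply(...).exceptionally(...).thenCompose(Function.identity()) is worth a closer look. Both branches return the entity’s reply, which is itself a CompletionStage, so after thenApply the type is a stage of a stage; exceptionally must return that same nested type, and thenCompose(Function.identity()) flattens it. The plain-JDK sketch below reproduces the shape with hypothetical stubs: the callRemote parameter stands in for portfolioService.processTransfer(portfolioId).invoke(transfer), and ask stands in for transferEntity.ask(...), returning the command name instead of Done so the outcome is observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class ReactionDemo {
    private ReactionDemo() {}

    // Hypothetical stand-in for transferEntity.ask(command).
    static CompletionStage<String> ask(String command) {
        return CompletableFuture.completedFuture(command);
    }

    static CompletionStage<String> react(CompletionStage<Void> callRemote) {
        return callRemote
            // Remote call succeeded: ask the entity to record it (nested stage).
            .thenApply(done -> ask("RequestFundsSuccessful"))
            // Remote call failed: ask the entity to record the failure instead.
            .exceptionally(ex -> ask("RequestFundsFailed"))
            // Flatten CompletionStage<CompletionStage<String>> to CompletionStage<String>.
            .thenCompose(Function.identity());
    }
}
```

Because exceptionally sits before the flattening step, it catches failures of the remote call but not a failed reply from the success-path ask; that failure propagates out of the returned stage.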

Note the use of the term ‘reaction’ in our description above. Essentially, the above code is an implementation of a reaction in event storming parlance. If you recall from our event storming session in the very beginning of this series, we have two mechanisms for the issuance of commands:

  • UI input (such as a button click), and
  • System commands as reactions to events.

We’re using a ReadSideProcessor in Lagom as our implementation of a reaction. These can help us to model how system commands are generated inside our system without direct user input.

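Next, let’s look at the reaction to ‘funds retrieved’. At this point the funds have been secured from the source account, so we attempt to deposit them into the destination portfolio. If the deposit succeeds, we tell the transfer entity that delivery was successful, which leads to ‘delivery confirmed’ and seals the success flow; if it fails, we tell the entity that delivery failed.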

@Override
public CompletionStage<Done> visit(TransferEvent.FundsRetrieved evt) {
    val transferEntity = transferRepository.get(evt.getTransferId());
    if (evt.getTransferDetails().getDestination() instanceof Account.Portfolio) {
        val transfer = FundsTransfer.Deposit.builder()
                .transferId(evt.getTransferId())
                .funds(evt.getTransferDetails().getAmount())
                .build();
        val portfolioId =
            ((Account.Portfolio) evt.getTransferDetails().getDestination())
            .getPortfolioId();
        return portfolioService
                .processTransfer(portfolioId)
                .invoke(transfer)
                .thenApply(done ->
                    transferEntity.ask(
                        TransferCommand.DeliverySuccessful.INSTANCE))
                .exceptionally(ex ->
                        transferEntity.ask(TransferCommand.DeliveryFailed.INSTANCE))
                .thenCompose(Function.identity());
    } else {
        // As above, any unimplemented account type just freely accepts transfers
        return transferEntity
                .ask(TransferCommand.DeliverySuccessful.INSTANCE);
    }
}

Finally, we’ll handle the ‘delivery failed’ scenario, which is our sad path case. In this case, we’ll need to issue a refund. We only get into this state if the money has already been moved from the source account to our system (the ‘custodian’), but, for some reason, it can’t be credited to the portfolio it was directed at.

@Override
public CompletionStage<Done> visit(TransferEvent.DeliveryFailed deliveryFailed) {
    val transferEntity = transferRepository.get(deliveryFailed.getTransferId());

    if (deliveryFailed.getTransferDetails().getSource() instanceof Account.Portfolio) {

        val portfolioId = ((Account.Portfolio) deliveryFailed.getTransferDetails()
            .getSource())
            .getPortfolioId();

        val refund = FundsTransfer.Refund.builder()
            .transferId(deliveryFailed.getTransferId())
            .funds(deliveryFailed.getTransferDetails().getAmount())
            .build();

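        // No exceptionally(...) here: if the refund call fails, the returned
        // stage fails, the entity never receives RefundSuccessful, and the
        // transfer is left at 'delivery failed' (the purgatory case).
        return portfolioService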
            .processTransfer(portfolioId)
            .invoke(refund)
            .thenCompose(done ->
                transferEntity
                        .ask(TransferCommand.RefundSuccessful.INSTANCE));
    } else {
        return transferEntity
                .ask(TransferCommand.RefundSuccessful.INSTANCE);
    }
}

In real-world banking systems, this is not uncommon. For instance, if your bank receives a wire transfer to your account, but the transfer doesn’t contain a description for what the funds are for, your bank may decide to reject the wire and return the funds. An implementation like this could automate that workflow; after all, even a manual review and rejection of a wire transfer could become another event to include in our visitor logic, so this process can expand to handle all sorts of complexities of a real-world business process.

Conclusion

Read-side processors can be used for many types of operations. In this unit, we’ve used them to initiate calls from the read-side processor to a service to help us process a long-running transaction with multiple steps. This unit also completes the event sourcing and CQRS sections of this series, so let’s take a moment to discuss reactive programming in general.

We feel that CQRS, event sourcing, and reactive systems will become much more prevalent. For example, the Axon framework hit 1 million downloads in late 2018, while Lagom continues to gain traction. Vlingo is another promising framework, early in its development cycle, built around event sourcing, CQRS, and DDD principles.

We highly recommend that senior developers, technical leads, and architects gain a broad understanding of the patterns in this series so that they’re ready to tackle the business problems of tomorrow. For example, machine learning and AI will only add to the appeal of reactive systems. The volume of data that developers deal with is going up exponentially, not down, so techniques that gain efficiency in reading, writing, and processing data are a clear way to tackle the complexity of high-quality enterprise systems.

We must point out that alternatives to event sourcing and CQRS are emerging from database vendors. FaunaDB and Google Cloud Spanner, for instance, are new-ish databases that promise strong consistency plus horizontal scale. FaunaDB is based on the Calvin paper, and the heritage of this product comes from Twitter. Google Cloud Spanner is based on the Spanner paper from Google and is only available in GCP. Both look very promising for what they claim to offer. However, there are tradeoffs with databases that promise no compromises on scale, resilience, and consistency. For example, FaunaDB and Google Cloud Spanner are proprietary, strictly commercial endeavors. Fauna recommends hosting FaunaDB within their own ‘serverless cloud,’ while Google Cloud Spanner only works within GCP. This will not pass multi-cloud requirements in most organizations. Nor do they provide the ability to analyze and replay events. However, if you do not wish to build an event-sourced service and would rather keep CRUD plus OLTP semantics, these options are worth exploring.

Finally, remember that you can use both event sourcing/CQRS and OLTP-based services in a microservices architecture. Microservice architectures can be optimized and designed per service, rather than applying a one-size-fits-all approach to an entire system. Selecting the best approach and tools for each service is a reasonable path to creating efficient, extensible systems.