
Supervision & error handling in ZIO, Akka and Monix (part 3) + series summary

Adam Warski 30 July, 2018 (23 min read)

In the previous parts we’ve seen how each of the libraries handles state and communication. In this last part we’ll look at the final feature that makes actors stand out: error handling, supervision and the actor hierarchy.


What is this all about? Things fail all the time. Whenever you communicate with an external system, there might be a network error; the service might have a bug; requests can be malformed; servers might be down; etc.

Error handling is often a significant part of our application. Because its surface area is so large, it tends to creep into every corner of our code and make it harder to read and understand. That’s why there are numerous efforts to contain the situation and separate the error-handling code from the business logic.

If successful, we get clear, readable business logic, but also clear and readable error-handling logic. Another part of the challenge is gaining a degree of certainty that our error handling actually works!

Akka borrows from Erlang’s “let it crash” philosophy. The key idea is not to try to handle all errors in a process. Firstly, this leads to error code getting tangled with the business logic. Secondly, an actor can simply lack context to be able to fix the error.

For example, if there’s an actor whose sole responsibility is to read from a queueing system, and the connection to the queue breaks, what should the actor do? Re-create the connection? But how, if that’s not the responsibility of the actor?

That’s what supervision hierarchies in Akka (and Erlang, and other actor implementations) are for. Each actor has a parent; if an error is not handled by the actor, it is propagated to the parent. The parent can decide if the child process should be resumed, restarted, stopped, or if the error should be escalated to its parent.

Parent, grandparent and further ancestor actors might have more and more context, and hence might be able to run appropriate logic — e.g. re-creating the connection to the queue and creating a new child actor which will read from it.

Using supervision hierarchies we also achieve the separation of concerns that we were after: the business logic is in the actor, while the error handling logic is in the supervisor.

Akka

As in the previous parts, we will start with an Akka example and see how to implement the same logic using Akka Typed, Monix and ZIO. In this example we’ll:

  1. connect to an external queueing system through a QueueConnector trait
  2. after obtaining a connected Queue instance, read from the queue
  3. forward any messages to interested consumers
  4. upon any error, attempt to re-connect, first attempting to close() the old queue connection.

Here are the base traits we’ll be working with:

trait Queue[F[_]] {
  def read(): F[String]
  def close(): F[Unit]
}

trait QueueConnector[F[_]] {
  def connect: F[Queue[F]]
}

The traits are parametrised with a wrapper (higher-order) type F[_], which should be capable of representing both successful and failed computations. In the Akka example, we’ll be using QueueConnector[Future] and Queue[Future], as that’s the container type that Akka works with best.
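To experiment with these traits, it helps to have a trivial implementation. The following in-memory stub is a hypothetical test helper (not part of the article’s repository); it repeats the traits so the snippet is self-contained, and uses Future as F:

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{ExecutionContext, Future}

// The article's base traits, repeated so the snippet is self-contained
trait Queue[F[_]] {
  def read(): F[String]
  def close(): F[Unit]
}

trait QueueConnector[F[_]] {
  def connect: F[Queue[F]]
}

// Hypothetical in-memory stub: reads block until a message is available
class InMemoryQueue(messages: LinkedBlockingQueue[String])
                   (implicit ec: ExecutionContext) extends Queue[Future] {
  def read(): Future[String] = Future(messages.take())
  def close(): Future[Unit]  = Future.successful(())
}

class InMemoryConnector(messages: LinkedBlockingQueue[String])
                       (implicit ec: ExecutionContext) extends QueueConnector[Future] {
  def connect: Future[Queue[Future]] = Future.successful(new InMemoryQueue(messages))
}
```

A stub like this also makes it easy to simulate failures in tests, e.g. by throwing from read() after a set number of messages.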

We’ll implement a pattern also known as the “error kernel”. We’ll keep the important state safe and protected in a parent actor: here, the state will be the set of registered message consumers (to which messages read from the queue should be forwarded). The risky operations that might fail — connecting to and consuming from the queue — will be delegated to a child actor. That way, even if the child actor fails, the state will not be lost.

The parent actor will receive two types of messages:

case class Subscribe(actor: ActorRef)
case class Received(msg: String)

Subscribe to add an actor to the set of consumers interested in receiving messages, and Received, sent by the child actor, when a new message has been received from the queue. The subscribe and received-message handling logic is quite straightforward:

class BroadcastActor(connector: QueueConnector[Future]) 
  extends Actor with ActorLogging {
  
  private var consumers: Set[ActorRef] = Set()

  override def receive: Receive = {
    case Subscribe(actor) => consumers += actor
    case Received(msg) =>
      consumers.foreach(_ ! msg)
  }
}

But with this definition alone, nothing will really happen, as we never try to connect to the queue. That’s why when the broadcast (parent) actor starts, we’ll spawn a child actor:

class BroadcastActor(connector: QueueConnector[Future]) 
  extends Actor with ActorLogging {
    
  override def preStart(): Unit = {
    context.actorOf(Props(new ConsumeQueueActor(connector)))
  }
  
  // ...
}

What does the child actor do? Its internal state will consist of the currently connected Queue instance (if any). Once again we’ll use the preStart callback to try to connect to the queue immediately after the actor starts. As this is an asynchronous operation, we’ll pipeTo the result to the actor. That way the result of the connect operation will be sent as a message to the actor:

class ConsumeQueueActor(connector: QueueConnector[Future]) 
  extends Actor with ActorLogging {
  
  import context.dispatcher

  private var currentQueue: Option[Queue[Future]] = None

  override def preStart(): Unit = {
    log.info("[queue-start] connecting")
    connector.connect.pipeTo(self)
  }
    
  // ...
}

Once the connected queue is received, we can start reading messages from it. Each message, once available, will be forwarded to the parent actor, wrapped in Received. After a message is received, we can receive the next one by sending the queue to self (the current actor):

class ConsumeQueueActor(connector: QueueConnector[Future]) 
  extends Actor with ActorLogging {
   
  import context.dispatcher  
    
  private var currentQueue: Option[Queue[Future]] = None  
  
  // ...
  
  override def receive: Receive = {
    case queue: Queue[Future] =>
      if (currentQueue.isEmpty) {
        log.info("[queue-start] connected")
        currentQueue = Some(queue)
      }
      log.info("[queue] receiving message")
      queue
        .read()
        .pipeTo(self) // forward message to self
        .andThen { case Success(_) => self ! queue } // receive next message

    case msg: String =>
      context.parent ! Received(msg)

    case Failure(e) =>
      log.info(s"[queue] failure: ${e.getMessage}")
      throw e
  }
}

But what if there’s an error? If either connector.connect.pipeTo(self) or queue.read().pipeTo(self) fails, the actor will receive a Failure(e) message. We don’t really know what to do with that, so we are taking the easiest route: re-throwing the error — which will cause the actor to fail — and hence propagating the error to the parent.

Whatever the reason for the child actor to be stopped (either failure or a regular shutdown of the application), we make one last effort to clean up in the postStop method:

class ConsumeQueueActor(connector: QueueConnector[Future]) 
  extends Actor with ActorLogging {
  
  import context.dispatcher

  private var currentQueue: Option[Queue[Future]] = None

  override def preStart(): Unit = // ...

  override def postStop(): Unit = {
    log.info("[queue-stop] stopping queue actor")
    currentQueue.foreach { queue =>
      log.info("[queue-stop] closing")
      Await.result(queue.close(), 1.minute)
      log.info("[queue-stop] closed")
    }
  }
    
  // ...
}

If there’s any connected Queue instance (there might not be, if connecting failed), we try to invoke its close() method. As this is an asynchronous process, and the postStop method is synchronous, we have no other choice but to use Await.result.

And that’s all there is to the child actor; notice that there’s almost no error handling code at all (except for re-throwing any exceptions).

What will the parent do once a child fails? That depends on the supervision strategy. The default one is to Restart a child on “normal” exceptions. The strategy is defined in the parent actor as an overridable method:

class BroadcastActor(connector: QueueConnector[Future]) 
  extends Actor with ActorLogging {
    
  // ...

  // optional - the default one is identical
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _: ActorInitializationException => Stop
    case _: ActorKilledException         => Stop
    case _: DeathPactException           => Stop
    case e: Exception =>
      log.info(s"[broadcast] exception in child actor: ${e.getMessage}, restarting")
      Restart
  }
    
  // ...
}

Here we have a simple hierarchy with one child actor, but in more complex examples, besides restarting the actor (one-for-one), there is also the possibility of restarting all child actors if only one fails (all-for-one).

In addition, there’s also some flexibility in how the child actor is restarted. One option is to use backoff, that is, not to restart the child actor immediately, but after a (growing) delay. If a system is down, it’s quite likely to still be down if we retry right after the failure; but if we wait a bit, it has a higher chance of getting back into shape. This is possible by wrapping the child actor in a BackoffSupervisor.
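The schedule such a supervisor follows is easy to express on its own. Here is a sketch of the usual exponential backoff calculation; the function name and default values are mine, not Akka’s BackoffSupervisor API:

```scala
import scala.concurrent.duration._

// Hypothetical helper: delay before the n-th restart (n = 0, 1, 2, ...),
// doubling each time and capped at maxBackoff
def backoffDelay(restart: Int,
                 minBackoff: FiniteDuration = 1.second,
                 maxBackoff: FiniteDuration = 30.seconds): FiniteDuration = {
  val delay = minBackoff * math.pow(2, restart).toLong
  if (delay < maxBackoff) delay else maxBackoff
}
```

Real implementations usually also add random jitter, so that many children don’t all restart at the same instant.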

The example above is available in the GitHub repository, together with tests which simulate failures at various stages of the application. There’s quite a lot of logging going on, so you can observe what happens at each moment, when and if the actors are created and restarted.

Akka Typed

The Akka Typed implementation is slightly different in two aspects. First of all, failure handling is not tied to the parent actor. Instead, it’s a wrapper for a behavior, which gives us more flexibility: failure handling can either be defined in the parent, or come pre-defined with the child actor’s behavior.

Secondly, if a parent actor spawns multiple child actors, each of them can have a different supervision strategy — unlike the “global” configuration of the supervision strategy in the “traditional” Akka approach.

To implement our example we’ll define broadcastBehavior which will describe how the parent actor should behave. It will handle the same two types of messages as before, but because we need to parametrize the behavior with a single type, we introduce a common trait:

sealed trait BroadcastActorMessage
case class Subscribe(actor: ActorRef[String]) extends BroadcastActorMessage
case class Received(msg: String) extends BroadcastActorMessage

The message handling logic won’t have any mutable state. Instead, once again it will be a method parametrised with the state — the set of consumer actors — which is called recursively:

def handleMessage(consumers: Set[ActorRef[String]]): Behavior[BroadcastActorMessage] = 
  Behaviors.receiveMessage {
    case Subscribe(actor) => handleMessage(consumers + actor)
    case Received(msg) =>
      consumers.foreach(_ ! msg)
      handleMessage(consumers)
  }

But, before handling any BroadcastActorMessage, we should try to connect to the queue and start receiving messages. We’ll do that in a separate actor, spawned when the broadcast behavior is first created:

def broadcastBehavior(
  connector: QueueConnector[Future]): Behavior[BroadcastActorMessage] = 
  Behaviors.setup { ctx =>
    val connectBehavior = Behaviors
      .supervise[Nothing](connectToQueueBehavior(connector, ctx.self))
      .onFailure[RuntimeException](SupervisorStrategy.restart)
    ctx.spawn[Nothing](connectBehavior, "connect-queue")

    def handleMessage(
      consumers: Set[ActorRef[String]]): Behavior[BroadcastActorMessage] = // ...

    handleMessage(Set())
  }

We’re using Behaviors.supervise to wrap the child actor behavior (connectToQueueBehavior, which we’ll define next) so that whenever a RuntimeException happens, the actor will be restarted. Note that supervise is a wrapper for any Behavior, yielding a new Behavior. We could have defined it completely separately and outside of the parent actor. Depending on the use-case, it might be more logical to define it inside, or outside of the supervisor.

Restarts with backoff are even easier than before: we just use SupervisorStrategy.restartWithBackoff (among others) instead of SupervisorStrategy.restart.

There’s an important difference between “traditional” Akka and Akka Typed. In the previous approach, we’ve seen that the default supervisor strategy for “normal” exceptions is to restart the child actor. In Akka Typed, the default is to stop the child actor. That’s why we need to explicitly specify what to do on child failures using onFailure.

The second difference from the previous implementation is that the child actor will in fact consist of two actors: one for connecting to the queue, the other for consuming from a connected queue. The reason we need not only two behaviors but also two actors is that they handle different types of messages. That’s the small price we pay for type safety.

We won’t be sending any messages from the parent actor to the child actor, hence its type, as viewed by the parent actor, will be Behavior[Nothing]. Inside the actor, however, we are sending a message containing the connected queue, so we’ll need to create a behavior which accepts a Try[Queue[Future]] and then hide that fact from the parent using narrow:

def connectToQueueBehavior(connector: QueueConnector[Future], 
                           msgSink: ActorRef[Received]): Behavior[Nothing] = {
  Behaviors.setup[Try[Queue[Future]]] { ctx =>
    import ctx.executionContext

    ctx.log.info("[queue-start] connecting")
    connector.connect.andThen { case result => ctx.self ! result }

    Behaviors.receiveMessage {
      case Success(queue) =>
        ctx.log.info("[queue-start] connected")

        val consumeActor = ctx.spawn(consumeQueueBehavior(queue, msgSink), 
                                     "consume-queue")
        ctx.watch(consumeActor)

        // we can either not handle Terminated, which will cause 
        // DeathPactException to be thrown and propagated or rethrow the 
        // original exception
        Behaviors.receiveSignal {
          case (_, t @ Terminated(_)) =>
            t.failure.foreach(throw _)
            Behaviors.empty
        }
      case Failure(e) =>
        ctx.log.info("[queue-start] failure")
        throw e
    }
  }
}.narrow[Nothing]

Using the self-reference from the context, we are sending a message to self once the queue is connected (connector.connect.andThen { case result => ctx.self ! result }). If it’s a failure, we rethrow the error which will cause the supervisor in the parent to be invoked. If it’s a success, we spawn a child actor with the queue-consuming behavior (consumeQueueBehavior, defined below).

Note that instead of the preStart callback, in Akka Typed we simply create a behavior which runs the desired code when the actor is set up (using Behaviors.setup), and then returns the “proper” behavior. There’s no looping in this actor; it only ever receives one message.

But that’s not the end. If the queue-consuming actor fails, we need to propagate that error to the parent. That’s not done automatically; we need to watch the new child actor (using ctx.watch). Then, the only thing left to do in the actor is to wait for the child’s termination signal (when things go wrong), and propagate that to the parent.

Termination signals are sent through a different channel than normal actor messages, hence the dedicated behavior factory (Behaviors.receiveSignal, instead of the usual Behaviors.receiveMessage).

Finally, we get to the behavior of the queue consumer:

def consumeQueueBehavior(queue: Queue[Future], 
                         msgSink: ActorRef[Received]): Behavior[Try[String]] =
  Behaviors.setup { ctx =>
    import ctx.executionContext

    ctx.log.info("[queue] receiving message")
    queue.read().andThen { case result => ctx.self ! result }

    Behaviors
      .receiveMessage[Try[String]] {
        case Success(msg) =>
          msgSink ! Received(msg)
          consumeQueueBehavior(queue, msgSink)

        case Failure(e) =>
          ctx.log.info(s"[queue] failure: ${e.getMessage}")
          throw e
      }
      .receiveSignal {
        case (_, PostStop) =>
          ctx.log.info("[queue-stop] closing")
          Await.result(queue.close(), 1.minute)
          ctx.log.info("[queue-stop] closed")
          Behaviors.same
      }
  }

Similarly to the “traditional” Akka implementation, we invoke reading from the queue and once the message is ready, we forward it to self (queue.read().andThen { case result => ctx.self ! result }). Once a message is received, we send it to the sink (that will be the parent actor) and recursively call the same behavior.

If it’s a failure, we simply throw the exception. That will cause the queue-connecting actor to be notified, which will in turn notify the parent actor.

What about closing the queue before the queue-consumer actor finishes (for whatever reason)? There’s no postStop method to override here like before. Instead, we modify the created behavior adding a receiveSignal handler. If we get a PostStop signal, we try to close the queue. Again, we need to synchronously return a new behavior, but the closing action is asynchronous — hence the need for the Await.

It’s important to note here that once again we are leveraging the fact that Behaviors, just like Monix’s Tasks and ZIO’s IOs, are lazy. This allows modifying the (recursive) behavior by adding additional handlers or meta-data. Here, we are modifying Behaviors.receiveMessage[Try[String]] so that the signal handler is installed as well. If behaviors were eagerly executed, receiveSignal would never be called.

One more case where separating the description of a computation from its interpretation is beneficial.
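That laziness is easy to demonstrate in a few lines of plain Scala. Lazily below is a made-up stand-in for Behavior/Task/IO, not any real API:

```scala
// Made-up "description" type: nothing runs until interpret() is called
final class Lazily[A](thunk: () => A) {
  def interpret(): A = thunk()
  // extend the description without executing anything
  def map[B](f: A => B): Lazily[B] = new Lazily(() => f(thunk()))
}

object Lazily {
  def delay[A](a: => A): Lazily[A] = new Lazily(() => a)
}
```

Calling map only builds a bigger description; side effects happen solely when interpret() runs. That’s the same property that lets receiveSignal be attached to an already-constructed behavior before any message is processed.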

Monix

Let’s start examining the Monix implementation from the end, that is from the description of the task which will connect to the queue, consume messages from it and close it in the end (either due to normal termination or an error).

Instead of using lifecycle hooks (preStart, postStop in Akka), we’ll simply define a process which performs the connect-consume-close steps in sequence.

As in the previous parts, to communicate with the parent process we’ll use an MVar (a bounded, 1-element queue) which will store elements of type BroadcastMessage:

sealed trait BroadcastMessage
case class Subscribe(consumer: String => Task[Unit]) extends BroadcastMessage
case class Received(msg: String) extends BroadcastMessage

Next, we’ll define three separate tasks which connect to the queue, consume elements from the queue and finally close it:

val connect: Task[Queue[Task]] = Task
  .eval(logger.info("[queue-start] connecting"))
  .flatMap(_ => connector.connect)
  .map { q =>
    logger.info("[queue-start] connected")
    q
  }

def consumeQueue(queue: Queue[Task]): Task[Unit] =
  Task
    .eval(logger.info("[queue] receiving message"))
    .flatMap(_ => queue.read())
    .flatMap(msg => inbox.put(Received(msg)))
    .cancelable
    .restartUntil(_ => false)

def releaseQueue(queue: Queue[Task]): Task[Unit] =
  Task
    .eval(logger.info("[queue-stop] closing"))
    .flatMap(_ => queue.close())
    .map(_ => logger.info("[queue-stop] closed"))

The task definitions are pretty straightforward: they simply invoke the appropriate methods on the connector or a connected queue instance and perform some additional logging. Note that consumeQueue never ends normally: after reading a single message and sending it to the parent process (using inbox.put(Received(msg))), it is always restarted to read another message (restartUntil(_ => false)).

Task[Queue[Task]] might look weird, but it’s simply a task which, when run, creates a Queue which, in turn, wraps the results of its methods in a Task.

How to combine these three tasks into a whole? We’ll use bracket:

def consume(connector: QueueConnector[Task], 
            inbox: MVar[BroadcastMessage]): Task[Unit] = {
  
  val connect: Task[Queue[Task]] = // ...
  def consumeQueue(queue: Queue[Task]): Task[Unit] = // ...
  def releaseQueue(queue: Queue[Task]): Task[Unit] = // ...
  
  connect.bracket(consumeQueue)(releaseQueue)
}

Note that we are using inbox as the name for the communication channel between the consume and broadcast processes to avoid name clashes, as Queue is already taken by our domain class.

bracket is an operator that forms one of the basic building blocks of error handling in Monix (and ZIO as well). It’s equivalent to the well-known try ... catch ... finally construct from Java/Scala. The connect task should allocate the resource; the first bracket parameter is the resource usage. Regardless of how the resource-usage part ends (the task completes, there is an error, or the fiber is cancelled), it’s guaranteed that the second parameter, the resource-release task, will be evaluated as well.

And that’s exactly what we need! Using bracket, we can ensure that closing the queue will at least be attempted, however the queue consumption ends.
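For a synchronous intuition, bracket can be sketched in plain Scala with try/finally (a simplification: the real operator also covers asynchrony and fiber cancellation):

```scala
// Simplified, synchronous bracket: acquire a resource, use it, always release it
def bracket[R, A](acquire: => R)(use: R => A)(release: R => Unit): A = {
  val resource = acquire
  try use(resource)
  finally release(resource)
}
```

One difference from this sketch: when both use and release fail, plain try/finally propagates the release exception, while Monix’s bracket delivers the first error and discards the second.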

What happens when both the resource-usage and resource-release parts throw an error? Which error will the user get? It’s the first one; the second is discarded. An important detail to keep in mind.

The above guarantees proper behavior when an error happens. But what if we just want to end the process gracefully? We might no longer be interested in consuming the queue. With Akka, it was enough to stop the actor. Here, we have to use cancellation.

In the previous parts we’ve used Fiber.cancel as well, to end a forked (asynchronous) process. Here, the consumption logic will also run asynchronously (as we’ll see shortly). If the user decides that queue consumption should stop, cancellation is the only way to break the infinite consumption loop.

However, there’s a catch: by default, a lot of things aren’t cancelable. For example, the infinite flatMap chain in consumeQueue (if we unfold the recursive invocations) will never be cancelled. That’s why we need to add a cancellation boundary using cancelable. This allows the flatMap chain to be stopped mid-way.

What cancelable does, in essence, is instruct the interpreter of the task that when it receives a cancellation request for a fiber (lightweight thread), and there’s an opportunity to stop executing the task — for example, because the interpreter has just finished one flatMap operation and is about to start another — the task will be cancelled.
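A made-up model of that boundary check in plain Scala (this is what the interpreter conceptually does between flatMap steps, not Monix’s actual implementation):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Run steps one by one, checking the cancellation flag at each boundary,
// the way a task interpreter checks between flatMap operations
def runCancelable(steps: List[() => Unit], cancelled: AtomicBoolean): Int = {
  var executed = 0
  val it = steps.iterator
  while (it.hasNext && !cancelled.get()) {
    it.next()()
    executed += 1
  }
  executed // number of steps that actually ran
}
```

A step that is already executing runs to completion; the flag only takes effect at the next boundary, which matches the cooperative nature of cancellation described above.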

So far we’ve talked only about connecting to the queue. What about the rest? We still need to define the message-broadcasting process which will send the read messages to interested consumers. For that, we create a task which will handle both Subscribe and Received messages:

def processMessages(inbox: MVar[BroadcastMessage], 
                    consumers: Set[String => Task[Unit]]): Task[Unit] =
  inbox.take
    .flatMap {
      case Subscribe(consumer) => processMessages(inbox, consumers + consumer)
      case Received(msg) =>
        consumers
          .map(consumer => consumer(msg).fork)
          .toList
          .sequence_
          .flatMap(_ => processMessages(inbox, consumers))
    }

Nothing out of the ordinary that we haven’t seen before. We describe a never-ending process which takes messages from the inbox, maps them to the appropriate tasks (updating the internal state — the set of consumers — if necessary) and recursively calls itself.

We still need to define how and when the consume process should be restarted:

def consumeForever(inbox: MVar[BroadcastMessage]): Task[Unit] =
  consume(connector, inbox).attempt
    .map {
      case Left(e) =>
        logger.info("[broadcast] exception in queue consumer, restarting", e)
      case Right(()) =>
        logger.info("[broadcast] queue consumer completed, restarting")
    }
    .restartUntil(_ => false)

That part of the broadcast process definition corresponds to the supervisor strategy. When a consume task fails — which can only happen due to an error — we have to decide what to do. Here we simply log the result and restart the process, just like the supervisor’s Restart.

While not built-in, that’s the place where we might use backoff or a limited retry mechanism; however, we’d have to code that by hand.

We’ve also managed to maintain the separation between the business and error-handling logic; however, here it’s not enforced through a special mechanism. Instead, we separate the Task description into a “single” consume task and a task which manages the restarts. Creating fine-grained, single-responsibility task descriptions is one way of writing readable, maintainable code with Monix.
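That hand-coded supervision has a simple synchronous analogue. A sketch with a bounded retry count (superviseOnce is a hypothetical name; unlike consumeForever, it gives up after maxRestarts failures so that it terminates):

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical sketch of a Restart-style supervisor: run `task`,
// logging and retrying on failure -- 1 initial attempt + maxRestarts retries
def superviseOnce[A](task: () => A, maxRestarts: Int, log: String => Unit): Option[A] = {
  var attempts = 0
  var result: Option[A] = None
  while (result.isEmpty && attempts <= maxRestarts) {
    Try(task()) match {
      case Success(a) => result = Some(a)
      case Failure(e) =>
        log(s"failure: ${e.getMessage}, restarting")
        attempts += 1
    }
  }
  result
}
```

The error-handling decision (log and retry) lives entirely in the wrapper, while task stays pure business logic — the same separation the restartUntil-based consumeForever achieves.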

Finally, we need to tie all the parts together and kick-start the background processes:

def broadcast(connector: QueueConnector[Task]): Task[BroadcastResult] = {
  def processMessages(inbox: MVar[BroadcastMessage], 
                      consumers: Set[String => Task[Unit]]): Task[Unit] = // ...

  def consumeForever(inbox: MVar[BroadcastMessage]): Task[Unit] = // ...
  
  for {
    inbox <- MVar.empty[BroadcastMessage]
    f1 <- consumeForever(inbox).fork
    f2 <- processMessages(inbox, Set()).fork
  } yield BroadcastResult(inbox, f1.cancel *> f2.cancel)
}

To start the broadcast, we start two asynchronous processes: one consuming from the queue in a loop, the other processing messages. The two processes communicate through the inbox MVar.

The return type of the method consists of the inbox — so that external clients can subscribe new consumers — and a task which, when run, will cancel the whole process. Note how the fact that Task is lazy allows us to simply create the description of the cancellation logic: f1.cancel *> f2.cancel (*> flat-maps the two tasks, discarding the result of the first), without fear of running the cancellation prematurely.

Cancelling f1 will invoke the bracket’s release, while cancelling f2 will cause messages to no longer be read from the inbox.

ZIO

Finally, let’s see how ZIO handles errors. As expected, the implementation is quite similar to Monix; however, this might be deceptive at times: there are some very important differences, especially in the cancellation model.

However, the overall structure of the solution is the same as before. We’ll be using the same two messages to communicate with the broadcast process:

sealed trait BroadcastMessage
case class Subscribe(consumer: String => IO[Nothing, Unit]) extends BroadcastMessage
case class Received(msg: String) extends BroadcastMessage

With the difference that in Subscribe, the consumer results in an IO instead of a Task or a Future. Following the same order as in the previous section, the description of how queue consumption should work looks familiar:

def consume(connector: QueueConnector[IO[Throwable, ?]], 
            inbox: IOQueue[BroadcastMessage]): IO[Throwable, Unit] = {
  
  val connect: IO[Throwable, Queue[IO[Throwable, ?]]] = IO
    .syncThrowable(logger.info("[queue-start] connecting"))
    .flatMap(_ => connector.connect)
    .map { q =>
      logger.info("[queue-start] connected")
      q
    }

  def consumeQueue(queue: Queue[IO[Throwable, ?]]): IO[Throwable, Unit] =
    IO.syncThrowable(logger.info("[queue] receiving message"))
      .flatMap(_ => queue.read())
      .flatMap(msg => inbox.offer(Received(msg)))
      .forever

  def releaseQueue(queue: Queue[IO[Throwable, ?]]): IO[Void, Unit] =
    IO.syncThrowable(logger.info("[queue-stop] closing"))
      .flatMap(_ => queue.close())
      .map(_ => logger.info("[queue-stop] closed"))
      .catchAll[Nothing](e => IO.now(
                  logger.info("[queue-stop] exception while closing", e)))

  connect.bracket(releaseQueue)(consumeQueue)
}

The bracket operator works the same way as in Monix (though the release-resource and use-resource arguments are reversed): it guarantees that, if the connect action succeeds, releaseQueue will be evaluated (closing an open queue connection), whether consumeQueue finishes normally or due to an error.

There are two important differences in the code, though. First of all, in consumeQueue you might notice that in the Monix version we had to explicitly mark the flatMap-chain as cancelable so that it’s possible to stop queue consumption from the outside. Here, that’s not needed: flatMap chains are by default auto-cancellable.

Secondly, the release-resource part in bracket must handle all errors, and that’s enforced through the type system: the type of the release-resource parameter is A => IO[Void, Unit]. Hence there’s no question of what to do when the release action fails: normal errors aren’t possible (as the type states), and if the action does throw an exception (which is always possible), this is considered a programming defect and will be reported to the fiber’s supervisor and/or logged.

The broadcast process implementation corresponds directly to the Monix implementation without significant differences:

def broadcast(connector: QueueConnector[IO[Throwable, ?]]
              ): IO[Void, BroadcastResult] = {
  
  def processMessages(inbox: IOQueue[BroadcastMessage], 
                      consumers: Set[String => IO[Void, Unit]]
                      ): IO[Void, Unit] =
    inbox
      .take[Nothing]
      .flatMap {
        case Subscribe(consumer) => processMessages(inbox, consumers + consumer)
        case Received(msg) =>
          consumers
            .map(consumer => consumer(msg).fork[Nothing])
            .toList
            .sequence_
            .flatMap(_ => processMessages(inbox, consumers))
      }

  def consumeForever(inbox: IOQueue[BroadcastMessage]): IO[Void, Unit] =
    consume(connector, inbox).attempt.map {
      case Left(e) =>
        logger.info("[broadcast] exception in queue consumer, restarting", e)
      case Right(()) =>
        logger.info("[broadcast] queue consumer completed, restarting")
    }.forever

  for {
    inbox <- IOQueue.make[Void, BroadcastMessage](32)
    f1 <- consumeForever(inbox).fork
    f2 <- processMessages(inbox, Set()).fork
  } yield BroadcastResult(inbox, 
            f1.interrupt(new RuntimeException) *> 
            f2.interrupt(new RuntimeException))
}

To reiterate the previous description: we create two processes. One tries to connect to the queue and consume messages from it (consumeForever), restarting the whole procedure if necessary. The second (processMessages) maintains the state — the set of current subscribers — and hence implements the error-kernel pattern. As a result of the whole action, we return:

  • a queue to which new subscribers can be sent
  • a way to stop the whole process

Stopping the process involves, as before, interrupting the fiber which tries to connect to the queue and consumes messages from it, and another fiber which broadcasts the incoming messages.

Interruption in ZIO and cancellation in Monix

The way interruption works in ZIO and cancellation works in Monix is one of the key differences between the two libraries, so it makes sense to compare them side-by-side.

Creating cancelable actions

In Monix, cancelable actions can be created using:

  • Task.create, where the user needs to provide a Cancelable instance which should stop (or try to stop) the asynchronous computation; upon cancellation, this callback might run concurrently with the cancelled action
  • the cancelable operator, which causes flatMap chains in the Task to become cancelable (by default they are not)

In ZIO we have:

  • IO.async0, where the user needs to provide a Canceler which will be run when the action is cancelled; the canceler might run concurrently with the cancelled action
  • flatMap chains, which are cancellable by default: there’s no need to explicitly mark them as such

Both libraries offer an uncancelable (Monix) / uninterruptibly (ZIO) operator which prevents the described action from being cancelled, even if it’s built out of cancellable operations.

In neither library will atomic actions (such as a single flatMap step, or wrapped synchronous code) be forcibly interrupted or cancelled, e.g. using Thread.interrupt.
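To see why, here’s a small plain-Scala sketch using only threads (none of the libraries’ APIs; the object and method names are made up for illustration): interrupting a thread on the JVM merely sets a flag, which code that doesn’t cooperate will never observe.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

object CooperativeInterruption {
  // Interrupt a thread running a busy loop that never checks the interrupt
  // flag: the loop still runs to completion. This is why wrapped synchronous
  // code behaves atomically with respect to cancellation on the JVM.
  def run(): Boolean = {
    val started = new CountDownLatch(1)
    val loopCompleted = new AtomicBoolean(false)
    val t = new Thread(() => {
      started.countDown()
      var i = 0L
      while (i < 300000000L) i += 1 // never checks Thread.interrupted()
      loopCompleted.set(true)
    })
    t.start()
    started.await()
    t.interrupt() // only sets a flag; the loop is not stopped
    t.join()
    loopCompleted.get
  }
}
```

Cooperative cancellation points — such as the boundaries between flatMap steps — are exactly where Monix and ZIO check whether the fiber should stop.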

Cancelling fibers

The way fiber cancellation/interruption works is another important difference. In Monix, fibers can be interrupted by evaluating the task returned by the Fiber.cancel: Task[Unit] method. This task will complete once the cancellation signal is sent.

In ZIO, we have the Fiber.interrupt(t: Throwable): IO[E, Unit] method. It’s similar: when evaluated, it will interrupt the target fiber. But it also differs in two aspects. First, we can specify the interruption reason (an exception). That reason will then be reported to any action that attempts to join the interrupted fiber, or to the fiber’s supervisor, allowing logging or restarts.

The second important difference is that the action returned by interrupt will only complete once the interruption is successful or the fiber has ended. If we need Monix’s interrupt-and-forget semantics, this can be achieved by forking the interruption itself into a fiber (.interrupt(...).fork).
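The difference can be sketched with plain threads (an analogy only, not either library’s API): join-after-interrupt corresponds to ZIO’s interrupt, which waits for the target to actually finish, while interrupt without the join corresponds to Monix’s send-and-complete cancel.

```scala
import java.util.concurrent.CountDownLatch

object InterruptionStyles {
  // ZIO-style semantics with plain threads: after requesting interruption we
  // wait (join) until the target has actually finished its cleanup. Monix's
  // cancel corresponds to calling interrupt() without the join.
  def interruptAndAwait(): Boolean = {
    val started = new CountDownLatch(1)
    val cleanedUp = new CountDownLatch(1)
    val t = new Thread(() => {
      started.countDown()
      try Thread.sleep(60000) // stands in for the fiber's work
      catch { case _: InterruptedException => () }
      cleanedUp.countDown() // cleanup runs before the "fiber" ends
    })
    t.start()
    started.await()
    t.interrupt() // request interruption...
    t.join()      // ...and, ZIO-style, wait until the thread has ended
    cleanedUp.getCount == 0
  }
}
```

Once interruptAndAwait() returns, we know the cleanup has run; with send-and-complete semantics we would only know the request was delivered.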

What can be cancelled

When can cancellation be invoked? Both Monix and ZIO provide a way to cancel/interrupt a running fiber, as described above.

Additionally, when a Monix task is run asynchronously, e.g. using runAsync, it returns a CancelableFuture. That’s an extension of the regular Future which can additionally cancel the running computation through the side-effecting cancel() method.

ZIO doesn’t have such a possibility; however, the same effect might be achieved by forking the IO action to a fiber and obtaining (through the synchronous unsafePerformIO) a Fiber instance, which can then be interrupted.
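The shape of such a handle — awaitable, yet cancellable from the side via a side-effecting call — can be illustrated with the JDK’s executor API (an analogy only; the object and method names are made up, and this is not Monix code):

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}

object CancelableFutureSketch {
  // Submitting work to an executor yields a handle that can be awaited and
  // also cancelled from the side, the same shape as Monix's CancelableFuture.
  def run(): Boolean = {
    val pool = Executors.newSingleThreadExecutor()
    val started = new CountDownLatch(1)
    try {
      val handle = pool.submit(new Callable[Int] {
        def call(): Int = { started.countDown(); Thread.sleep(60000); 42 }
      })
      started.await()
      handle.cancel(true) // side-effecting cancel, like cancel() on the future
      handle.isCancelled
    } finally {
      pool.shutdown()
      pool.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```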

Cleaning up

Both Monix and ZIO have a bracket operator which works the same way: when applied to a resource-create action, it guarantees that a resource-release action will be run once the resource-use action completes successfully, completes with an error or is cancelled.

ZIO also has some handy aliases, like ensuring (corresponds to finally) and bracketOnError.
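The guarantee bracket gives can be sketched synchronously in plain Scala; this is a simplified analogue, not either library’s implementation, and the real operators extend the same idea to asynchronous and cancelled actions:

```scala
object BracketSketch {
  // A synchronous sketch of bracket's guarantee: acquire a resource, use it,
  // and release it whether `use` succeeds or throws. Monix/ZIO additionally
  // cover the case where the use action is cancelled mid-flight.
  def bracket[A, B](acquire: => A)(release: A => Unit)(use: A => B): B = {
    val resource = acquire
    try use(resource)
    finally release(resource) // always runs, like `ensuring`
  }
}
```

For example, bracket(openConnection)(_.close())(runQuery) would close the connection even if the query throws.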

Cancellation callbacks

Is it possible to find out, within the action itself, that the action has been cancelled?

Monix has two such operators. Firstly, doOnCancel(cb: Task[Unit]) runs the given task when cancellation occurs (there’s also a counterpart which runs when the task ends normally, doOnFinish). Hence, it’s a “partial bracket”.

The second, onCancelRaiseError, causes the action to fail with the given exception instead of becoming non-terminating on cancel. In Monix, then, there’s no way to specify the cancellation reason from the cancelling fiber, but it’s possible to specify it in the cancelled fiber. In ZIO it’s the other way round: the reason can only be specified by the interrupting fiber, and the interrupted fiber always terminates — with that exception.

Fiber supervisors

ZIO has two additional mechanisms for fiber supervision which have no counterpart in Monix.

The first is fiber supervisors. When forking an IO to a fiber, it’s possible to specify a handler which will be invoked on any exceptions not handled by the fiber: fork0[E2](handler: Throwable => Infallible[Unit]). If this resembles supervisors in actors — it should!

If no supervisor is specified, a default one is used which logs the exception.

The second mechanism is the IO.supervised(t: Throwable) method, which causes any fibers forked during evaluation of the given action to be interrupted with the given exception once that action completes. Again, this is similar to all child actors being stopped when the parent actor is stopped; here, however, it’s optional, not mandatory.

We’ve seen an example of using this feature in part 2, where the worker fibers were automatically interrupted once the crawler finishes.
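A rough plain-Scala analogue of that behaviour, using a thread pool in place of a fiber scope (illustrative names only, not ZIO’s API):

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object ScopedChildrenSketch {
  // When the "parent" scope ends, everything started inside it is
  // interrupted: a rough analogue of IO.supervised, and of child actors
  // stopping together with their parent.
  def run(): Boolean = {
    val children = Executors.newCachedThreadPool()
    val childStarted = new CountDownLatch(1)
    val childInterrupted = new AtomicBoolean(false)
    children.submit(new Runnable {
      def run(): Unit = {
        childStarted.countDown()
        try Thread.sleep(60000) // a long-running "child fiber"
        catch { case _: InterruptedException => childInterrupted.set(true) }
      }
    })
    childStarted.await()
    children.shutdownNow() // "parent" completes: interrupt all children
    children.awaitTermination(5, TimeUnit.SECONDS)
    childInterrupted.get
  }
}
```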

Summary

Is Monix or ZIO an alternative to Akka actors? Yes: state encapsulation, communication and error handling/supervision can all be implemented using Tasks or IO actions, without much effort, at the same time keeping the code readable and maintainable, in a more type-safe way.

However, Akka is not lagging behind: within Akka itself there’s an alternative to “traditional” Akka actors, Akka Typed, which is definitely a viable option as well.

Whichever approach we choose, as we have seen in the examples presented in the 3 parts of the series, the overall structure of solutions written using the four approaches is the same:

  • all of them use asynchronous message passing
  • all of them communicate using queues: implicit actor mailboxes or explicit queues
  • all of them use concurrently running, independent light-weight processes: actors or fibers

However, as the saying goes, the devil is in the details: the level of type-safety, the model of evaluation, supervision, cancelling and error handling differs significantly. Below is a summary of the various features that we have covered in the series (also available in textual format on Google Sheets):

1_KpWiksbgFDlrLLSOugGvVw.png

An important difference is the choice of primitives. In Akka the basic construct is an actor, while in Monix/ZIO there are Task/IO actions, which are more low-level, but hence also more flexible. Actors are a pre-defined recipe for creating an asynchronous process.

Is this a limitation of Akka? To some degree yes, however actors are also a very natural way to think about concurrent systems.

Actors which:

  • mind their own business
  • protect their internal state
  • communicate with others using fire-and-forget messages
  • form supervision hierarchies

constrain the ways in which we can define asynchronous processes, but also provide a great framework to model, understand and talk about concurrent systems, which is always a challenging task. Whatever helps in taming the inherent complexity of a distributed system is worth paying attention to.

That’s why, even when describing a process using Monix or ZIO, among the many other “recipes” that can be defined using Tasks/IOs, the notion of an actor is still very useful, and that’s why we try to emulate it.
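As a toy illustration of that emulation, here’s an “actor” built from nothing but a JDK queue and a thread, draining a private mailbox and encapsulating its state (the names and messages are made up for illustration; in Monix/ZIO the thread would be a fiber and the mailbox an MVar or IOQueue):

```scala
import java.util.concurrent.LinkedBlockingQueue

object ActorSketch {
  sealed trait Msg
  case class Add(n: Int) extends Msg
  case class Get(reply: Int => Unit) extends Msg
  case object Stop extends Msg

  // A minimal "actor": a concurrently running process that owns its state
  // and communicates with the outside world only through its mailbox.
  def counter(): LinkedBlockingQueue[Msg] = {
    val mailbox = new LinkedBlockingQueue[Msg]()
    new Thread(() => {
      var state = 0 // encapsulated: only this process reads or writes it
      var running = true
      while (running) mailbox.take() match {
        case Add(n)     => state += n
        case Get(reply) => reply(state)
        case Stop       => running = false
      }
    }).start()
    mailbox
  }
}
```

All four approaches in the series share this shape: a queue, a loop, and state that never leaks outside the process.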

If you’d like to further explore and experiment with the examples, or take Akka/ZIO/Monix for a test drive using the examples as a skeleton, the code is available, together with tests, on GitHub. Just import it into your favorite IDE and try to implement your use-case using the four approaches!

Summary: the short version

  • Akka: the most mature, popular solution, with a big ecosystem. However, the basic construct — actors — lacks type safety, which apart from programming errors might make it difficult to understand how communication in a system is organized.
  • Akka typed: a new approach to defining and running actors, using a type-safe, composable and lazy construct — behaviors. Communication patterns are easier to browse. Integrates well with the Akka ecosystem. Some side-effecting operations remain, such as scheduling or message sending.
  • Monix: a pretty well-established library, which provides a type-safe way to describe and manipulate lazy concurrent processes using a rich set of combinators. In addition to fully encapsulating side effects, it also has a module which implements reactive streaming.
  • ZIO: the newest contender which further develops the Monix approach of lazy computation descriptions by offering type-safe errors and supervision. Puts an emphasis on well-behaved interruption, fibers and resource safety constructs.

The code we write is increasingly concurrent, which creates new challenges and lifts the importance of code readability to the next level. The Scala ecosystem offers a wide range of solutions, which differ in choice of primitive constructs, programming styles and type-safety. It’s great to have a choice, though, and as the recent developments in ZIO, Monix and Akka Typed have shown, competition really drives innovation forward.

Good time to be a Scala programmer :)

Originally published on blog.softwaremill.com