Scalaz 8 IO vs Akka (typed) actors vs Monix (part 1)

Adam Warski 14 July, 2018 (24 min read)

There are a couple of hot development areas in the Scala ecosystem, and the competition between the various side-effect wrappers is one of the most interesting. We have the bifunctor IO from Scalaz 8 (which is now a standalone project, ZIO), we have cats-effect and its IO, which is a simpler version of Monix’s Task, and finally we have the good old Akka actors.

Some people say that the IO/Task wrappers offer a viable replacement for Akka’s actors. But is that so? Can you really replace an Actor with an IO or Task? If yes, is it practical to do so? Does it offer any advantages? Let’s find out using some concrete examples!

First of all, we have to answer the question: what are we actually comparing? Akka is much more than just actors; there’s streaming, persistence and clustering, just to name the three most popular modules. Monix is a smaller library, but apart from the core concurrency library it also provides a reactive streaming implementation.

ZIO (Scalaz 8 IO), on the other hand, as well as cats-effect, is clearly focused only on encapsulating side effects and providing a concurrency library. And that’s what we’ll be comparing: akka-actor (just the base module), monix-task and scalaz-zio. This quite radically constrains the reasonable use-cases, and naturally rules out any problems which would be best solved e.g. with a streaming solution (ZIO doesn’t have a streaming library; for a purely functional streaming implementation, see fs2).

As a side-note: you might be wondering why we’ll be using Monix instead of cats-effect; this blog might provide some hints, but the exact relationship between the two has yet to play out. As for now, cats-effect is a poorer version of what Monix’s Task offers, having very similar semantics and design, which isn’t surprising, as both are led by the same person, Alex Nedelcu. Will the IO from cats-effect replace Monix’s Task? Will Monix be totally replaced by cats-* projects? Time will tell.

A common complaint against Akka’s actors is their lack of type safety; however, there’s an ongoing effort to address this issue with the akka-typed module, hence we’ll add akka-typed-actors to the mix.

We’ll be using the following versions of the projects:

  • Akka: 2.5.12
  • Monix: 3.0.0-RC1 (2.3.3 should mostly work as well)
  • ZIO: 0.1-SNAPSHOT (no release yet)

What’s special about actors

Before we dive into technical comparisons, let’s first write down what is so special about actors, which might hint at the use-cases which are most naturally implemented with a “raw” actor, instead of e.g. using streams.

First of all, an actor encapsulates and manages state (stateless actors are considered an anti-pattern). Access to the state is guaranteed to be serialized, so that the state is always accessed and changed by a single thread at a time. An actor provides a “safe haven” for the data it manages.

Secondly, actors define a way to communicate between concurrently running processes, via message passing. Each actor is associated with a mailbox, to which incoming messages are enqueued and processed by the actor one-by-one in a first-in-first-out fashion. In other words, there’s a queue in front of each actor.

Finally, actors provide a way to manage errors. Not all errors have to be handled inside an actor; instead, errors can (and should!) be propagated to parent actors. The parent actor might have more context and decide what’s the best course of action: re-creating the child actor, stopping it, escalating, etc. This is known as supervisor hierarchies.

Action plan

We’ll go over some use-cases for actors and see if & potentially how they can be implemented using Monix / ZIO in three installments. This part covers the basics and state encapsulation. The next part will cover communication, and the final one error handling.

The goal is to keep the discussion close to “real life”, so each example is backed by runnable code, available on GitHub, implemented using all four approaches. The crucial code snippets are embedded in the article, but to test things interactively, it’s always useful to simply browse the code.

Rate limiter

Let’s start with an example which requires protected access to some non-trivial state. The goal will be to implement a rate limiter: we want to run a side-effecting computation (e.g. sending an HTTP request) so that in any perMillis time window, at most maxRuns computations are started. (Note that we only count the start of the computation, not when the computation finishes; due to e.g. network latencies the target system might see at times a slightly different rate of requests.)
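The requirement can be written down as a small predicate over the timestamps at which computations were started. This is only a sketch to make the constraint concrete: respectsLimit is a hypothetical helper that isn’t part of the article’s code, and it assumes a closed time window (two starts exactly perMillis apart still count as falling into the same window).

```scala
// Hypothetical helper, not part of the article's repository.
// Returns true if no closed window of `perMillis` milliseconds contains
// more than `maxRuns` start timestamps.
def respectsLimit(starts: Seq[Long], maxRuns: Int, perMillis: Long): Boolean =
  starts.sorted
    .sliding(maxRuns + 1)
    // any maxRuns + 1 consecutive starts must span more than perMillis
    .forall(w => w.size <= maxRuns || w.last - w.head > perMillis)

// two starts at 0 ms, the next two just after the window has passed
val spacedOut = respectsLimit(Seq(0L, 0L, 1001L, 1001L), maxRuns = 2, perMillis = 1000L)
// three starts within half a second
val burst = respectsLimit(Seq(0L, 0L, 500L), maxRuns = 2, perMillis = 1000L)
```

Here spacedOut is true and burst is false. A predicate like this can be handy in tests which record when each computation was started.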

All of the implementations will use the same data structure to store a queue of waiting computations and calculate when they can be run. The RateLimiterQueue case class includes:

  • waiting: a FIFO queue of computations waiting until the rate limit allows them to be started
  • lastTimestamps: a queue of timestamps at which computations in the current time window (which is perMillis wide) have been started
    case class RateLimiterQueue[F](maxRuns: Int, perMillis: Long, 
      lastTimestamps: Queue[Long], waiting: Queue[F], scheduled: Boolean)
    

Apart from enqueueing a new request, we can compute which requests (if any) should be run given the current timestamp. The result of the def run(now: Long): (List[RateLimiterTask[F]], RateLimiterQueue[F]) method is:

  • a list of tasks, where each task can be either running a computation, or scheduling an invocation of run in the future (when the rate limit will allow computations to be started)
    sealed trait RateLimiterTask[F]
    case class Run[F](run: F) extends RateLimiterTask[F]
    case class RunAfter[F](millis: Long) extends RateLimiterTask[F]
    
  • as the RateLimiterQueue is immutable, an updated copy of the data structure, with modified lastTimestamps and waiting queues
    /**
      * Queue of rate-limited computations. The computations will be *started* so that at
      * any time, there's at most `maxRuns` in any time `perMillis` window.
      *
      * Note that this does not take into account the duration of the computations, when 
      * they end or when they reach a remote server.
      *
      * @param scheduled Is an invocation of `run` already scheduled (by returning an 
      *                  appropriate task in the previous invocation): used to prevent
      *                  scheduling too many runs; it's enough if there's only one run
      *                  scheduled at any given time.
      * @tparam F Type of computations. Should be a lazy wrapper, so that computations can
      *           be enqueued for later execution.
      */
    case class RateLimiterQueue[F](maxRuns: Int, perMillis: Long, 
                                   lastTimestamps: Queue[Long], 
                                   waiting: Queue[F], scheduled: Boolean) {
    
      /**
        * Given the timestamp, obtain a list of tasks which might include running a 
        * computation or scheduling a `run` invocation in the future, and an updated 
        * queue.
        */
      def run(now: Long): (List[RateLimiterTask[F]], RateLimiterQueue[F]) = {
        pruneTimestamps(now).doRun(now)
      }
    
      /**
        * Add a request to the queue. Doesn't run any pending requests.
        */
      def enqueue(f: F): RateLimiterQueue[F] = copy(waiting = waiting.enqueue(f))
    
      /**
        * Before invoking a scheduled `run`, clear the scheduled flag.
        * If needed, the next `run` invocation might include a `RunAfter` task.
        */
      def notScheduled: RateLimiterQueue[F] = copy(scheduled = false)
    
      private def doRun(now: Long): (List[RateLimiterTask[F]], RateLimiterQueue[F]) = {
        if (lastTimestamps.size < maxRuns) {
          waiting.dequeueOption match {
            case Some((io, w)) =>
              val (tasks, next) = 
                copy(lastTimestamps = lastTimestamps.enqueue(now), waiting = w).run(now)
              (Run(io) :: tasks, next)
            case None =>
              (Nil, this)
          }
        } else if (!scheduled) {
          val nextAvailableSlot = perMillis - (now - lastTimestamps.head)
          (List(RunAfter(nextAvailableSlot)), this.copy(scheduled = true))
        } else {
          (Nil, this)
        }
      }
    
      /**
        * Remove timestamps which are outside of the current time window, that is 
        * timestamps which are further from `now` than `perMillis`.
        */
      private def pruneTimestamps(now: Long): RateLimiterQueue[F] = {
        val threshold = now - perMillis
        copy(lastTimestamps = lastTimestamps.filter(_ >= threshold))
      }
    }
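
To see how the data structure behaves, here’s a small walk-through using plain strings in place of computations. The block repeats a condensed copy of the class so that it can be run on its own. The empty constructor is an assumption made for this sketch: the later snippets create a queue from just maxRuns and perMillis, but that constructor isn’t shown in the article.

```scala
import scala.collection.immutable.Queue

sealed trait RateLimiterTask[F]
case class Run[F](run: F) extends RateLimiterTask[F]
case class RunAfter[F](millis: Long) extends RateLimiterTask[F]

// Condensed copy of the class above, so that the example is self-contained.
case class RateLimiterQueue[F](maxRuns: Int, perMillis: Long,
                               lastTimestamps: Queue[Long],
                               waiting: Queue[F], scheduled: Boolean) {
  def run(now: Long): (List[RateLimiterTask[F]], RateLimiterQueue[F]) =
    pruneTimestamps(now).doRun(now)
  def enqueue(f: F): RateLimiterQueue[F] = copy(waiting = waiting.enqueue(f))
  def notScheduled: RateLimiterQueue[F] = copy(scheduled = false)

  private def doRun(now: Long): (List[RateLimiterTask[F]], RateLimiterQueue[F]) =
    if (lastTimestamps.size < maxRuns) {
      waiting.dequeueOption match {
        case Some((io, w)) =>
          val (tasks, next) =
            copy(lastTimestamps = lastTimestamps.enqueue(now), waiting = w).run(now)
          (Run(io) :: tasks, next)
        case None => (Nil, this)
      }
    } else if (!scheduled) {
      val nextAvailableSlot = perMillis - (now - lastTimestamps.head)
      (List(RunAfter(nextAvailableSlot)), copy(scheduled = true))
    } else (Nil, this)

  private def pruneTimestamps(now: Long): RateLimiterQueue[F] =
    copy(lastTimestamps = lastTimestamps.filter(_ >= now - perMillis))
}

// Assumed convenience constructor for an empty queue (not shown in the article).
object RateLimiterQueue {
  def empty[F](maxRuns: Int, perMillis: Long): RateLimiterQueue[F] =
    new RateLimiterQueue[F](maxRuns, perMillis, Queue.empty, Queue.empty, scheduled = false)
}

// At most 2 starts per second; three computations are waiting.
val q0 = RateLimiterQueue.empty[String](maxRuns = 2, perMillis = 1000L)
  .enqueue("a").enqueue("b").enqueue("c")

// At t = 0 two computations may start; a later `run` is requested for the third.
val (tasksAt0, q1) = q0.run(now = 0L)

// The scheduled run: clear the flag first, then run with the new timestamp.
val (tasksAt1001, q2) = q1.notScheduled.run(now = 1001L)
```

The first call yields Run("a"), Run("b") and RunAfter(1000); the second one, made after the window has passed, yields Run("c") and leaves the waiting queue empty.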
    

Using Akka

First, let’s use the above RateLimiterQueue data structure to implement a rate limiter using pure, “traditional” Akka. As we are in the Akka ecosystem, computations will be represented as Future values. However, as a Future is a computation that is already running (eagerly), we have to make it lazy and make sure that it’s only constructed once the rate limiter allows it to be run.

This also requires attention from the user of the code, so that instead of passing an already created Future instance (which is a running computation), only blocks which lazily create a Future are used as arguments.
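The difference between an already-running Future and a lazily created one can be seen using only the standard library. In this sketch, sendRequest is a made-up stand-in for a side-effecting computation, and the counter exists only to observe when the computation starts.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val started = new AtomicInteger(0)

// Every call creates a Future, which starts running right away.
def sendRequest(): Future[Int] = Future { started.incrementAndGet() }

// Wrapping the call in a function delays creating (and hence running) the Future.
val deferred: () => Future[Int] = () => sendRequest()
val startedBeforeForcing = started.get() // 0: nothing has been started yet

// Only invoking the function creates the Future and starts the computation.
val result = Await.result(deferred(), 1.second)
val startedAfterForcing = started.get() // 1
```

Had we written val deferred = sendRequest() instead, the computation would have started on that very line, before any rate limiter had a chance to delay it.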

The actor will accept two messages:

  • LazyFuture: for scheduling a computation
  • ScheduledRunQueue: for scheduled invocations of run

The state of the actor consists of a single mutable variable, which holds the current rate limiter queue. Hence, even though RateLimiterQueue itself is immutable, the reference to the current queue is mutable, and will change over time. Since the state is encapsulated within an actor, this is a safe thing to do.

The code of the actor is quite straightforward; upon receiving a message, either a computation is enqueued or the scheduled flag is cleared, and then a check for computations to run is performed:

import akka.actor.{Actor, ActorLogging}
import scala.concurrent.duration._

private class RateLimiterActor(maxRuns: Int, per: FiniteDuration) extends Actor 
  with ActorLogging {

  import context.dispatcher

  // mutable actor state: the current rate limiter queue; the queue itself is 
  // immutable, but the reference is mutable and access to it is protected by 
  // the actor.
  private var queue = RateLimiterQueue[LazyFuture](maxRuns, per.toMillis)

  override def receive: Receive = {
    case lf: LazyFuture =>
      // enqueueing the new computation and checking if any computations can be run
      queue = queue.enqueue(lf)
      runQueue()

    case ScheduledRunQueue =>
      // clearing the `scheduled` flag, as we are in a scheduled run right now, so it's 
      // possible a new one has to be scheduled
      queue = queue.notScheduled
      runQueue()
  }

  private def runQueue(): Unit = {
    val now = System.currentTimeMillis()

    val (tasks, queue2) = queue.run(now)
    // Updating the mutable reference to store the new queue.
    queue = queue2
    // Each task returned by `queue.run` is turned into a side-effect: either running 
    // the lazy future (which amounts to running the block of code which creates the 
    // future - and hence makes the computation run), or scheduling a `ScheduledRunQueue` 
    // message to be sent to the actor after the given delay.
    tasks.foreach {
      case Run(LazyFuture(f)) => f()
      case RunAfter(millis)   => context.system.scheduler.scheduleOnce(
        millis.millis, self, ScheduledRunQueue)
    }
  }
}

Note that this “traditional” Akka actor doesn’t specify anywhere what type of messages it accepts; it’s possible to send it any message, at the risk of the message not being handled and being discarded. The knowledge of what messages an actor accepts must be passed in another way, through documentation or an implied protocol.

Now that we have an actor, we still need a way to create the actor instance and schedule computations:

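import akka.actor.{ActorRef, ActorSystem, Props}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration

class AkkaRateLimiter(rateLimiterActor: ActorRef) {
  def runLimited[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()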
    val msg = LazyFuture(() => f.andThen { case r => p.complete(r) }.map(_ => ()))
    rateLimiterActor ! msg
    p.future
  }
}

object AkkaRateLimiter {
  def create(maxRuns: Int, per: FiniteDuration)(
    implicit actorSystem: ActorSystem): AkkaRateLimiter = {
    
    val rateLimiterActor = actorSystem.actorOf(
      Props(new RateLimiterActor(maxRuns, per)))
    new AkkaRateLimiter(rateLimiterActor)
  }
}

The rate limiter can be created given an ActorSystem. The created ActorRef is wrapped in an AkkaRateLimiter instance, with a convenient interface for running a rate-limited computation. runLimited uses a by-name — and hence lazily computed — Future parameter.

The result of the runLimited method has to be a (running) Future, which will be completed only once the rate limiter allows the computation to start and complete. That’s why we create an intermediate Promise; the computation that will be run by the rate limiter first runs f, and then completes p with its result (the operations are sequenced using andThen).
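The Promise-based bridging can be tried out without an actor system. In the sketch below, a plain mutable queue (pending, a name made up for this example) stands in for the actor and its RateLimiterQueue; everything runs on a single thread, so this isn’t a replacement for the actor, only an illustration of when the returned Future gets completed.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for the actor: computations wait here until they are released.
val pending = mutable.Queue.empty[() => Future[Unit]]

def runLimited[T](f: => Future[T]): Future[T] = {
  val p = Promise[T]()
  // `f` is by-name: it's only evaluated when the stored function is invoked
  pending.enqueue(() => f.andThen { case r => p.complete(r) }.map(_ => ()))
  p.future
}

val result = runLimited(Future(21 * 2))
val completedBeforeRelease = result.isCompleted // false: nothing has run yet

// What the rate limiter does once the computation is allowed to start.
pending.dequeue().apply()
val value = Await.result(result, 1.second) // 42
```

The caller holds a Future from the very beginning, but that Future is only completed after the stored function has been invoked and the underlying computation has finished.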

Using Akka Typed

Had we written the akka-actor solution 5 years ago, the code would have been more or less the same. But things have changed since then; it’s about time to address the lack of type safety in actors.

There were several attempts to do so, but akka-typed seems to finally have a chance to mature to a stable solution.

The first crucial difference when using akka-typed is that we don’t write actor code; instead, we define the actor’s behavior. The behavior is a specification, written in Scala, of how the actor should behave, that is, how it should handle incoming messages.

The result of message-receiving code should be new actor behavior, specifying what to do next. This takes the become construct known from “traditional” actors to a new level, making it a basic building block.

The actor’s behavior can be parametrized with the actor’s state. This way we can avoid using mutable actor state (variables) altogether, while keeping the most crucial actor feature: serializing and protecting access to the actor data. After receiving the message, the state can be changed, and new behavior — parametrized with the new state — returned.
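The idea of “a behavior returns the next behavior” can be modelled in a few lines of plain Scala. To be clear, ToyBehavior and the counter messages below are invented for this sketch and are not the akka-typed API; there’s no mailbox or concurrency here, only the state-passing pattern.

```scala
// Toy model only: handling a message yields the behavior for the next message.
final case class ToyBehavior[M](onMessage: M => ToyBehavior[M])

sealed trait CounterMsg
case object Increment extends CounterMsg
final case class Report(replyTo: Int => Unit) extends CounterMsg

// The state (`count`) is a parameter of the behavior, so the actor logic
// itself doesn't need a mutable variable.
def counter(count: Int): ToyBehavior[CounterMsg] = ToyBehavior[CounterMsg] {
  case Increment => counter(count + 1)
  case Report(replyTo) =>
    replyTo(count)
    counter(count)
}

// Used only to observe the reply in this example.
var reported = -1

// Messages are handled one at a time; each one yields the next behavior.
val messages = List[CounterMsg](Increment, Increment, Report(n => reported = n))
messages.foldLeft(counter(0))((behavior, msg) => behavior.onMessage(msg))
```

After the fold, reported is 2. In akka-typed the runtime plays the role of the fold: it feeds messages from the mailbox, one by one, to whatever behavior was returned last.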

The second difference is that each behavior has a type parameter, specifying the type of messages that the actor can handle. In our case, there are two such messages — exactly the same as before — but we need to introduce a common parent trait:

sealed trait RateLimiterMsg
case class LazyFuture(t: () => Future[Unit]) extends RateLimiterMsg
case object ScheduledRunQueue extends RateLimiterMsg

The core logic of the actor doesn’t diverge much from the “traditional” approach. The main difference is that we are calling the same behavior-creating method (rateLimit) recursively, after each handled message modifying the RateLimiterQueue and passing it to subsequent invocations:

def rateLimit(timer: TimerScheduler[RateLimiterMsg], 
  data: RateLimiterQueue[LazyFuture]): Behavior[RateLimiterMsg] =
    Behaviors.receiveMessage {
      case lf: LazyFuture    => rateLimit(timer, runQueue(timer, data.enqueue(lf)))
      case ScheduledRunQueue => rateLimit(timer, runQueue(timer, data.notScheduled))
    }

def runQueue(timer: TimerScheduler[RateLimiterMsg], 
  data: RateLimiterQueue[LazyFuture]): RateLimiterQueue[LazyFuture] = {

  val now = System.currentTimeMillis()

  val (tasks, data2) = data.run(now)
  tasks.foreach {
    case Run(LazyFuture(f)) => f()
    case RunAfter(millis)   => timer.startSingleTimer((), ScheduledRunQueue, 
                                                      millis.millis)
  }

  data2
}

As before, in the runQueue method, tasks obtained from the RateLimiterQueue are run in a side-effecting fashion: either forcing the lazy Futures or scheduling a timer. Note that the timer comes from a special behavior factory method (see Behaviors.withTimers below), and is also parametrized with the type of messages that can be sent. The timer is always bound to a specific actor and can schedule messages for this actor only.

But having a behavior is not enough; it’s just a description. We can create loads of behavior instances, and nothing will happen; we still need to create an actor:

class AkkaTypedRateLimiter(actorSystem: ActorSystem[RateLimiterMsg]) 
  extends StrictLogging {
    
  def runLimited[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    actorSystem ! LazyFuture(() => f.andThen { case r => p.complete(r) }.map(_ => ()))
    p.future
  }
}

object AkkaTypedRateLimiter {
  def create(maxRuns: Int, per: FiniteDuration): AkkaTypedRateLimiter = {
    val behavior = Behaviors.withTimers[RateLimiterMsg] { timer =>
      rateLimit(timer, RateLimiterQueue(maxRuns, per.toMillis))
    }
    new AkkaTypedRateLimiter(ActorSystem(behavior, "rate-limiter"))
  }
}

To create an actor, we either need to create a new typed ActorSystem (as is the case here), or another typed actor has to spawn a (typed) child actor. This is different from traditional Akka, where an actor system could be used to create a number of actors. Here, there can only be one top-level actor.

In “real life” the actor would probably be created from a parent actor, but for demo purposes in create we are creating a new ActorSystem, passing the initial behavior and a name. Once the actor is created, we get a typed ActorRef (an ActorSystem is also an ActorRef), which — as everything — is parametrized with the type of messages that can be sent to the actor. That way, the ! (tell) method no longer accepts Any, but instead only messages which can be handled by our actor.

We are also using the same way of creating a Promise and from it a Future which will be completed once the rate-limited computation is done.

Using Monix

Let’s depart from the comfortable world of actors and move to slightly less known territory, but one that is very rapidly evolving. We’ll start with Monix. The basic construct is the Task data type, which represents a description of a computation. Unlike a Future, which represents a running computation, a value of type Task doesn’t do anything by itself, just as creating a value of type Behavior didn’t create the actor. However, when run, a value of type Task[A] will produce a single value of type A, or an error.

The important part here is that the Task can be run multiple times (or not at all); each run will be independent and will run the side effects as described by the construction of that particular Task. A convenient analogy is to think about a Task as a lazy Future. (The code above already had a very simple LazyFuture implementation, as a wrapper for () => Future[Unit]; Task is in reality much more complex and optimized for composition. But as the example shows, such a construct is useful even in the Akka-world!)
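The “description that can be run many times” idea can be illustrated with a toy type. Description below is invented for this sketch and is nowhere near Monix’s Task (no asynchrony, cancellation or stack safety); it only shows that composing descriptions runs nothing, and that every run repeats the side effects.

```scala
// Toy "lazy computation" type, invented for this example; not Monix's Task.
final class Description[A](val unsafeRun: () => A) {
  def map[B](f: A => B): Description[B] =
    new Description(() => f(unsafeRun()))
  def flatMap[B](f: A => Description[B]): Description[B] =
    new Description(() => f(unsafeRun()).unsafeRun())
}

var sideEffects = 0
val increment = new Description(() => { sideEffects += 1; sideEffects })

// Composing descriptions doesn't run anything.
val twice = increment.flatMap(_ => increment)
val beforeRun = sideEffects // 0

// Each run executes the described side effects again.
val firstRun = twice.unsafeRun()  // 2
val secondRun = twice.unsafeRun() // 4
```

A Future built from the same steps would have incremented the counter exactly once, at construction time, no matter how many times its result was read.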

The various methods on the Task companion object and the methods on Task instances allow describing complex processes running concurrently. How to use them to create our rate-limiter example?

Note that our goal isn’t to imitate actors using Monix or ZIO. Instead, we want to solve the same problem that is being solved by the actor implementation, using the most natural approach given by a different set of tools.

We definitely need a process running in the background which:

  • accepts new computations to be run
  • queues them
  • when the rate limit allows, runs as many as possible.

Communicating using asynchronous message passing works great for concurrent processes, so let’s stick to that.

One of the tools available in Monix is MVar, which is a container for a single value. That value can be accessed through take and put operations. Taking a value empties the MVar if it’s full, and blocks/suspends (asynchronously, without blocking threads) otherwise. Putting a value works the other way round: blocks if the MVar is full, and stores a value otherwise.
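As a rough analogy from the Java standard library, a one-element ArrayBlockingQueue has the same full/empty semantics. The important difference is that it blocks threads, while an MVar suspends asynchronously, so the sketch below only illustrates the put/take protocol and says nothing about how Monix implements it.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Thread-blocking analogy only: a container for at most one value.
val slot = new ArrayBlockingQueue[String](1)

val firstPut = slot.offer("a")  // true: the slot was empty
val secondPut = slot.offer("b") // false: the slot is full (`put` would wait here)
val taken = slot.take()         // "a": taking a value empties the slot
val thirdPut = slot.offer("b")  // true: there's room again
```

The non-waiting offer is used here so that the example never blocks; the waiting put and take are the counterparts of the MVar operations described above.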

This makes MVar an asynchronous blocking queue of size 1 (with simple back-pressure, as the operations are blocking), but that’s sufficient for our use case. We’ll use an MVar to communicate with a background process which manages which computations should be run and when. The messages passed through the MVar will be analogous to the ones used in the akka implementations:

sealed trait RateLimiterMsg
case object ScheduledRunQueue extends RateLimiterMsg
case class Schedule(t: Task[Unit]) extends RateLimiterMsg

Note that to pass a computation to the rate-limiter, we are using a Task instead of a Future, as we’ve moved from Akka-land to Monix-land. As the Task is already lazy, it’s enough to simply store the reference passed by the user. It’s also safer: there’s no danger of prematurely running the computation by accident, as was the case with Future.

That safety property is a consequence of the often cited referential transparency, meaning (in this context) that it doesn’t matter (putting memory allocation considerations aside) if we use an already created value as a function argument, or if the value is created as part of the invocation, or if the value is created lazily when needed by the function.

The rate limiter logic is again a recursive function, parametrized with the current RateLimiterQueue instance, similarly to the akka-typed implementation. Additionally, we also have a reference to a queue: MVar[RateLimiterMsg], which will be used to communicate with the outside world (note that “queue” is used here in two different meanings: once as the queue of rate-limited computations encapsulated by RateLimiterQueue, and once as the 1-element queue of incoming messages which orchestrates the whole process):

private def runQueue(data: RateLimiterQueue[Task[Unit]], 
                     queue: MVar[RateLimiterMsg]): Task[Unit] = {
  queue
    // (1) take a message from the queue (or wait until one is available)
    .take
    // (2) modify the data structure accordingly
    .map {
      case ScheduledRunQueue => data.notScheduled
      case Schedule(t)       => data.enqueue(t)
    }
    // (3) run the rate limiter queue: obtain the rate-limiter-tasks to be run
    .map(_.run(System.currentTimeMillis()))
    .flatMap {
      case (tasks, d) =>
        tasks
          // (4) convert each rate-limiter-task to a Monix-Task
          .map {
            case Run(run)         => run
            case RunAfter(millis) => 
              Task.sleep(millis.millis)
                  .flatMap(_ => queue.put(ScheduledRunQueue))
          }
          // (5) fork each converted Monix-Task so that it runs in the background
          .map(_.fork)
          // (6) sequence a list of tasks which spawn background fibers
          // into one big task which, when run, will spawn all of them
          .sequence_
          .map(_ => d)
    }
    // (7) recursive call to handle the next message,
    // using the updated data structure
    .flatMap(d => runQueue(d, queue))
}

Going step-by-step: (1) first we take a message from the MVar, and (2) change our rate limiter data structure accordingly (enqueueing a new computation or clearing the scheduled flag). Then, (3) we run the rate limiter queue, obtaining a list of tasks and a new RateLimiterQueue instance.

Now we have to execute side-effects appropriate for each task (4). However, we can’t simply “execute” the side-effects — in the Monix/ZIO world that’s forbidden! Instead, once again we create a description of how these side-effects should be run — somewhere far off in the future, when the interpreter for the Task is actually run. This description is created by composing smaller descriptions into one big value, which describes the whole process.

Each rate-limiter-task is converted into a Monix-Task (it seems that name clashes are inevitable, sorry!): a rate-limiter-Run task is simply unwrapped into a Monix-Task (remember that we were storing the Task that the user submitted, which is already lazy), and a rate-limiter-RunAfter task is converted to a Monix-Task which sleeps for the given amount of time, and then puts a ScheduledRunQueue message on the queue. Each rate-limiter-task converted to a Monix-Task is then forked into a fiber (5) using Task.fork.

What’s a fiber? It’s easiest to think about it as a lightweight thread. Given a compute: Task[A], compute.fork results in a Task[Fiber[A]]: a description of a Task which, when run (and only then!):

  • will start running the compute task asynchronously, in the background
  • will return a Fiber[A] to the parent (forking) process, which can be used to wait for the background computation to complete (using the join: Task[A] method), or cancel it (using the cancel: Task[Unit] method).

Because fibers can sleep or wait for external resources, a small number of threads can be used to run a large number of fibers.

Doesn’t that sound familiar? Yes — there’s a strong similarity with actors. Actors are also lightweight processes, and a large number of actors can be run on a small number of threads. Both fibers and actors can wait without blocking the thread on which they are running. However, unlike an actor, where the process is coupled with a queue and state, in Monix these are separate: creating a lightweight process is just another combinator (fork), and state/queues can (and have to) be managed independently.

Coming back to the code: each rate-limiter-task converted to Monix-Task is forked (5) so that it runs in the background and doesn’t block the process which consumes messages from the queue. Finally, we sequence (6) the list of all such forked computations into one big computation (converting a List[Task[Fiber]] into a Task[List[Fiber]] — the same thing that Future.sequence does), forget the results (we don’t need the fiber instances here) and (7) recursively call the runQueue method, using the updated RateLimiterQueue data structure.
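For reference, this is the shape of the Future.sequence operation mentioned above, using only the standard library. Keep in mind the difference discussed earlier: the futures below are already running when they are put into the list, while the sequenced Monix tasks are only descriptions.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// A list of (already running) computations...
val futures: List[Future[Int]] = List(Future(1), Future(2), Future(3))

// ...turned into a single computation which yields a list of results.
val all: Future[List[Int]] = Future.sequence(futures)

val results = Await.result(all, 1.second) // List(1, 2, 3)
```

The order of the results follows the order of the input list, not the order in which the futures happened to complete.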

Keep in mind that all we are doing is creating process descriptions. Nothing happens yet: no recursion, forking, etc. We only get back a data structure which describes what should happen when the task is executed.

What about bootstrapping the whole process, and providing a nice interface for the user?

class MonixRateLimiter(queue: MVar[RateLimiterMsg], queueFiber: Fiber[Task, Unit]) {
  def runLimited[T](f: Task[T]): Task[T] = {
    for {
      mv <- MVar.empty[T]
      _ <- queue.put(Schedule(f.flatMap(mv.put)))
      r <- mv.take
    } yield r
  }
}

object MonixRateLimiter extends StrictLogging {
  def create(maxRuns: Int, per: FiniteDuration): Task[MonixRateLimiter] =
    for {
      queue <- MVar.empty[RateLimiterMsg]
      runQueueFiber <- 
        runQueue(RateLimiterQueue[Task[Unit]](maxRuns, per.toMillis), queue)
          .fork
    } yield new MonixRateLimiter(queue, runQueueFiber)
}

Creating an MVar is itself a side-effecting computation (as we need to allocate the reference), and it results in a Task. Once we have the queue instance in the create method, we can create the Task which describes running the rate-limiting process, and fork it into a background fiber.

The instance that is being returned to the user, MonixRateLimiter, upon receiving a Task to be rate-limited creates an MVar, but for another purpose — here it’s used similarly to scala.concurrent.Promise. That MVar instance (mv) will be filled with the result of the computation, once it is run by the rate limiter. Note that the task that is passed to the rate limiter contains that logic (f.flatMap(mv.put)): it is a description of a computation, which first runs f, and then puts the result into mv. The task that is being returned — mv.take — will block (again, asynchronously) until the computation is done.

However, so far all that we’ve been doing is creating descriptions. When do we actually run things? As late as possible! The general goal should be to create, using composition, larger and larger descriptions of the logic of our program.

But in the end, be it in tests or the main method of our application, we need to run the side effects. That is possible using the run* methods on Task, such as Task.runSync or Task.runAsync, which run the described computations and either block or return a Future. In our case, these methods are used in the tests.

Using ZIO

Finally, let’s move to an implementation using ZIO IO. If you look at the code, you’ll notice that it’s surprisingly similar to the Monix version. And that’s not a coincidence. While Monix has been around for some time now, the relatively recent development of ZIO brought a fresh set of ideas to the table, and the two projects started competing both on the performance and functionality side, resulting in implementations that are distinct in a couple of key areas, but overall quite similar.

Despite its Scalaz origins, ZIO, as a stand-alone project, can be used with Scalaz 7, Scalaz 8, Cats, or even without any of them. In fact, in the example project, just for the fun of it, we’re using ZIO with Cats to get convenient methods such as sequence_.

The basic idea behind an IO value is the same as in Monix: it’s a description of a computation which, when run, can yield a value or throw an error. However, there’s a significant difference: while Monix’s Task takes a single type parameter, here we have two: IO[E, A]. The first type parameter specifies the type of errors which the computation can return; the second, the type of the value that will be produced. If that sounds similar to an Either, it is: it’s as if you stacked Either over a Task, but without the performance and memory penalties. A blog by John de Goes explores this topic further.

Of course, it’s not possible to guarantee that the code inside an IO won’t throw an arbitrary exception — but the idea behind the design of IO is to divide errors into recoverable errors — the ones that are expected, and which are of type E, and programming defects or catastrophic errors, which are not recoverable, and are caused either by a programming error or a VM problem, such as running out of memory. Again, there’s a John de Goes blog diving into that design.

This gives us an additional possibility to enhance type safety. Not only can we describe what type of value will be produced, but also which errors are expected, but not handled. For example, a value of type IO[Nothing, A] specifies that this is a description of a computation which, when run, can’t result in errors, and will produce a single A value. There is no way to convey this information through a Task or Future.
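Since the two type parameters are compared to an Either above, the same idea can be shown with the standard library’s Either. This is an analogy only, not ZIO code: parsePort and the ConfigError hierarchy are invented for this sketch, and Either describes an already computed result rather than a lazy computation.

```scala
import scala.util.Try

sealed trait ConfigError
final case class NotANumber(raw: String) extends ConfigError
final case class OutOfRange(port: Int) extends ConfigError

// The left type documents exactly which failures callers should expect.
def parsePort(raw: String): Either[ConfigError, Int] =
  Try(raw.trim.toInt).toOption match {
    case None                          => Left(NotANumber(raw))
    case Some(p) if p < 1 || p > 65535 => Left(OutOfRange(p))
    case Some(p)                       => Right(p)
  }

// `Nothing` has no values, so this result can't carry an expected error...
val defaultPort: Either[Nothing, Int] = Right(8080)
// ...and extracting the value needs no real error handling.
val port: Int = defaultPort.fold[Int](impossible => impossible, identity)
```

In the same way, a function accepting an IO[Nothing, A] states in its signature that the caller has already dealt with all expected failures.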

Otherwise, for the purposes of this example, the concepts are analogous to those in Monix. In ZIO we can also use the fork method to run a computation in the background, obtaining a Fiber (which also has two type parameters). A minor annoyance is that sometimes the type inferencer can’t infer the correct error type and it needs to be given explicitly, especially for methods which are polymorphic in the error type (e.g. IO.sleep can produce an IO which can result in any type of error). Scala is even more reluctant to infer Nothing, causing some additional pain when this “no-error” type is used.

Some of the method names are different, but otherwise the implementation of runQueue is almost the same:

private def runQueue(data: RateLimiterQueue[IO[Nothing, Unit]], 
                     queue: IOQueue[RateLimiterMsg]): IO[Nothing, Unit] = {
  queue
    // (1) take a message from the queue (or wait until one is available)
    .take
    // (2) modify the data structure accordingly
    .map {
      case ScheduledRunQueue => data.notScheduled
      case Schedule(t)       => data.enqueue(t)
    }
    // (3) run the rate limiter queue: obtain the rate-limiter-tasks to be run
    .map(_.run(System.currentTimeMillis()))
    .flatMap {
      case (tasks, d) =>
        tasks
          // (4) convert each rate-limiter-task to an IO
          .map {
            case Run(run)         => run
            case RunAfter(millis) => 
              IO.sleep[Nothing](millis.millis)
                .flatMap(_ => queue.offer(ScheduledRunQueue))
          }
          // (5) fork each converted IO so that it runs in the background
          .map(_.fork[Nothing])
          // (6) sequence a list of IOs which spawn background fibers
          // into one big IO which, when run, will spawn all of them
          .sequence_
          .map(_ => d)
    }
    // (7) recursive call to handle the next message,
    // using the updated data structure
    .flatMap(d => runQueue(d, queue))
}

The thing that is different is that there’s no construct analogous to MVar in ZIO. Instead, to communicate with the outside world, there’s a back-pressured IOQueue, which is a bounded queue backed by IO. We can enqueue messages using the offer method (for example, above we enqueue ScheduledRunQueue instances) and dequeue using the take method.
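If bounded queues are new to you, the JDK’s ArrayBlockingQueue can serve as a rough analogy. This is a swapped-in stand-in, not IOQueue: it blocks threads instead of suspending fibers, and its offer method returns false when the queue is full instead of waiting (the blocking put is the closer analogue of a back-pressured offer). The Msg type below only mimics the messages used above.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedQueueSketch {
  // Simplified, hypothetical versions of the rate limiter messages.
  sealed trait Msg
  case object ScheduledRunQueue extends Msg
  final case class Schedule(id: Int) extends Msg

  // Capacity 1: the queue holds at most one pending message.
  val queue = new ArrayBlockingQueue[Msg](1)

  // The first message fits.
  val firstAccepted: Boolean = queue.offer(Schedule(1))

  // The queue is full, so the non-blocking offer is rejected;
  // queue.put(...) would block the calling thread here instead.
  val secondAccepted: Boolean = queue.offer(ScheduledRunQueue)

  // Taking a message frees up a slot (take would block on an empty queue).
  val taken: Msg = queue.take()

  // Now there is room again.
  val thirdAccepted: Boolean = queue.offer(ScheduledRunQueue)
}
```

The point is only that a producer can’t run arbitrarily far ahead of the consumer: once the capacity is reached, it has to wait (or be turned away) until a message is taken.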

What about creating the rate limiter and providing the user with an interface? Again, things are quite similar to before:

class ZioRateLimiter(queue: IOQueue[RateLimiterMsg], 
                     runQueueFiber: Fiber[Nothing, Unit]) {
  def runLimited[E, T](f: IO[E, T]): IO[E, T] = {
    for {
      p <- Promise.make[E, T]
      toRun = f.flatMap(p.complete).catchAll[Nothing](p.error).fork[Nothing].toUnit
      _ <- queue.offer[E](Schedule(toRun))
      r <- p.get
    } yield r
  }
}

object ZioRateLimiter extends StrictLogging {
  def create(maxRuns: Int, per: FiniteDuration): IO[Nothing, ZioRateLimiter] =
    for {
      queue <- IOQueue.make[Nothing, RateLimiterMsg](32)
      runQueueFiber <- runQueue(RateLimiterQueue(maxRuns, per.toMillis), queue)
        .fork
    } yield new ZioRateLimiter(queue, runQueueFiber)
}

One important difference is again related to the absence of MVar. Instead, we have at our disposal a (ZIO) Promise, a concept known from the scala.concurrent.Future-world, but this time backed by IO. Again, as all of the computations are lazy, we can create a description of a computation which will first run the given IO (f: IO[E, T]), and then complete the p: Promise using it. Once that is ready, we enqueue a Schedule message, which will then be taken off the queue by the background process.
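For comparison, here’s the same pattern sketched with the standard library’s scala.concurrent.Promise rather than the ZIO one. The completing helper and the plain thunk are assumptions made for this illustration: the thunk stands in for the lazy IO, and invoking it by hand stands in for the background process picking up the Schedule message.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

object PromiseSketch {
  // Wraps a computation so that its outcome (value or exception) ends up in
  // the promise. Nothing runs until the returned thunk is invoked.
  def completing[T](p: Promise[T])(f: () => T): () => Unit =
    () => { p.complete(Try(f())); () }

  def demo(): (Boolean, Int) = {
    val p = Promise[Int]()
    val toRun = completing(p)(() => 21 * 2)

    // So far we only have a description: the promise is still empty.
    val completedBeforeRun = p.isCompleted

    // Stand-in for the background process running the scheduled computation.
    toRun()

    // The caller reads the result through the promise.
    (completedBeforeRun, Await.result(p.future, 1.second))
  }
}
```

The caller never learns when or where the computation ran; it only observes the promise being completed, which is the same role p.get plays in runLimited above.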

Another difference compared to the Monix version is that we have to make a hard choice and decide what the capacity of the queue is going to be. This can be as low as 1 (effectively making the IOQueue an MVar), or a higher value, if we suspect that this might improve the performance.

At “the end of the world” (which might be our main method or tests) we’ll need to run the computations, executing the side-effects. IO itself has no methods for doing that; instead, we need to create an object which mixes in the RTS trait. This gives us access to def unsafePerformIO[E, A](io: IO[E, A]): A and unsafePerformIOAsync, and a couple of other variants.

John de Goes, the author of ZIO, also gave a higher-level talk on how IO can replace Akka’s actors during Scalar 2018. You’ll probably notice that in his example, when modelling an actor, he manages the actor’s internal state explicitly, using an IORef. While that is a possibility, here we don’t even need that, as it’s easier to just use recursion.

akka-monix-zio so far

If you take a look at the four implementations of the rate limiter, the code isn’t THAT different. All of them use the same two messages to communicate with the background process which runs the concurrent computation: one for scheduling a computation (LazyFuture or Schedule), the other for running computations after some time (ScheduledRunQueue). And more importantly, all of the variants use asynchronous message passing as the primary means of communication in the asynchronous world.

The general structure is the same: all of the implementations read from the queue, modify the internal state (represented using RateLimiterQueue), and act upon the generated rate-limiter-tasks.

However, there are crucial differences in the level of type safety. From the implementation that provides the least type safety to the most type-safe one:

  • “traditional” Akka gives very limited type safety: the types of messages are not checked. Running rate limited computations and scheduling messages to be sent are side-effecting operations.
  • Akka Typed: radically improved type safety, all types of messages are checked. Still, running rate limited computations and scheduling messages are side-effecting operations.
  • Monix: communication channels are typed. Running rate limited computations and scheduling messages are part of the program description, no side-effecting computations.
  • ZIO: communication channels are fully typed, both in the possible errors and the values accepted. As before, computations and scheduling are part of the program description.

It’s also worth noting that only the “traditional” Akka implementation uses mutable state to store the actor’s internal data. This can lead to subtle bugs which are hard to spot, such as accessing the mutable state from Future callbacks, or even worse, mutating it from other threads. All of the other implementations are free from this problem, as they recursively invoke the Behavior/Task/IO-creating methods using modified, immutable data structures.
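The “recursion instead of mutable state” idea can be sketched without any effect library at all. In the hypothetical example below, a plain List stands in for the mailbox or queue, and State, Enqueue and RunNext are made-up names for this illustration; the real implementations read messages asynchronously, but the shape of the loop is the same.

```scala
import scala.annotation.tailrec
import scala.collection.immutable.Queue

object RecursiveStateSketch {
  sealed trait Msg
  final case class Enqueue(task: String) extends Msg
  case object RunNext extends Msg

  // Immutable "actor state": tasks still waiting and tasks already run.
  final case class State(waiting: Queue[String], ran: Vector[String])

  // Handling a message never mutates anything; it returns a new State.
  def handle(state: State, msg: Msg): State = msg match {
    case Enqueue(t) => state.copy(waiting = state.waiting.enqueue(t))
    case RunNext =>
      state.waiting.dequeueOption match {
        case Some((t, rest)) => State(rest, state.ran :+ t)
        case None            => state
      }
  }

  // The loop passes the updated state to the recursive call, just like
  // runQueue passes the updated RateLimiterQueue.
  @tailrec
  def loop(state: State, inbox: List[Msg]): State = inbox match {
    case Nil         => state
    case msg :: rest => loop(handle(state, msg), rest)
  }
}
```

Since no reference to a mutable field exists, there is nothing a stray Future callback could accidentally read or modify.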

Another important difference is how concurrent processes are started. In Akka, in order to start a computation in the background, either an actor (which is always bound to a mailbox, a queue of incoming messages) or a Future has to be created. In Monix / ZIO, there’s only one way: specifying that a computation should run asynchronously using fork. The forking operation closely resembles a lazy version of Future creation; however, thanks to some additional tools, such as MVars, IOQueues and Promises, it can easily be used to model actor-style communication with the outside world.

In a sense, Akka comes with a pre-defined recipe for defining an asynchronous computation. An actor is nothing more than an asynchronous process, reading in a loop from its mailbox (a queue of messages) and running message-handling logic. Additionally, it maintains internal state; that state can be explicitly mutated by the user (as in the “traditional” example), or implicitly by the library (as is the case in the “typed” example, where the state is the current behavior). This recipe is common and important enough to become an abstraction on its own.

With Monix / ZIO, you don’t get such a pre-defined recipe, but it’s straightforward to implement it yourself using MVars or IOQueues, as this simple example has shown.

Finally, there’s the difference in execution models: the choice between eager Futures and lazy Tasks/IOs heavily influences how you write code. Let’s take for example the fork operation. Notice that when we define the queue-consuming logic (runQueue), the fork method is applied to the result of this method. If we wrote the same logic using eager Futures, instead of lazy Tasks, the fork would never get executed, as there’s infinite recursion happening in the runQueue method body (the last step is a recursive call). However, if we separate the definition of a process from its execution, we get many more options: for example, we can define how the computation should run (here: asynchronously, in the background).
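Here’s a small sketch of why a recursive definition is harmless when the computation is lazy. The Lazy class is a hypothetical, deliberately tiny stand-in for Task/IO (no stack safety, no concurrency), written only for this illustration. The recursive call sits inside the function passed to flatMap, so building the description returns immediately; the steps only happen once run is called.

```scala
object LazyVsEagerSketch {
  // Hypothetical minimal "description of a computation".
  final class Lazy[A](val run: () => A) {
    def flatMap[B](f: A => Lazy[B]): Lazy[B] =
      new Lazy[B](() => f(run()).run())
  }
  object Lazy {
    def apply[A](a: => A): Lazy[A] = new Lazy[A](() => a)
  }

  // Returns: steps executed after defining, the result, steps after running.
  def demo(): (Int, Int, Int) = {
    var steps = 0

    // The last step is a recursive call, as in runQueue. With a huge limit
    // this would describe an (effectively) endless process, and defining it
    // would still return right away.
    def countFrom(n: Int, limit: Int): Lazy[Int] =
      Lazy { steps += 1; n }.flatMap { m =>
        if (m >= limit) Lazy(m) else countFrom(m + 1, limit)
      }

    val description = countFrom(0, 3) // nothing has been executed yet
    val stepsAfterDefinition = steps

    val result = description.run()    // now the steps actually happen
    (stepsAfterDefinition, result, steps)
  }
}
```

An eager equivalent would start executing (and recursing) as soon as countFrom was called, so there would be no value to hand over to something like fork.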

The possibility to manipulate descriptions of computations is the defining characteristic of this style of programming. These descriptions are just ordinary Scala values, very often simply built out of case classes (yes, these are free monads over a base instruction set for encapsulating side-effects and concurrency). The interpreter, invoked using runSync or unsafePerformIO, then has full information and can run the computation as specified.
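To illustrate what “descriptions built out of case classes” plus an interpreter can look like, here is a toy sketch. Prog, Pure, Effect, FlatMap and unsafeRun are hypothetical names for this illustration; the real Task and IO data types have many more instructions (asynchrony, forking, error handling) and stack-safe interpreters, which this naive version lacks.

```scala
import scala.collection.mutable.ListBuffer

object DescriptionSketch {
  // A program is a plain data structure; constructing it runs nothing.
  sealed trait Prog[A] {
    def flatMap[B](f: A => Prog[B]): Prog[B] = FlatMap(this, f)
    def map[B](f: A => B): Prog[B] = flatMap(a => Pure(f(a)))
  }
  final case class Pure[A](value: A) extends Prog[A]
  final case class Effect[A](thunk: () => A) extends Prog[A]
  final case class FlatMap[A, B](prev: Prog[A], next: A => Prog[B]) extends Prog[B]

  // A deliberately naive interpreter: synchronous and not stack-safe.
  def unsafeRun[A](p: Prog[A]): A = p match {
    case Pure(v)             => v
    case Effect(t)           => t()
    case FlatMap(prev, next) => unsafeRun(next(unsafeRun(prev)))
  }

  // Returns: log before running, log after running, the result.
  def demo(): (List[String], List[String], Int) = {
    val log = ListBuffer.empty[String]
    val step: Prog[Int] = Effect(() => { log += "ran"; 1 })

    // The same value is reused twice: it's a description, not a result.
    val program: Prog[Int] =
      for {
        a <- step
        b <- step
      } yield a + b

    val logBeforeRun = log.toList
    val result = unsafeRun(program)
    (logBeforeRun, log.toList, result)
  }
}
```

Because step is only a description, using it twice in the for-comprehension runs the side-effect twice; an eager Future in its place would have run once, when it was created.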

See you soon

For this simple use-case, Monix / ZIO is definitely a viable alternative to the “traditional” and “typed” Akka flavors. Stay tuned for the next installment, where we’ll explore a use case that actors were made for: communication!

Originally published on blog.softwaremill.com