Akka vs ZIO vs Monix, part 2: communication

Adam Warski 24 July, 2018 (17 min read)

In part 1, we explored how to implement a process which manages some non-trivial state using Akka, Akka Typed, Monix and ZIO. However, as a popular saying by Carl Hewitt goes, “one actor is no actor, they come in systems”. Hence, let’s explore examples which use multiple communicating actors, and see if it’s still possible and practical to implement them using ZIO or Monix.

Crawler

Our first example will be an implementation of the popular master-worker pattern, where we have a single master process distributing work to a number of worker processes. When a worker finishes a work unit, it sends the results to the master process, which gathers them and includes them in the overall computation results.

More concretely, the task will be to create a web crawler. Starting from a given URL, it should traverse all the links, counting which hosts are most popular. HTTP requests should be executed in parallel, but there’s one additional restriction: we don’t want our crawling to look suspicious, so at any given time at most one request to any given host should be executing (requests to different hosts can still run in parallel).

As we are interested in the way a process is defined, not in the actual crawling, we’ll use a stub HTTP service, along with stub functions which extract interesting links (which we want to crawl) from the site’s content:

type Host = String
case class Url(host: Host, path: String)

trait Http[F[_]] {
  def get(url: Url): F[String]
}

type LinkParser = String => List[Url]
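
To experiment with these interfaces without a network, one option is a stub backed by an in-memory map. The following is only a sketch: the StubSite object, the pages map and the one-link-per-line "host/path" page format are assumptions made for illustration, and are not taken from the article’s repository.

```scala
import scala.concurrent.Future

object StubSite {
  type Host = String
  case class Url(host: Host, path: String)

  trait Http[F[_]] {
    def get(url: Url): F[String]
  }

  // Hypothetical page format: one "host/path" link per line.
  val pages: Map[Url, String] = Map(
    Url("a.com", "index") -> "b.com/x\nb.com/y",
    Url("b.com", "x")     -> "a.com/index",
    Url("b.com", "y")     -> ""
  )

  // Stub HTTP client: known pages succeed, unknown ones fail.
  val http: Http[Future] = new Http[Future] {
    def get(url: Url): Future[String] =
      pages.get(url) match {
        case Some(body) => Future.successful(body)
        case None       => Future.failed(new RuntimeException(s"Not found: $url"))
      }
  }

  // Stub link parser matching the hypothetical page format above.
  val parseLinks: String => List[Url] = body =>
    body.split("\n").toList.filter(_.nonEmpty).map { line =>
      val i = line.indexOf('/')
      Url(line.take(i), line.drop(i + 1))
    }
}
```

Because the stub returns already-completed futures, tests built on it do not depend on timing.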

Traditional Akka

Like in part 1, let’s start with a “traditional” Akka solution, and then move to other implementations. Again, only crucial snippets will be included, but the full source code is available on GitHub.

We’ll have to define two actors: Crawler and Worker. To construct a Crawler actor, we need an interface for executing HTTP requests (http: Http[Future], as we are in Akka-land and everything is Future-based), a way to parse links (parseLinks), and a Promise waiting to be completed with the final result (once all pages have been crawled; we’re assuming that it’s a finite process, and parseLinks gives only “interesting” links, for example from a set of “interesting” hosts):

class Crawler(http: Http[Future], 
              parseLinks: String => List[Url], 
              result: Promise[Map[Host, Int]]) extends Actor {
  
  var referenceCount = Map[Host, Int]()
  var visitedLinks = Set[Url]()
  var inProgress = Set[Url]()
  var workers = Map[Host, ActorRef]()
  
  // ...
}

The internal state of the actor consists of:

  • referenceCount: the current host popularity
  • visitedLinks: which URLs have already been processed or are being processed, to avoid processing them once again
  • inProgress: the set of URLs which are currently being processed. Once this becomes empty (after starting the process), the crawling is done and the result promise can be completed with referenceCount
  • workers: each host will have a dedicated worker actor, which will ensure that at most one request is done for each host at any time

There are two messages that the crawler actor can receive:

sealed trait CrawlerMessage
/**
  * Start the crawling process for the given URL. Should be sent only once.
  */
case class Start(url: Url) extends CrawlerMessage
case class CrawlResult(url: Url, links: List[Url]) extends CrawlerMessage

The Start message should be sent only once, to kickstart the whole process. CrawlResult messages will be sent by worker actors, once they have completed crawling the given URL and parsing the links.

Let’s start by looking at the crawlUrl method in the actor:

private def crawlUrl(url: Url): Unit = {
  if (!visitedLinks.contains(url)) {
    visitedLinks += url
    inProgress += url
    actorFor(url.host) ! Crawl(url)
  }
}

private def actorFor(host: Host): ActorRef = {
  workers.get(host) match {
    case None =>
      val workerActor = context.actorOf(Props(new Worker(http, parseLinks, self)))
      workers += host -> workerActor
      workerActor

    case Some(ar) => ar
  }
}

The method checks if the URL has already been visited; if not, the visitedLinks and inProgress structures are updated. We create or look up a worker actor using actorFor, and tell it to Crawl the given address.

Notice that when creating a new worker, we’re passing the self: ActorRef reference so that the worker can send messages back to the crawler.

As we mentioned before, the actor can receive two types of messages:

override def receive: Receive = {
  case Start(start) =>
    crawlUrl(start)

  case CrawlResult(url, links) =>
    inProgress -= url

    links.foreach { link =>
      crawlUrl(link)
      referenceCount = referenceCount.updated(link.host, 
                                              referenceCount.getOrElse(link.host, 0) + 1)
    }

    if (inProgress.isEmpty) {
      result.success(referenceCount)
      context.stop(self)
    }
}

The worker actors are expected to reply to the crawler actor with the CrawlResult message. Once this message is received, the inProgress and referenceCount structures are updated again, and all the linked URLs are crawled. If at the end there’s nothing being crawled, we are done!
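
The bookkeeping done by the crawler can also be looked at in isolation, without any actors. Below is a minimal sketch of that logic as pure functions; the CrawlerModel object, the State class and the synchronous run driver (which stands in for the workers) are assumptions introduced for illustration only.

```scala
object CrawlerModel {
  type Host = String
  case class Url(host: Host, path: String)

  case class State(referenceCount: Map[Host, Int],
                   visitedLinks: Set[Url],
                   inProgress: Set[Url])

  // Mirrors crawlUrl: schedule a URL only if it has not been seen before.
  // Returns the new state and the URL to hand to a worker, if any.
  def crawlUrl(s: State, url: Url): (State, Option[Url]) =
    if (s.visitedLinks.contains(url)) (s, None)
    else (s.copy(visitedLinks = s.visitedLinks + url,
                 inProgress = s.inProgress + url), Some(url))

  // Mirrors the CrawlResult handler: count every link, schedule the new ones.
  def onResult(s: State, url: Url, links: List[Url]): (State, List[Url]) = {
    val s1 = s.copy(inProgress = s.inProgress - url)
    links.foldLeft((s1, List.empty[Url])) { case ((st, toCrawl), link) =>
      val counted = st.copy(referenceCount = st.referenceCount.updated(
        link.host, st.referenceCount.getOrElse(link.host, 0) + 1))
      val (next, scheduled) = crawlUrl(counted, link)
      (next, toCrawl ++ scheduled.toList)
    }
  }

  // Synchronous driver standing in for the workers: one URL at a time.
  def run(start: Url, site: Map[Url, List[Url]]): Map[Host, Int] = {
    @scala.annotation.tailrec
    def loop(s: State, queue: List[Url]): Map[Host, Int] = queue match {
      case Nil => s.referenceCount // nothing in progress: we are done
      case url :: rest =>
        val (s2, more) = onResult(s, url, site.getOrElse(url, Nil))
        loop(s2, rest ++ more)
    }
    val (s0, first) = crawlUrl(State(Map(), Set(), Set()), start)
    loop(s0, first.toList)
  }
}
```

Note that a link is counted every time it is seen, but crawled only once; that is why visitedLinks and referenceCount are kept separately.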

The worker isn’t complicated either. It’s parametrised with a reference (ActorRef) to the master actor, which allows sending back messages, as well as the Http[Future] interface and a way to parse links:

class Worker(http: Http[Future], 
             parseLinks: String => List[Url], 
             master: ActorRef) extends Actor with ActorLogging {
  
  var urlsPending: Vector[Url] = Vector.empty
  var getInProgress = false
  
  // ...
}

The internal state of the actor consists of a list of URLs that should be crawled (urlsPending), and a flag indicating if there’s a request in progress (getInProgress). This is needed to ensure that there’s at most one request to a given domain executing at any time.

There are also two messages which the worker will receive:

sealed trait WorkerMessage
case class Crawl(url: Url) extends WorkerMessage
case class HttpGetResult(url: Url, result: Try[String]) extends WorkerMessage

The first one is sent, as we’ve seen, by the crawler actor:

override def receive: Receive = {
  case Crawl(url) =>
    urlsPending = urlsPending :+ url
    startHttpGetIfPossible()
    
  // ...
}

private def startHttpGetIfPossible(): Unit = {
  urlsPending match {
    case url +: tail if !getInProgress =>
      getInProgress = true
      urlsPending = tail

      import context.dispatcher
      http.get(url).onComplete(r => self ! HttpGetResult(url, r))

    case _ =>
  }
}

Once we get a new URL to crawl, we add it to the list of pending requests (urlsPending). If possible, that is, if there are no requests in progress, the startHttpGetIfPossible method starts executing a new HTTP request. Once this completes, we send an HttpGetResult message to ourselves (the worker actor). Note that this is an asynchronous operation, and you always have to be cautious not to access or mutate the actor’s state from within such callbacks.

override def receive: Receive = {
  case Crawl(url) =>
    // ...

  case HttpGetResult(url, Success(body)) =>
    getInProgress = false
    startHttpGetIfPossible()

    val links = parseLinks(body)
    master ! CrawlResult(url, links)

  case HttpGetResult(url, Failure(e)) =>
    getInProgress = false
    startHttpGetIfPossible()

    log.error(s"Cannot get contents of $url", e)
    master ! CrawlResult(url, Nil)
}

Once the worker actor receives the HttpGetResult message, it sends a notification to the master with the results (CrawlResult), and starts another request, if there’s one pending.
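
The worker’s logic boils down to a small state machine over (urlsPending, getInProgress). As a rough sketch, it can be modelled with pure functions which return the next state together with the URL for which a request should be started, if any. The WorkerModel object and its function names are hypothetical, introduced only to illustrate the transitions.

```scala
object WorkerModel {
  case class Url(host: String, path: String)

  case class WorkerState(urlsPending: Vector[Url], getInProgress: Boolean)

  // Mirrors startHttpGetIfPossible: returns the next state and the URL
  // for which a request should be started now, if any.
  def startIfPossible(s: WorkerState): (WorkerState, Option[Url]) =
    s.urlsPending match {
      case url +: tail if !s.getInProgress =>
        (WorkerState(tail, getInProgress = true), Some(url))
      case _ =>
        (s, None)
    }

  // Crawl(url): enqueue the URL, then try to start a request.
  def onCrawl(s: WorkerState, url: Url): (WorkerState, Option[Url]) =
    startIfPossible(s.copy(urlsPending = s.urlsPending :+ url))

  // HttpGetResult: the running request has finished, try to start the next one.
  def onHttpGetResult(s: WorkerState): (WorkerState, Option[Url]) =
    startIfPossible(s.copy(getInProgress = false))
}
```

A second Crawl arriving while a request is running only grows urlsPending; a new request is started only after an HttpGetResult arrives.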

Overall, it’s not a complicated process, but there’s some communication happening: both from the master to the worker, and the other way round. There are tests for the implementation (see AkkaCrawlerTest), which verify that we indeed get correct answers.

Akka Typed

With Akka Typed, instead of writing actors directly, we’ll be defining actor behaviors. The messages sent between the actors/behaviors will be exactly the same, however we’ll additionally encapsulate the whole state in a case class:

case class CrawlerData(referenceCount: Map[Host, Int],
                       visitedLinks: Set[Url],
                       inProgress: Set[Url],
                       workers: Map[Host, ActorRef[WorkerMessage]])

The behaviors are parametrised with an interface for executing HTTP requests, a function to parse the links and an actor to which the reply with the results should be sent once available (this used to be a Promise in the previous example, but this way is more natural here):

class Crawler(http: Http[Future], 
              parseLinks: String => List[Url], 
              reportTo: ActorRef[Map[Host, Int]]) {
  def crawlerBehavior: Behavior[CrawlerMessage] = ???
}

The crawler behavior that we’ll define will use the actor’s context, so we’ll wrap the methods which define the message-processing behavior with a factory method which obtains the context. The context, as everything in Akka Typed, is parameterised with the type of the messages that the actor handles. That’s needed for example to obtain a well-typed self actor reference, which needs to know what kind of messages it accepts:

def crawlerBehavior: Behavior[CrawlerMessage] = 
  Behaviors.setup[CrawlerMessage] { ctx =>
    def receive(data: CrawlerData): Behavior[CrawlerMessage] = ???
    def crawlUrl(data: CrawlerData, url: Url): CrawlerData = ???
    def workerFor(data: CrawlerData, 
                  host: Host): (CrawlerData, ActorRef[WorkerMessage]) = ???
  }

Let’s start from the end, with the method for looking up a worker actor for a given host. Since in this implementation we’re not using mutable state, but instead returning modified actor behaviors which wrap the state (CrawlerData), all methods will:

  • as a parameter, take a CrawlerData instance
  • return the modified CrawlerData as part of the return type

def workerFor(data: CrawlerData, host: Host): (CrawlerData, ActorRef[WorkerMessage]) = {
  data.workers.get(host) match {
    case None =>
      val workerActor = ctx.spawn(workerBehavior(ctx.self), s"worker-$host")
      (data.copy(workers = data.workers + (host -> workerActor)), workerActor)

    case Some(ar) => (data, ar)
  }
}

If there’s no worker for the given domain yet, we’re spawning a new child actor, using the worker behavior, and returning an updated actor state, together with the created actor reference.

As the type of the behavior which we are passing to spawn is Behavior[WorkerMessage], the result of this method will be ActorRef[WorkerMessage].

The crawlUrl and receive methods are quite similar to the “traditional” Akka implementation, with the significant difference being that we need to thread through the modified actor state. Sometimes there are a couple of modifications, hence we get a chain of data, data2, data3 references:

def receive(data: CrawlerData): Behavior[CrawlerMessage] = Behaviors.receiveMessage {
  case Start(start) =>
    receive(crawlUrl(data, start))

  case CrawlResult(url, links) =>
    val data2 = data.copy(inProgress = data.inProgress - url)

    val data3 = links.foldLeft(data2) {
      case (d, link) =>
        val d2 = d.copy(referenceCount = d.referenceCount.updated(
          link.host, d.referenceCount.getOrElse(link.host, 0) + 1))
        crawlUrl(d2, link)
    }

    if (data3.inProgress.isEmpty) {
      reportTo ! data3.referenceCount
      Behavior.stopped
    } else {
      receive(data3)
    }
}

def crawlUrl(data: CrawlerData, url: Url): CrawlerData = {
  if (!data.visitedLinks.contains(url)) {
    val (data2, worker) = workerFor(data, url.host)
    worker ! Crawl(url)
    data2.copy(
      visitedLinks = data.visitedLinks + url,
      inProgress = data.inProgress + url
    )
  } else data
}

Communication in both Akka variants looks the same: we use the ! (tell) method to send a message to a (typed) actor. Don’t be mistaken, though: here everything is well-typed. You won’t be able to send a message of an incorrect type to an actor.

The worker behavior also corresponds closely to what we’ve seen before, again with the exception that we’re not using mutable state (and hence there’s no possibility of accidentally modifying it within callbacks):

def workerBehavior(master: ActorRef[CrawlResult]): Behavior[WorkerMessage] = 
  Behaviors.setup[WorkerMessage] { ctx =>
    
  def receive(urlsPending: Vector[Url], getInProgress: Boolean): Behavior[WorkerMessage] =
    Behaviors.receiveMessage {
      case Crawl(url) =>
        startHttpGetIfPossible(urlsPending :+ url, getInProgress)

      case HttpGetResult(url, Success(body)) =>
        val links = parseLinks(body)
        master ! CrawlResult(url, links)

        startHttpGetIfPossible(urlsPending, getInProgress = false)

      case HttpGetResult(url, Failure(e)) =>
        ctx.log.error(s"Cannot get contents of $url", e)
        master ! CrawlResult(url, Nil)

        startHttpGetIfPossible(urlsPending, getInProgress = false)
    }

  def startHttpGetIfPossible(urlsPending: Vector[Url], 
                             getInProgress: Boolean): Behavior[WorkerMessage] =
    urlsPending match {
      case url +: tail if !getInProgress =>
        import ctx.executionContext
        http.get(url).onComplete(r => ctx.self ! HttpGetResult(url, r))

        receive(tail, getInProgress = true)

      case _ =>
        receive(urlsPending, getInProgress)
    }

  receive(Vector.empty, getInProgress = false)
}

ZIO

Once again, let’s leave the eager scala.concurrent.Future world, and venture into the lazy land of IO. In the example from the previous article we used an IOQueue to communicate with the process from the outside world. Here, we’ll be using multiple IOQueues.

The Crawler process will also use a CrawlerData case class for storing the current state, but instead of a map from the domain to the worker’s ActorRef, it will contain an IOQueue:

case class CrawlerData(referenceCount: Map[Host, Int], 
                       visitedLinks: Set[Url], 
                       inProgress: Set[Url], 
                       workers: Map[Host, IOQueue[Url]])

Instead of actor classes, we’ll be defining methods, which will return IO instances: descriptions of how to compute the host popularity counts. The method will take an Http[IO] interface, but this time when executing the request, we won’t get a Future[String], but as we’re in ZIO-world, an IO[String]. That is, we’ll get back a description of how to execute a GET request to the given address:

def crawl(crawlUrl: Url, 
          http: Http[IO[Throwable, ?]],
          parseLinks: String => List[Url]): IO[Nothing, Map[Host, Int]] = {
          
  def crawler(crawlerQueue: IOQueue[CrawlerMessage], 
              data: CrawlerData): IO[Nothing, Map[Host, Int]] = // ...
  
  def worker(workerQueue: IOQueue[Url], 
             crawlerQueue: IOQueue[CrawlerMessage]
            ): IO[Nothing, Fiber[Nothing, Unit]] = // ...
          
  // ...
}

In Akka Typed we had to define two behaviors, for the crawler and the worker; here we’ll be defining two process descriptions. The first one, the crawler, contains the same parts as in the previous implementation:

def crawler(crawlerQueue: IOQueue[CrawlerMessage], 
            data: CrawlerData): IO[Nothing, Map[Host, Int]] = {
  
  def handleMessage(msg: CrawlerMessage, 
                    data: CrawlerData): IO[Nothing, CrawlerData] = ???

  def crawlUrl(data: CrawlerData, 
               url: Url): IO[Nothing, CrawlerData] = ???

  def workerFor(data: CrawlerData, 
                host: Host): IO[Nothing, (CrawlerData, IOQueue[Url])] = ???
  
  ???
}

Let’s again start from the bottom, with the description of how to obtain a worker for a given host. Even though we’ve travelled from Akka to Scalaz, we still need a way to ensure that there’s at most one request to a given host done at any given time. A separate asynchronous process which makes sure that’s the case is a good fit:

def workerFor(data: CrawlerData, 
              host: Host): IO[Nothing, (CrawlerData, IOQueue[Url])] = {
  
  data.workers.get(host) match {
    case None =>
      for {
        workerQueue <- IOQueue.make[Nothing, Url](32)
        _ <- worker(workerQueue, crawlerQueue)
      } yield {
        (data.copy(workers = data.workers + (host -> workerQueue)), workerQueue)
      }
    case Some(queue) => IO.now((data, queue))
  }
}

Here of course we also don’t have any mutable state, so we need to take in the CrawlerData as a parameter, and return an updated copy. If there’s no worker for a given address yet, we first create a (bounded) queue which will be used to communicate with that worker, then create the worker process (we’ll get to the definition of worker soon), and finally store the queue in our data structure. Again, that is not that different from the Akka Typed implementation.

The crawlUrl method should look familiar as well:

def crawlUrl(data: CrawlerData, url: Url): IO[Nothing, CrawlerData] = {
  if (!data.visitedLinks.contains(url)) {
    workerFor(data, url.host).flatMap {
      case (data2, workerQueue) =>
        workerQueue.offer(url).map { _ =>
          data2.copy(
            visitedLinks = data.visitedLinks + url,
            inProgress = data.inProgress + url
          )
        }
    }
  } else IO.now(data)
}

The major difference is that sending a message to a worker isn’t a side effecting operation as before. Instead, we use the workerQueue.offer method, which returns a description of how to send a message to the queue. We need to combine this description with the overall description of how our code should run, or it will never be executed. Hence the need for the flatMap/map.
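
To see why a description that isn’t chained in never runs, here is a toy model. The Desc class below is a deliberately simplified stand-in for IO, written only for this illustration; it is not ZIO’s implementation, and names such as unsafeRun and offer are assumptions.

```scala
object LazyDescriptions {
  // A toy stand-in for IO: a value describing a computation, run only on demand.
  final class Desc[A](val unsafeRun: () => A) {
    def map[B](f: A => B): Desc[B] = new Desc(() => f(unsafeRun()))
    def flatMap[B](f: A => Desc[B]): Desc[B] =
      new Desc(() => f(unsafeRun()).unsafeRun())
  }
  object Desc {
    def apply[A](a: => A): Desc[A] = new Desc(() => a)
  }

  // Returns the messages "sent" after running each of the two variants.
  def demo(): (List[String], List[String]) = {
    val sent = scala.collection.mutable.ListBuffer.empty[String]
    def offer(msg: String): Desc[Unit] = Desc { sent += msg; () }

    // Discarded description: created, but not part of what gets run.
    def dropped: Desc[Int] = { offer("lost"); Desc(1) }
    // Composed description: the offer is chained in using map.
    def chained: Desc[Int] = offer("delivered").map(_ => 1)

    dropped.unsafeRun()
    val afterDropped = sent.toList // still empty: "lost" was never sent
    chained.unsafeRun()
    (afterDropped, sent.toList)
  }
}
```

Running demo() shows that only the message whose offer was composed into the result is sent.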

The handleMessage method corresponds to receive from the Akka Typed implementation and should return the crawler data modified after handling a single, given message:

def handleMessage(msg: CrawlerMessage, 
                  data: CrawlerData
                 ): IO[Nothing, CrawlerData] = msg match {
  case Start(url) =>
    crawlUrl(data, url)

  case CrawlResult(url, links) =>
    val data2 = data.copy(inProgress = data.inProgress - url)

    links.foldM(data2) {
      case (d, link) =>
        val d2 = d.copy(referenceCount = d.referenceCount.updated(
          link.host, d.referenceCount.getOrElse(link.host, 0) + 1))
        crawlUrl(d2, link)
    }
}

Previously, when handling the CrawlResult message, we did a simple foldLeft on the resulting links, updating the data structure and running the side-effecting crawlUrl method. Here, we need to combine all the IOs returned by every crawlUrl invocation into one big description. That’s what the foldlM method does: def foldlM[G[_], B](z: B)(f: B => A => G[B])(implicit M: Monad[G]): G[B], giving us the final IO[CrawlerData] which composes all side-effects into a single description.
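
The idea behind a monadic left fold can be sketched without any library. The snippet below uses a toy Desc type as a stand-in for IO and a hand-written foldM specialised to it; both are assumptions made for illustration and are not the Scalaz definitions. Unlike a production implementation, this naive version is not stack-safe for long lists.

```scala
object FoldSketch {
  // A toy stand-in for IO: a value describing a computation, run only on demand.
  final class Desc[A](val unsafeRun: () => A) {
    def map[B](f: A => B): Desc[B] = new Desc(() => f(unsafeRun()))
    def flatMap[B](f: A => Desc[B]): Desc[B] =
      new Desc(() => f(unsafeRun()).unsafeRun())
  }
  object Desc {
    def apply[A](a: => A): Desc[A] = new Desc(() => a)
    def now[A](a: A): Desc[A] = new Desc(() => a)
  }

  // Monadic left fold specialised to Desc: each step may describe an effect,
  // and the steps are chained with flatMap, in list order.
  def foldM[A, B](as: List[A], z: B)(f: (B, A) => Desc[B]): Desc[B] =
    as.foldLeft(Desc.now(z))((acc, a) => acc.flatMap(b => f(b, a)))

  def demo(): (Int, List[String], Int) = {
    val log = scala.collection.mutable.ListBuffer.empty[String]
    val total: Desc[Int] = foldM(List("a", "bb", "ccc"), 0) { (sum, s) =>
      Desc { log += s; sum + s.length }
    }
    val before = log.size          // nothing has run yet
    val result = total.unsafeRun() // runs all steps, in order
    (before, log.toList, result)
  }
}
```

The fold itself performs no effects; it only builds one description, which performs them in order once it is run.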

But, that’s not the end! We have helper methods to handle the messages, but what about the main loop? Unlike in an actor, which as we’ve summarized before, is a pre-defined recipe for an asynchronous process reading messages from its inbox in a loop, here we need to create the loop by hand:

crawlerQueue.take.flatMap { msg =>
  handleMessage(msg, data).flatMap { data2 =>
    if (data2.inProgress.isEmpty) {
      IO.now(data2.referenceCount)
    } else {
      crawler(crawlerQueue, data2)
    }
  }
}

The loop takes the form of recursive invocations of the main crawler method, with the updated crawler data. Unless, of course, there are no more requests in progress: then we simply return the result.

Having the crawler ready, let’s look at the worker process. It can in fact be simpler than in the Akka implementations. The key observation is that we are in full control over when we take a new message from the queue. An actor has the mailbox-read-loop baked in: we cannot wait with receiving the next message until some condition is satisfied (it is possible to stash messages, but that requires additional logic). Here, however, we have that possibility.

Hence the worker, after getting a new request to crawl a URL from a queue, can simply execute the request and only take the next URL after the request completes:

def worker(workerQueue: IOQueue[Url], 
           crawlerQueue: IOQueue[CrawlerMessage]
          ): IO[Nothing, Fiber[Nothing, Unit]] = {
           
  def handleUrl(url: Url): IO[Nothing, Unit] = {
    http
      .get(url)
      .attempt[Nothing]
      .map {
        case Left(t) =>
          logger.error(s"Cannot get contents of $url", t)
          List.empty[Url]
        case Right(b) => parseLinks(b)
      }
      .flatMap(r => crawlerQueue.offer(CrawlResult(url, r))
                                .fork[Nothing].toUnit)
  }

  workerQueue
    .take[Nothing]
    .flatMap(handleUrl)
    .forever
    .fork
}

The worker process is an infinite loop (created with forever), which takes a message from the queue and handles it. It is also forked into a fiber, so that it runs asynchronously in the background. The fiber instance is returned, but it’s never used by the crawler process.
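
The same "take, handle, repeat" shape can be shown with plain threads and a blocking queue from the Java standard library. This is only an analogy, not the ZIO implementation: the SequentialWorker object, the None stop signal and the "fetched ..." strings are assumptions made for the sketch.

```scala
import java.util.concurrent.LinkedBlockingQueue

object SequentialWorker {
  // Starts a worker thread which takes one URL at a time from `in`, handles it,
  // and only then takes the next one. `None` is a hypothetical stop signal.
  def run(urls: List[String]): List[String] = {
    val in = new LinkedBlockingQueue[Option[String]]()
    val out = new LinkedBlockingQueue[String]()

    val worker = new Thread(() => {
      var running = true
      while (running) {
        in.take() match {
          // Stands in for executing the HTTP request and reporting the result.
          case Some(url) => out.put(s"fetched $url")
          case None      => running = false
        }
      }
    })
    worker.start()

    urls.foreach(u => in.put(Some(u)))
    in.put(None)
    worker.join()

    // All results are available once the worker has finished.
    List.fill(urls.size)(out.take())
  }
}
```

Since the next take happens only after the previous URL has been handled, at most one request is in flight per worker, without any getInProgress flag.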

There’s a very important detail here, however. Notice that when we send the crawl result to the message queue, we fork the operation into a fiber (crawlerQueue.offer(…).fork). Why is that?

Recall that unlike the mailboxes of actors, the IOQueue that we are using in ZIO is bounded, and when the queue is full, the offer operation blocks. That’s good on one side — it gives a bound on memory usage, and also provides back-pressure. However, it can also lead to deadlocks.

In our example, imagine that there’s a lot of links from one page to a single host (but different paths), so we’ll be sending a lot of messages from the crawler process to a single worker process. If the number of links (URLs) is higher than the queue capacity, then at some point the crawler will become blocked and won’t be able to send any more URLs — as the queue will be full. The worker will slowly work through the requests, replying with results and processing messages from its queue — but it can get immediately filled up with new Crawl messages.

If the total number of URLs sent from the crawler to a single worker during a single crawlUrl invocation exceeds the combined capacities of the crawler and worker queues, at some point the crawler’s queue will fill up as well: the crawler will still be sending Crawl messages, and won’t get a chance to process the CrawlResult messages it receives. Now the worker will block as well. Hence the deadlock.

However, if we send the replies in the background — in a background fiber, the worker will be able to continue working through the Crawl requests. All of the spawned offer(CrawlResult(...))-fibers might wait blocking, until the crawler finishes enqueueing all Crawl requests, but that’s not a problem.

EDIT: as pointed out by John De Goes, another way to solve the problem is to use an unbounded queue with IOQueue.make(Int.MaxValue). That way with ZIO we have a choice between bounded queues — which require more caution — and unbounded queues.

With bounded queues and replies sent in the background, our memory usage is still bounded (by the total size of the queues) and we won’t get a deadlock. However, we need to carefully design the way the processes interact to avoid that situation.

If the processes form a hierarchy, as here, where there’s a parent process (crawler) and a number of child processes (workers), a good rule might be to directly send messages only from parent processes to child processes (down the hierarchy tree). Any replies, going up the hierarchy tree, should be sent in the background, using a forked fiber.
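
The difference between replying directly and replying in the background can be illustrated with a bounded queue from the Java standard library. This is an analogy using threads and Futures rather than ZIO fibers; the BoundedReplies object and the message strings are assumptions made for the sketch.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BoundedReplies {
  // Direct reply into a full bounded queue: the sender cannot make progress.
  // A timed offer is used so that the sketch terminates instead of hanging.
  def directReplyTimesOut(): Boolean = {
    val crawlerQueue = new ArrayBlockingQueue[String](1)
    crawlerQueue.put("result-1") // the queue is now full
    !crawlerQueue.offer("result-2", 50, TimeUnit.MILLISECONDS)
  }

  // Background replies: each blocking put runs as a separate task, so the
  // "worker" stays free; the puts complete once the receiver starts draining.
  def backgroundReplies(n: Int): List[String] = {
    val crawlerQueue = new ArrayBlockingQueue[String](1)
    val sends = (1 to n).toList.map { i =>
      Future(blocking(crawlerQueue.put(s"result-$i")))
    }
    // ... the worker could keep handling Crawl requests here ...
    val received = List.fill(n)(crawlerQueue.take())
    Await.result(Future.sequence(sends), 5.seconds)
    received.sorted // arrival order is not deterministic
  }
}
```

In the first function the sender gives up after the timeout; with a plain blocking put it would wait until somebody drains the queue, which is how the deadlock described above arises.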

Finally, we need to bootstrap the whole process: create the queue to communicate with the crawler, enqueue the initial message, and create the IO which describes the crawling process:

val crawl = for {
  crawlerQueue <- IOQueue.make[Nothing, CrawlerMessage](32)
  _ <- crawlerQueue.offer[Nothing](Start(crawlUrl))
  r <- crawler(crawlerQueue, CrawlerData(Map(), Set(), Set(), Map()))
} yield r

IO.supervise(crawl, new RuntimeException)

There’s one small but important feature here: the IO.supervise call which wraps the whole process. This method instructs the interpreter that when the wrapped computation (crawl) completes, all fibers created by it should be interrupted (and terminated). And that’s exactly what we want: any forked worker fibers should be terminated once we have the final result, as they won’t ever be used.

This closely resembles a hierarchy of actors in Akka: once a parent actor is stopped, all child actors are stopped as well. In ZIO it’s not the default, but the option is there. When defining a computation which spawns multiple fibers, it’s very handy not to have to worry about the cleanup, but delegate the task to supervise.

Monix

Finally, let’s move to Monix. As we noted in the previous installment of the series, Monix and ZIO solutions are very closely related. Here the situation is the same. There are two important differences however.

First of all, we cannot use MVars (which behave like bounded queues of size 1) to communicate between the crawler and the worker. As putting a value to a full MVar is a blocking operation, it could very quickly lead to a deadlock (as described above).

That’s why we need a proper queue. Monix does have an unbounded async queue implementation, monix.execution.misc.AsyncQueue, but it’s Future-based, so we’ll create a thin Task-wrapper around it:

class MQueue[T](q: AsyncQueue[T]) {
  def take: Task[T] = {
    Task.deferFuture(q.poll())
  }
  def offer(t: T): Task[Unit] = {
    Task.eval(q.offer(t))
  }
}
object MQueue {
  def make[T]: MQueue[T] = new MQueue(AsyncQueue.empty)
}

The interface to our MQueue is the same as to Scalaz’s IOQueue, but with an important difference: IOQueue is bounded, and when the queue is full, IOQueue.offer will (asynchronously) block. Here we have an unbounded queue, which corresponds to unbounded actor mailboxes in Akka. Hence, we won’t have problems with deadlocks (but we also don’t get a bound on memory usage).

The second difference is that there’s no construct analogous to IO.supervise in Monix, so we have to manage fibers manually. That means that we are storing the fibers in the CrawlerData data structure, next to the worker queues:

case class WorkerData(queue: MQueue[Url], 
                      fiber: Fiber[Unit])
case class CrawlerData(referenceCount: Map[Host, Int], 
                       visitedLinks: Set[Url], 
                       inProgress: Set[Url], 
                       workers: Map[Host, WorkerData])

When a new worker process is created, we have to store the fiber on which it is running:

def workerFor(data: CrawlerData, host: Host): Task[(CrawlerData, MQueue[Url])] = {
  data.workers.get(host) match {
    case None =>
      val workerQueue = MQueue.make[Url]
      worker(workerQueue, crawlerQueue).map { workerFiber =>
        val workerData = WorkerData(workerQueue, workerFiber)
        val data2 = data.copy(workers = data.workers + (host -> workerData))
        (data2, workerQueue)
      }
    case Some(wd) => Task.now((data, wd.queue))
  }
}

And once the computation is done, all fibers need to be cancelled. This manual fiber management slightly complicates the Task construction when we know that we are done with the crawling and want to return the result:

crawlerQueue.take.flatMap { msg =>
  handleMessage(msg, data).flatMap { data2 =>
    if (data2.inProgress.isEmpty) {
      data2.workers.values.map(_.fiber.cancel).toList.sequence_
        .map(_ => data2.referenceCount)
    } else {
      crawler(crawlerQueue, data2)
    }
  }
}

The data2.workers.values.map(_.fiber.cancel).toList.sequence_ creates a Task description which cancels all the fibers (Fiber.cancel: Task[Unit]) in sequence, and then returns the final result.

Otherwise the code is very similar to the ZIO implementation. Here’s the full source for you to browse.

Both the Scalaz and Monix implementations come with tests which simulate deep and wide chains of crawled links. This way we can verify that the solutions are not only correct, but also stack-safe.

Sockets example

The repository also contains another example, called sockets. It shows how to deal with two common problems:

  1. interfacing with a legacy, blocking API. Here, we have a server socket (Socket) with a blocking and exception-throwing accept method, and client sockets (ConnectedSocket) with blocking send/receive methods.
  2. broadcasting messages to a large number of clients. This is a common requirement e.g. when dealing with websockets

All examples use several processes:

  • the router process (Actor/Behavior/Task/IO) manages the server socket and broadcasts messages received from any connected client sockets to all other connected client sockets
  • the socket process accepts new client connections, which result in new instances of a ConnectedSocket
  • the client send/receive processes are created for each client ConnectedSocket and send messages or listen for new ones

If at any time a SocketTerminatedException is thrown by a client socket send/receive operation, the client socket needs to be closed and removed from the router.

The code is constructed in the same way as before; no significant new ideas are introduced. Still, it might be educational to explore the code on your own. As in the other examples, there’s also a test suite which might be useful for verifying that the code actually works.

Summary

In this part we’ve built on the ideas presented in the introductory article, adding communication to our asynchronous processes. As in the last part, the overall structure of the code for all of the different implementations isn’t that different. There are significant differences in type safety and in the exact semantics of the constructed objects, but the way communication is performed, via asynchronous message passing, is the same.

It’s quite easy to identify how concepts from actors can be mirrored to the ZIO/Monix worlds. In Akka, each actor is associated with a mailbox, which is a queue of incoming messages. In Monix/ZIO, if we need to model communication, we need to create a queue.

While in Akka we pass around (typed or untyped) ActorRefs, so that one actor can send messages to another actor, in ZIO we pass around (typed) IOQueues and in Monix MQueues or MVars, depending on the use-case. Again, this is not that different.

This gives us another piece of the answer to the question stated in the first part of the series: can ZIO/Monix offer an alternative to Akka/Akka Typed? As far as state management and communication are concerned: yes. Keep in mind, however, that we are looking at a small portion of what Akka is: while things like remoting, clustering or persistence could be implemented using the ZIO/Monix approach as well, there are no libraries which implement these functionalities (at least not yet).

In the final part, we’ll look at failure management, supervision and cancellation. Stay tuned!

Originally published on blog.softwaremill.com