We're planting a tree for every job application! Click here to learn more

Elixir Concurrent Programming

Bruno Ripa

6 Aug 2018

7 min read

Elixir Concurrent Programming
  • Elixir

Introduction

In Elixir if you need to write concurrent code, the GenServer module offers you lots of nice functions to handle the boilerplate it takes.

Let’s imagine we have a module called Concurrent and you want it to be run into a detached process; all it takes is to wrap it using the GenServer module: it exposes a bunch of special functions that help you define the way your concurrent module must work.

Let’s start with a very simple example.

defmodule Concurrent do
  use GenServer
  require Logger

  def start_link do
    GenServer.start_link(__MODULE__, [])
  end

end

The start_link function is used to launch a GenServer process.

To try it, just type iex -S mix and type Concurrent.start_link; here’s the output you should be seeing:

  iex(5)> Concurrent.start_link
  {:ok,# PID<0.164.0>}

As you can see, a tuple is returned: the :ok atom and the process id for the process.

Note: the special __MODULE__ variable is used to automatically provide the module name to the start_link function, which is supposed to receive it. Also, the second argument, the empty list [], is the initial state of the process, which in this example is empty.

If, by any chance, you need to make some stuff when the process starts, you can implement the init function (if you don’t provide your own implementation, a warning will be shown and a default implementation will be injected by the compiler).

def init(initial_data) do
  Logger.info("Starting #{__MODULE__}")
  {:ok, %{state: initial_data}}
end

Execute this (don’t forget to type recompile if you’re still in the old iex session) and you will see:

21:25:33.310 [info] Starting Elixir.Concurrent
{:ok,# PID<0.177.0>}

So now you can also see the log message we injected in our init implementation. Please also note that you must return the tuple {:ok, initial_data} when you provide your function implementation. On the other side, the initial_data is whatever is passed to the start_link function.

What about the state ?

I have previously mentioned that the GenServer module gives you the chance to store the state of the process. Let’s understand how.

We have seen that whatever is passed to start_link is injected into the initial_data, which represents the inital state of the module. So, we can retrieve the state of the process by using the call function of the GenServer module:

GenServer.call(pid, {:get_state})

Anytime the GenServer.call function is invoked the compiler looks for any existing implementation of the handle_call function, selecting the right one by applying pattern matching using the second argument provided. For example, assuming the following implementation is provided:

def handle_call({:get_state}, _from, current_state) do
  current_state = Map.get(current_state, :state)
  {:reply, current_state, current_state}
end

By typing in the iex session:

GenServer.call(pid, {:get_state})

will make you obtain:

iex(4)> GenServer.call(pid, {:get_state})
[]

The handle_call function takes 3 arguments:

  1. the identifier and the arguments for the function
  2. a reference to the caller
  3. the current state of the process

So this is the signature that your function must have. On the other side, the return value is always a tuple with the :reply atom as first argument, indicating that the function is actually returning something; the second one is the response value, and the last one is the new state that this process will hold (as you can understand, they are the same in our implementation since we are not changing anything here).

Note: the handle_call function is synchronous, meaning that it will be blocking the code execution until the response is ready and returned.

Of course, we might create an helper function to retrieve the state of the process:

def get_state(pid) do
  GenServer.call(pid, {:get_state})
end

Updating the state of a process

If i have not lost you so far, it might be pretty easy to understand that changing the state of a process can be a very easy task. For example:

def get_state(pid) do
  GenServer.call(pid, {:get_state})
end

def set_state(pid, state) do
  GenServer.call(pid, {:set_state, state})
end

def handle_call({:get_state}, _from, current_state) do
  current_state = Map.get(current_state, :state)
  {:reply, current_state, current_state}
end

def handle_call({:set_state, new_state}, _from, _current_state) do
  updated_state = %{
  state: new_state
  }

  {:reply, updated_state, updated_state}
end

which will give us:

iex(7)> {:ok, pid} = Concurrent.start_link
22:46:50.703 [info] Starting Elixir.Concurrent
{:ok,# PID<0.469.0>}
iex(8)> Concurrent.get_state(pid)
[]
iex(9)> Concurrent.set_state(pid, "test")
%{state: "test"}

Note: this is of course a pretty useless state management logic, consider it just a placeholder for something worth showing :D

Interprocess communication

In order to make possible for processes to communicate GenServer exposes, among the others, 3 functions:

  • handle_call
  • handle_cast
  • handle_info

All of them access the current state of the process, and all of them return it.

The one that differs the most from the others is the handle_call one, which receives the from parameter and returns a tuple with 3 elements, the second one being the response value. Do not confuse this with the last argument, the new state, which is used to update the current state of the process.

We have already seen how to use the handle_call one, which, i remember, is synchronous. handle_cast, instead, is triggered by the GenServer.cast call, and is the one to be used if you do not need immediate response and can afford to go asynchronous.

GenServer.cast(pid, {:something_to_cast})

This method will only return the :ok atom.

About GenServer.handle_info, we must observe that no GenServer.info will exist, and can be used to receive messages from any Elixir process except GenServer. For example, the one to send a message to our GenServer process can be the standard Elixir Process module, by using the send function. Like handle_cast it always returns the :ok atom.

An example of synchronous vs asynchronous scenario

Imagine we have a payment system, where clients send a payload that describes the item to purchase; we can have two types of implementation:

  • a synchronous one: the client must wait for a response from the process, and so the user
  • an asynchronous one: the client gets detached after the payload delivery and the user can keep on using the application

Of course, the second implementation requires other features, like the ability to notify a client about a successful (or not) purchase; it is by far the more convenient for a good user experience, but it’s not that unusual to find synchronous implementations around.

In this scenario we will be having the following code:

def handle_call({:payment, payload}, _from, _current_state) do

  # Of course here we can imagine a deeper analysis and use of
  # the payload
  updated_state = %{
    payment: payload
  }

  # Let's process the payment

  {:reply, updated_state, updated_state}
end

Also, we have added a new module, to better outline concepts and responsibilities:

defmodule PaymentProcessor do
  require Logger

  @moduledoc """
  Documentation for PaymentProcessor.
  """

  def process_payment(payload) do
    {:ok, pid} = Concurrent.start_link()
    Concurrent.process_payment(pid, "payload")
    Logger.info("#{__MODULE__}: payment processed")
  end
end

After recompiling the code in the iex session, we will see:

iex(20)> {:ok, pid} = Concurrent.start_link
10:08:51.953 [info] Starting Elixir.Concurrent
{:ok,# PID}
iex(21)> PaymentProcessor.process_payment("payload")
:ok
10:08:53.233 [info] Starting Elixir.Concurrent
10:08:53.233 [info] Elixir.Concurrent: done
10:08:53.233 [info] Elixir.PaymentProcessor: payment processed

So we start the Concurrent process, that will be handling payments, and use the PaymentProcess one to trigger the process of a payment.

If we want to go asynchronous, we must slightly modify our code. We need a function to handle the GenServer.cast call:

def handle_cast({:payment, payload}, _current_state) do

  # Of course here we can imagine a deeper analysis and use of the payload
  updated_state = %{
    payment: payload
  }
  :timer.sleep(3000)
  Logger.info("Payload #{payload} has been processed.")
  
  #  Let's process the payment
  # ...
  
  {:noreply, updated_state}
end

As well as a helper function in the Concurrent module:

def submit_payment(pid, payload) do
  GenServer.cast(pid, {:payment, payload})
  Logger.info("#{__MODULE__}: done")
end

Last, but not least, to have consistent names, we add the following function to the PaymentProcessor module:

def submit_payment(payload) do
  {:ok, pid} = Concurrent.start_link()
  Concurrent.submit_payment(pid, payload)
  Logger.info("#{__MODULE__}: payment processed. Process detached.")
end

Recompiling the code and executing again:

iex(48)> {:ok, pid} = Concurrent.start_link
10:54:29.690 [info] Starting Elixir.Concurrent
{:ok,# PID}
iex(49)> PaymentProcessor.submit_payment("test")
10:54:30.475 [info] Starting Elixir.Concurrent
10:54:30.475 [info] Elixir.Concurrent: done
10:54:30.475 [info] Elixir.PaymentProcessor: payment processed. Process detached.
:ok
iex(50)>
10:54:33.476 [info] Payload test has been processed.

As you can see, the client is totally detached from the payment process, and after 3 seconds (because of the :timer.sleep(3000) command) the Payload test has been processed pops up.

So, we made hopefully clear that if you have clear in mind what problem you are resoliving, if it needs concurrent code execution, GenServer module can be a great help.

The next reasonable question is: how can I be sure that a given process does not crash ? The answer is Supervisor.

The Supervisor module

The Supervisor module is able to control a GenServer process and respawn it if crashes, according to some specific policies.

Let’s jump straight on the code:

defmodule ConcurrentSupervisor do
  use Supervisor

  @moduledoc """
  Documentation for ConcurrentSupervisor.
  """

  def start_link do
    Supervisor.start_link(__MODULE__, [])
  end

  def init(initial_data) do
    children = [
      worker(Concurrent, [])
    ]

    supervise(children, strategy: :one_for_one)
  end
end

If you, as always, test it in the iex session, you will see:

iex(5)> ConcurrentSupervisor.start_link
09:55:47.943 [info] Starting Elixir.Concurrent
{:ok,# PID<0.142.0>}

So basically we are having back the pid of the Supervisor process, and we can see the log of the Concurrent process that is started.

As you can see, the children list is made by the modules we want to run, wrapped in a worker function, which represents a sort of template; it receives the module to wrap and the list of options to pass to it. Pay attention here: what you pass to the worker is used as argument of the module start_link function, where you have to correctly handle them before injecting in the GenServer.start_link call in your module.

Basically, if you pass values there, the signature of your module start_link must accept them.

Once the Supervisor starts and launches the workers, it calls, by default, the start_link method of the module it wraps.

Supervisor strategies

In the supervise command we must pass a strategy type. Possible values are:

  • :one_for_one: if any of the worker crashes, it gets restarted
  • :one_for_all: if any of the worker crashes, all of them are restarted
  • :rest_for_one: if one of the worker crashes, only the ones defined right below it (in the children list) get restarted
  • :simple_one_for_one: this is slightly different from other strategies; we’ll see this laters.

For example, imagine that we have a children like the following:

children = [  worker(ConcurrentA, []),
  worker(ConcurrentB, []),
  worker(ConcurrentC, [])
]

with 3 existing modules, :one_for_one would cause that any of the workers crashing will be automatically restarted, always; :one_for_all means that if one of the workers crashes, all of them will be restarted; :rest_for_one means that if ConcurrentB crashes, it and ConcurrentC will be restarted.

A different strategy: :simple_on_for_one

If you create a Supervisor that uses such strategy and you try to start it you will see that no workers are launched; it’s because this strategy pretends explicit and manual starting of the workers, by using Supervisor.start_child; this means that you have the flexibility to spawn multiple occurrences of the same process. Basically, the whole idea of this strategy is that you might need an undefined number of processes, spawning more of them, for example, due to some specific demand or needs. The lifecycle of the workers of a Supervisor so defined must explicitly be controlled, as already told, by using some Supervisor functions like stat_child and `terminate_chi

Did you like this article?

Bruno Ripa

CEO & Founder @ Polaris BR

See other articles by Bruno

Related jobs

See all

Title

The company

  • Remote

Title

The company

  • Remote

Title

The company

  • Remote

Title

The company

  • Remote

Related articles

JavaScript Functional Style Made Simple

JavaScript Functional Style Made Simple

Daniel Boros

12 Sep 2021

JavaScript Functional Style Made Simple

JavaScript Functional Style Made Simple

Daniel Boros

12 Sep 2021

WorksHub

CareersCompaniesSitemapFunctional WorksBlockchain WorksJavaScript WorksAI WorksGolang WorksJava WorksPython WorksRemote Works
hello@works-hub.com

Ground Floor, Verse Building, 18 Brunswick Place, London, N1 6DZ

108 E 16th Street, New York, NY 10003

Subscribe to our newsletter

Join over 111,000 others and get access to exclusive content, job opportunities and more!

© 2024 WorksHub

Privacy PolicyDeveloped by WorksHub