In this post I’ll share with you a typical pattern for adding recurring jobs to your Elixir applications. I hope this can be of value to those new to Elixir/OTP.
Why?
Outside of the Elixir ecosystem, it’s common practice to reach for a
third-party tool and set up a secondary database to manage your application’s
background jobs. Resist the temptation to automatically do the same when
writing Elixir applications; there is no need to rush into a decision like that
in Elixir land. To run code concurrently in Elixir, the Task module provides
conveniences for spawning and awaiting tasks. Additionally, OTP provides the
mechanisms needed to create recurring jobs. Become familiar with this simple
pattern and you’ll find yourself incorporating it in many different ways into
the design of your systems.
An example of a simple recurring job
# my_app/lib/my_app/my_job/recurring_job.ex
defmodule MyApp.MyJob.RecurringJob do
  @moduledoc """
  Responsible for handling my job's schedule.

  * Runs job 5 seconds after start and every minute thereafter.
  """
  use GenServer

  alias MyApp.MyJob

  def start_link() do
    GenServer.start_link(__MODULE__, nil)
  end

  def init(_) do
    schedule_initial_job()
    {:ok, nil}
  end

  # Runs the job, then queues the next :perform message.
  def handle_info(:perform, state) do
    MyJob.perform()
    schedule_next_job()
    {:noreply, state}
  end

  defp schedule_initial_job() do
    Process.send_after(self(), :perform, 5_000) # In 5 seconds
  end

  defp schedule_next_job() do
    Process.send_after(self(), :perform, 60_000) # In 60 seconds
  end
end
As you can see, we can use a simple GenServer to schedule a function to run on an interval. A few tips to keep in mind here:
- It’s critical to keep these “scheduler” type processes completely isolated from the rest of your system. Run these processes under a supervisor so that if they fail they’ll get restarted. This will keep your scheduler firing away even after unexpected failures.
- Keep these processes simple. The simpler they are the more reliable they are, so you know they’ll continue firing as needed.
- Reschedule drift will happen. The job will run every minute plus the job duration time (the time it takes for the job to be completed). If you need to run your jobs every minute on the dot, compensate for that drift when you schedule the next run.
- Remember that you can run the job synchronously inside the scheduler process or asynchronously (for example, in a Task), depending on your needs.
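To make the drift tip concrete, here is a minimal sketch of one way to compensate for it: instead of always waiting a fixed 60 seconds, compute the delay until the next minute boundary. The `DriftFreeSchedule` module and its `ms_until_next_tick/2` function are hypothetical names introduced for illustration; they are not part of the original example.

```elixir
defmodule DriftFreeSchedule do
  @moduledoc """
  Hypothetical helper (an assumption, not part of the original example):
  computes the delay until the next interval boundary so that the job's
  duration does not accumulate as drift.
  """

  @interval_ms 60_000

  # Milliseconds from `now_ms` until the next multiple of `interval_ms`.
  # Returns the full interval when `now_ms` is exactly on a boundary.
  def ms_until_next_tick(now_ms, interval_ms \\ @interval_ms) do
    interval_ms - rem(now_ms, interval_ms)
  end
end

# Inside the GenServer, schedule_next_job/0 could then look like this:
#
#   defp schedule_next_job() do
#     now_ms = System.system_time(:millisecond)
#     delay = DriftFreeSchedule.ms_until_next_tick(now_ms)
#     Process.send_after(self(), :perform, delay)
#   end
```

Note that with this approach a job that takes longer than the interval simply skips the boundaries it missed, and that wall-clock time can jump if the system clock is adjusted. Whether that is acceptable depends on the job.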
Disabling a recurring job in your test environment
You most likely will want to disable scheduler processes of this type in your
test environment. To do so, simply add an enabled?/0 function to the module and
use it within init/1:
# my_app/lib/my_app/my_job/recurring_job.ex
def init(_) do
  if enabled?() do
    schedule_initial_job()
    {:ok, nil}
  else
    :ignore
  end
end

defp enabled?() do
  Application.get_env(:my_app, __MODULE__)[:enabled]
end
Then configure the process as needed in your config files. For example, to
keep the process from running in the test environment, add the following to
my_app/config/test.exs:
# my_app/config/test.exs
config :my_app, MyApp.MyJob.RecurringJob,
  enabled: false
So what happens when enabled?/0 returns false? The init/1 callback will return
:ignore, which will cause the process to exit normally. The application’s
supervision tree will start as expected and the process will not be restarted,
effectively starting your application without the RecurringJob process. To
enable the process in the prod and/or dev environments, do the same but set the
enabled key to true instead of false.
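The following sketch shows what a supervisor does with a child that returns :ignore. It uses a hypothetical `IgnoreDemo.Worker` module as a stand-in for RecurringJob, and passes the enabled flag as an argument rather than reading it from application config, so the snippet is self-contained. Because `start_link` here is not the `start_link/1` that a default child spec expects to call with a single argument of its choosing, the child is described with an explicit child spec map; the same form works for the `start_link/0` used in this post by passing `[]` as the argument list.

```elixir
defmodule IgnoreDemo.Worker do
  # Hypothetical stand-in for RecurringJob (an assumption for illustration).
  use GenServer

  def start_link(enabled) do
    GenServer.start_link(__MODULE__, enabled)
  end

  @impl true
  def init(true), do: {:ok, nil}
  # Returning :ignore makes start_link return :ignore without running a loop.
  def init(false), do: :ignore
end

# Start a supervisor whose only child is disabled.
{:ok, sup} =
  Supervisor.start_link(
    [%{id: :job, start: {IgnoreDemo.Worker, :start_link, [false]}}],
    strategy: :one_for_one
  )

# The supervisor is running; the child spec is kept but has no process.
children = Supervisor.which_children(sup)
counts = Supervisor.count_children(sup)
```

Here `children` contains a single entry whose pid slot is `:undefined`, and `counts` reports one spec with zero active processes: the supervisor started cleanly and will not try to restart the ignored child.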
Recurring jobs & distributed Elixir
What will happen if we use this pattern in a distributed environment? Many of us want to deploy our applications to a cluster. If we deploy our application to a cluster where it runs on two different nodes, our recurring job will run twice on start and twice every minute thereafter. How can we edit the example above to make sure this process only runs once within our cluster? Furthermore, how can we make sure the process gets started on a surviving node if the node running the scheduler process dies unexpectedly?
Here’s one way we could do it. First we edit start_link/0 to register the
process globally with the given term and conditionally add a link to an already
started instance of the process. It should look something like this:
# my_app/lib/my_app/my_job/recurring_job.ex
def start_link() do
  case GenServer.start_link(__MODULE__, nil, name: {:global, __MODULE__}) do
    {:ok, pid} ->
      {:ok, pid}

    {:error, {:already_started, pid}} ->
      Process.link(pid)
      {:ok, pid}

    :ignore ->
      :ignore
  end
end
If you’re wondering why Process.link(pid) is necessary, remember that we want
to make sure that the scheduler keeps running even after the node that’s
running the one instance of the scheduler crashes. Whenever we try to start a
RecurringJob and an instance is already running, we link the calling process
(the supervisor on that node) to the existing instance. That way every node’s
RecurringJob supervisor is notified when the scheduler crashes and can restart
it right away. If we don’t do this, the node running the scheduler could crash
and the surviving nodes won’t be notified. If the surviving nodes are not
notified, the scheduler’s supervisor will not be able to attempt a restart,
leaving our application without a running scheduler.
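Both branches of that start_link/0 can be observed on a single node, since global registration does not require a cluster. The sketch below uses a hypothetical `GlobalDemo.Job` module with a no-op init/1. It only illustrates the name clash and the link, not a real multi-node failover.

```elixir
defmodule GlobalDemo.Job do
  # Hypothetical module for illustration; mirrors the start_link/0 above.
  use GenServer

  def start_link() do
    case GenServer.start_link(__MODULE__, nil, name: {:global, __MODULE__}) do
      {:ok, pid} ->
        {:ok, pid}

      {:error, {:already_started, pid}} ->
        # Link the caller to the instance that already owns the global name.
        Process.link(pid)
        {:ok, pid}

      :ignore ->
        :ignore
    end
  end

  @impl true
  def init(_), do: {:ok, nil}
end

# The first call starts and registers the process.
{:ok, first} = GlobalDemo.Job.start_link()

# The second call hits the :already_started branch and returns the same pid.
{:ok, second} = GlobalDemo.Job.start_link()

same_pid? = first == second
registered = :global.whereis_name(GlobalDemo.Job)
```

In a real cluster the second call would come from the supervisor on another node, which ends up linked to the remote scheduler and treats its exit like that of any other child.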
Caveats
The pattern described above doesn’t guarantee that every single job will be executed. If you need execution guarantees and more control over retries, use a database to keep track of the job’s status, retry count, and other relevant details.
That’s all folks!
So there it is: a typical pattern that you will run into in the wild. I hope that this will be useful to you as you encounter the need to add recurring jobs to your Elixir applications. Explore what’s possible without dependencies before you default to the same techniques that served you well on other platforms. You might find a better way!
Further Reading
If your needs involve a more sophisticated job processing pipeline, get familiar with GenStage.
Once you’re familiar with OTP and know what’s possible without dependencies, here’s a short list of tools you might find interesting depending on the job at hand: