In today’s digital era, applications can be complex, consisting of multiple interconnected components. These systems may need to handle myriad tasks, often simultaneously. A single user action may lead to multiple background jobs, communication with a number of external services, and so on. Regardless of the workload, the user experience is expected to remain seamless.
This raises the question: how do we handle such tasks when we are limited by constraints such as response page sizes and the strict request rates that external services enforce on us? This is exactly the problem we faced in a recent project.
The Problem
Our backend had to interact with an external service to fetch crucial user data so that customers could use the service. The data would be processed and saved into our database.
However, the API enforced a few constraints on us:
- 180 requests per minute
- The response could contain at most 50 results. For example, if we requested a list of users, the API would respond with at most 50 users.
Constraints like these are enforced by most external services for load management and efficiency, and it is up to us to design our systems around them.
To elaborate, the problem now looked like this: we had to create a background job that would periodically fetch business-critical data from the external service using its APIs, process the data and store it in our database. Since we could fetch at most 50 users at a time, even 1,000 users would require 20 calls.
Making these calls serially would be a tremendous underutilization of resources, since the requests would be processed one by one. With modern multicore systems it is possible to execute such tasks in parallel, and that is the aim here as well: we want to make and process multiple requests in parallel so that users get the access they need more quickly.
In the next section we will dive into how Elixir, with the battle-tested foundation of the Erlang VM, helped us achieve this.
Achieving Parallelism with Elixir
Elixir runs on the BEAM (the Erlang virtual machine), which is known for its exceptional support for concurrency, fault tolerance, and distributed computing. Its concurrency model allows us to spawn thousands of lightweight processes without putting any significant load on the system.
Moreover, its functional programming paradigm makes parallel code simpler to write and maintain.
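As a quick illustration (not from the project itself), the snippet below can be run in IEx to see how cheap BEAM processes are: it spawns 100,000 of them and waits for each one to report back, which typically completes in well under a second.

# Illustrative only: spawn 100_000 lightweight processes and wait for each to check in.
parent = self()

for i <- 1..100_000 do
  spawn(fn -> send(parent, {:done, i}) end)
end

for _ <- 1..100_000 do
  receive do
    {:done, _} -> :ok
  end
end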
Erlang provides a number of tools and abstractions that Elixir leverages to build resilient and fault-tolerant systems (a minimal sketch of both follows below):
- GenServer is a behaviour that lets us build a server process: it responds to messages and executes callbacks based on them.
- Supervisor lets us monitor processes and automatically restarts the ones under its supervision when they crash.
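Here is a small, illustrative sketch (module names are made up, not from the project): a counter GenServer placed under a Supervisor, so it is restarted automatically if it ever crashes.

# A GenServer holding a simple counter as its state.
defmodule Counter do
  use GenServer

  def start_link(initial), do: GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  def increment, do: GenServer.call(__MODULE__, :increment)

  @impl GenServer
  def init(initial), do: {:ok, initial}

  @impl GenServer
  def handle_call(:increment, _from, count), do: {:reply, count + 1, count + 1}
end

# :one_for_one restarts only the child that crashed.
{:ok, _sup} = Supervisor.start_link([{Counter, 0}], strategy: :one_for_one)
Counter.increment() #=> 1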
Now let’s discuss our requirements for the background job:
- The workers should execute jobs in parallel, utilizing all available cores.
- The job data should be persistent, as should any errors that the worker may encounter while executing it.
- The workers should retry automatically in case of failures.
- We needed a backoff algorithm so that workers failing at the same time would not all retry at the same moment.
- We needed rate limiting so that we didn’t breach the API rate limit set by the service.
As discussed above, Elixir has all the building blocks necessary to implement background job workers with the required features. However, we didn’t want to reinvent the wheel, as that would mean writing a considerable amount of code. So we started looking for a well-maintained library that would let us create jobs that fit our requirements.
Oban ticked all the boxes, so we decided to move forward with it. In the next sections we will discuss, with some code examples, how Oban provided all the features we needed out of the box.
Oban: A Perfect Fit
Oban is a powerful library for creating and executing background jobs. It lets us configure the queues in which jobs will be executed, as well as how many jobs each queue will run concurrently. Jobs are executed by workers defined for each queue; these worker modules contain the execution logic. The job data, including parameters and any errors, is stored in PostgreSQL, which makes the setup robust and lets us audit and inspect job data later. If a job fails, it is retried by default, using a backoff algorithm with some randomization so that workers don’t all attempt retries at the same time. By default the number of retries is set to 20.
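To make that concrete, here is a rough sketch of how a queue and a worker might be wired together; the application and module names are placeholders, not the project’s actual code.

# config/config.exs: register Oban with a Postgres-backed repo and a :users queue
# that runs up to 10 jobs concurrently (names are illustrative).
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [users: 10]

# A worker bound to the :users queue; max_attempts defaults to 20 retries.
defmodule MyApp.UserSyncWorker do
  use Oban.Worker, queue: :users

  @impl Oban.Worker
  def perform(%Oban.Job{args: _args}) do
    # job execution logic goes here
    :ok
  end
end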
Now let’s go through some code examples. We will begin with the sync_users function, which inserts the jobs into Postgres; Oban then picks them up and sends them to the respective queues, where they are executed by the workers.
def sync_users() do
  case ApiClient.get_user_count() do
    {:ok, count} ->
      insert_job(count, Worker)

    {:error, error} = err ->
      Logger.error("Couldn't start sync_users. Get user count API failed. #{inspect(error)}")
      err
  end
end

# @page_size and @db_batch_size are module attributes (the page size is 50 here).
defp insert_job(count, Worker) do
  0..count
  |> Stream.take_every(@page_size)
  |> Stream.map(&Worker.new(%{page_number: &1}))
  |> Stream.chunk_every(@db_batch_size)
  |> Enum.each(&Oban.insert_all(&1))
end
The sync_users() function calls the external service API to fetch the count of users. This is important because, as mentioned before, the API responds with at most 50 results. Let’s take an example to understand exactly what is happening in the flow above. @page_size is a configurable module attribute, in case we want to reduce the response size, but we will assume that it is 50.
Let us assume that count = 490. Stream.take_every(0..count, 50) will return the following enumerable: [0, 50, 100, 150, 200, 250, 300, 350, 400, 450].
Stream.map(&Worker.new(%{page_number: &1})) applies the function Worker.new(%{page_number: &1}) to each element of the enumerable. This function creates a changeset; on insertion into the Oban table, the Oban queue picks it up and starts a worker with the parameters in the struct. The enumerable will consist of changesets with page_number: 0, 50, 100 and so on up to 450.
Stream.chunk_every(@db_batch_size), as the name suggests, creates chunks of such changesets to be inserted in bulk in the next step.
As you might have noticed, we used Stream rather than Enum. This is an optimization: streams are lazy enumerables. They merely record how to enumerate and which function to apply, without performing the actual enumeration. The enumeration happens only once the stream is passed to an actual Enum function. This ensures that we do not materialize an intermediate list at every step.
In the end we call Enum.each(&Oban.insert_all(&1)). At this step all the instructions in the streams are computed and a final enumerable is produced. As mentioned above, the final enumerable consists of chunks of job changesets, something like [[changeset1, changeset2], [changeset3, changeset4]]. Oban.insert_all is applied to each chunk; insert_all performs a bulk insert, which is more efficient than inserting records one by one.
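To see the shape of the data at each step, here is the same pipeline evaluated standalone in IEx, with assumed values of count = 490, page_size = 50 and db_batch_size = 2, and plain maps standing in for the Worker.new/1 changesets:

0..490
|> Stream.take_every(50)
|> Stream.map(&%{page_number: &1})   # stand-in for Worker.new/1
|> Stream.chunk_every(2)
|> Enum.to_list()
#=> [[%{page_number: 0}, %{page_number: 50}],
#=>  [%{page_number: 100}, %{page_number: 150}], ...]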
The changesets contain the parameters that the worker uses for the job execution. In our case, upon insertion into the Oban table, the queue would distribute the jobs across the workers.
Now let’s look into the job code:
@impl Oban.Worker
def perform(%{args: %{"page_number" => page_number}}) do
  case ApiClient.fetch_users(page_number) do
    {:error, _reason} = err ->
      err

    users ->
      users_list = process_user_data(users)
      # Bulk upsert keyed on email; rows that already exist are skipped.
      Nobs.Repo.insert_all(User, users_list, on_conflict: :nothing, conflict_target: :email)
      :ok
  end
end
This is a heavily simplified version of the actual code that we wrote in our project but the essence remains the same.
The worker receives a page number as a parameter and then uses that to fetch users. For example, if the page number is 0 then it will fetch 50 users from 0 to 49.
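Neither ApiClient.fetch_users/1 nor process_user_data/1 is shown above. Here is a purely hypothetical sketch of what they could look like; the endpoint, query parameters, response shape and field names are all assumptions, and Req is just one possible HTTP client.

# Hypothetical: page_number doubles as the offset, and the page size is fixed at 50.
def fetch_users(page_number) do
  case Req.get("https://api.example.com/users", params: [offset: page_number, limit: 50]) do
    {:ok, %Req.Response{status: 200, body: %{"users" => users}}} -> users
    {:ok, %Req.Response{status: status}} -> {:error, {:unexpected_status, status}}
    {:error, reason} -> {:error, reason}
  end
end

# Hypothetical: map the raw API payload into plain maps for Repo.insert_all, which
# bypasses changesets, so timestamps have to be filled in manually.
defp process_user_data(users) do
  now = DateTime.utc_now() |> DateTime.truncate(:second)

  Enum.map(users, fn user ->
    %{email: user["email"], name: user["name"], inserted_at: now, updated_at: now}
  end)
end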
Backoff and Rate Limiting
# Oban.Worker callback: returns the number of seconds to wait before the next retry
# (assumes alias Oban.Job).
def backoff(%Job{attempt: attempt}) do
  trunc(attempt * 2 + :rand.uniform_real() * attempt)
end
We wrote custom backoff logic because the default one is exponential, and we didn’t want subsequent retries to be delayed that long.
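For a feel of the delays this produces, the values below follow directly from the formula above:

# Illustrative only: sampling the custom backoff for a few attempt numbers.
for attempt <- [1, 5, 10, 20] do
  delay = trunc(attempt * 2 + :rand.uniform_real() * attempt)
  IO.puts("attempt #{attempt}: retry in ~#{delay}s")
end
# Attempt 1 waits 2s, attempt 5 between 10 and 14s, attempt 20 between 40 and 59s,
# which stays far gentler than an exponential curve at higher attempt counts.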
Rate limiting is configured in the application config itself. For example:
queues: [
  users: [
    rate_limit: [allowed: System.schedulers_online(), period: {4, :minute}]
  ]
]
- users is the name of the queue.
- allowed: System.schedulers_online() is the number of jobs allowed to run within the period; here it is set to the number of cores on the machine.
- period: {4, :minute} is the time window to which the allowed limit applies.
It’s important to note that the rate-limiting functionality is part of Oban Pro, the paid, more advanced version of the standard Oban library. This feature, along with other enhanced capabilities, requires an Oban Pro subscription.
To summarize the entire logic:
- Get the count of users.
- Create a list of numbers starting at 0 and growing by 50 until you reach the count. For example, [0, 50, 100, 150, 200].
- Create a list of changesets by enumerating over the above list and insert them in bulk into the Oban table.
- Each page_number above corresponds to a job. Oban distributes these jobs over multiple workers for concurrent execution.
- Each worker receives a page number and uses it to fetch users. For example, a worker that received page number 0 would fetch users 0-49, the one that received 50 would fetch users 50-99, and so on.
Conclusion
So that wraps up the article. We learned:
- How to parallelize HTTP requests for efficiency, keeping in mind the various constraints of the external API service.
- How Erlang provides all the necessary building blocks for parallel execution of tasks.
- How to use Oban to create background jobs for parallel request processing, and how to use its features like backoff and rate limiting to build a robust job processor.