Batching for Operations with Elixir and Broadway

Broadway is a concurrent, multi-stage tool for building data ingestion and data processing pipelines.

If you’re getting started with Broadway, you may not have taken the time to dive into batchers and the handle_batch/4 callback. That’s okay! It’s optional, after all. But if your aim is to build efficient data pipelines, it’s worth taking another look.

What follows is a post in three acts, wherein we:

  1. Set up a Broadway pipeline,
  2. Improve its efficiency with batchers, and
  3. Learn something about eventual consistency.

But first, we need just a sprinkle of…

Exposition

Let’s say we want to build a fulfillment pipeline for orders from our online pizza shop, Mystex Pizza. We’ve decided to try a run of Mystex Pizza on Broadway… (holds for applause)

We’re currently using a payment gateway called PizzaBarn, but their implementation comes with the following caveats:

  • Event types cannot be filtered.
  • Data is sent as Base64 encoded JSON.

Let’s start by creating our fulfillment pipeline using Broadway.start_link/2:

# lib/mystex_pizza/fulfillment_pipeline.ex

defmodule MystexPizza.FulfillmentPipeline do
  @moduledoc """
  Consumes events from the PizzaBarn producer.

  Processed messages are stored as entries in the database.
  """
  use Broadway
  alias Broadway.{BatchInfo, Message}
  # Changeset is referenced by the entry-building helpers later in this module
  alias Ecto.Changeset
  alias MystexPizza.{Customer, Order, Repo}

  def start_link(_arg) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [
          module: {PizzaBarn.EventProducer, []}
        ]
      ],
      processors: [
        default: []
      ]
    )
  end
end

Act I: Processing Messages

To build our pipeline, we’ll need to decode the raw message data and filter on the event type. We only care about *.created messages for now.

Let’s start out by defining a handle_message/3 callback. This will be called by the processors:

# lib/mystex_pizza/fulfillment_pipeline.ex

@impl Broadway
def handle_message(_processor_name, message, _context) do
  message
  |> Message.update_data(&decode!/1)
  |> process_message()
end

The handle_message/3 callback accepts:

  1. The processor atom
  2. A Broadway.Message struct
  3. A context term

Inside this function, you are expected to process the message (update its data, do some work, etc.) and then return the message. Broadway.Message provides helper functions to make manipulating the message easy. One of the main features of Broadway is its ability to automatically acknowledge messages, so your only responsibility in handle_message/3 is to do whatever work you need to do from the data contained in the Message struct.

We’ll handle PizzaBarn’s double decoding requirement with a private decode!/1 function:

# lib/mystex_pizza/fulfillment_pipeline.ex

defp decode!(data) when is_binary(data) do
  data |> Base.decode64!() |> Jason.decode!()
end

Next, we’ll pipe the result to our private process_message/1 function that does the hard work:

# lib/mystex_pizza/fulfillment_pipeline.ex

# processes messages with expected data from PizzaBarn
defp process_message(
       %Message{data: %{"type" => event, "object" => attrs}} = message
     ) do
  # Splits the "type" value to match only on `*.created` events
  with [resource, "created"] <- String.split(event, ".", parts: 2),
       {:ok, _struct} <- create(resource, attrs) do
    message
  else
    {:error, reason} ->
      # fails the message when create/2 fails
      Message.failed(message, reason)

    _ ->
      # ignores event types we don't care about
      message
  end
end

# ignores messages we don't care about
defp process_message(message), do: message

process_message/1 starts by matching on the Message data, which it expects to be a map with "type" and "object" fields. Next, it splits the type value to match only on *.created events. For an event we care about, it calls create/2 to insert the new resource into the database. If create/2 returns an {:ok, struct} tuple, it returns the message unchanged so that Broadway will acknowledge it as successful; if create/2 returns an {:error, reason} tuple, it fails the message with that reason. We know the pipeline will receive other types of events as well, so the else block includes a catch-all clause that returns those messages untouched instead of crashing on them.
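
The event-type match is the part of that function most worth trying in isolation. Here is a minimal sketch of it, using only the standard library; the EventTypeSketch module and created_resource/1 are hypothetical names for illustration and are not part of the pipeline:

```elixir
defmodule EventTypeSketch do
  @moduledoc "Hypothetical helper that classifies PizzaBarn-style event types."

  # Returns {:ok, resource} for "<resource>.created" events, :ignore otherwise.
  def created_resource(event) when is_binary(event) do
    # `parts: 2` splits on the first "." only, so "a.b.created" becomes
    # ["a", "b.created"] and falls through to the catch-all clause.
    case String.split(event, ".", parts: 2) do
      [resource, "created"] -> {:ok, resource}
      _other -> :ignore
    end
  end
end
```

Calling EventTypeSketch.created_resource("order.created") returns {:ok, "order"}, while "order.updated" and a type with no dot at all both return :ignore.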

(I’ve omitted the implementation for create/2 because that’s not why we’re here.)

Act II: Batching for Operations

The implementation in Act I works fine, but the processor has to check out a database connection to insert a record for every single *.created event it processes. That’s pretty inefficient, especially if we’re processing lots of events.

Fortunately, with Broadway and Ecto we can make our pipeline really sing!

The key to this technique is grouping operations into batches, otherwise known as Partitioning. For *.created events, we want to insert entries into the database, but there’s certainly no need to do so one entry at a time.

Ecto provides some helpful functions for batch operations. Here we’ll be taking advantage of Repo.insert_all/3 in order to bulk insert multiple entries for a particular schema.

Starting the Batchers

Before we can start partitioning our messages, we’ll need to define some batchers. These are groups of processes responsible for calling the handle_batch/4 callback on your pipeline. We haven’t defined this callback yet, but we will shortly.

First, we’ll configure our batchers alongside our producers and processors when we call Broadway.start_link/2:

# lib/mystex_pizza/fulfillment_pipeline.ex

def start_link(_arg) do
  Broadway.start_link(__MODULE__,
    # ...producers/processors...
    batchers: [
      default: [],
      insert_all: [
        batch_size: 10,
        batch_timeout: 1_000
      ]
    ]
  )
end

This call defines two groups of batchers:

  • :default - All Broadway Messages are routed here by default.
  • :insert_all - The group of processes that will handle bulk inserts into the database.

Broadway supports multiple batchers, each with their own configuration, so it’s important to consult the Batchers options section of the Broadway docs for more information and a full list of options.

Routing Messages

In order to route Messages anywhere other than the :default batchers, we need to tell Broadway which batcher should handle the Message. We’ll do this in handle_message/3, which is called by the processors. We’ll use Message.put_batcher/2 to specify the proper batcher.

For a better visual of the pipeline, take a look at the diagram in the Batching section of the Broadway docs.

Let’s update our process_message/1 function to batch our *.created events into bulk insert operations:

# lib/mystex_pizza/fulfillment_pipeline.ex

defp process_message(%Message{} = message) do
  # Determine which batcher gets the message.
  case message.data do
    %{"type" => "customer.created"} ->
      message
      |> Message.put_batcher(:insert_all)
      |> Message.put_batch_key(Customer)

    %{"type" => "order.created"} ->
      message
      |> Message.put_batcher(:insert_all)
      |> Message.put_batch_key(Order)

    _ ->
      # If the message is returned unchanged, it
      # will be routed to the `:default` batcher.
      message
  end
end

Our process_message/1 function now works as a router: it decides which batcher and batch_key to put on the Message, causing Broadway to route the Message to the specified batchers.

Remember that the batcher is the atom for the group of batchers we defined in Broadway.start_link/2. But the real star of our show here is the batch_key. We’re going to use it to ensure Messages get partitioned into groups of the same resource type, which is exactly what we need for bulk inserts!

Message.put_batch_key/2: Defines the batch key within a batcher for the message. Inside each batcher, we attempt to build batch_size within batch_timeout for each batch_key.
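
To picture what the batch key buys us, here is a rough model of the grouping, assuming plain maps stand in for Broadway.Message structs. This is not how Broadway implements batching, and it ignores batch_timeout entirely; BatchKeySketch and partition/2 are hypothetical names:

```elixir
defmodule BatchKeySketch do
  @moduledoc "Hypothetical model of per-batch_key grouping (no timeouts)."

  # Groups message data by batch key, then cuts each group into
  # batches of at most `batch_size` items.
  def partition(messages, batch_size)
      when is_list(messages) and is_integer(batch_size) and batch_size > 0 do
    messages
    |> Enum.group_by(& &1.batch_key, & &1.data)
    |> Map.new(fn {key, data} -> {key, Enum.chunk_every(data, batch_size)} end)
  end
end

messages = [
  %{batch_key: :customer, data: 1},
  %{batch_key: :order, data: 2},
  %{batch_key: :customer, data: 3},
  %{batch_key: :customer, data: 4}
]

batches = BatchKeySketch.partition(messages, 2)
```

With a batch size of 2, the three customer items come out as two batches and the single order item as one, and no batch ever mixes the two keys. That property is what lets a single bulk insert target a single table.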

Processing Batches

Now that our messages will be put into proper batches, we can define our handle_batch/4 callback:

# lib/mystex_pizza/fulfillment_pipeline.ex

@impl Broadway
def handle_batch(:insert_all, messages, %BatchInfo{batch_key: schema}, _) do
  batch_insert_all(schema, messages)
end

# Ensure all other batches get acknowledged
def handle_batch(_batcher, messages, _batch_info, _context), do: messages

The handle_batch/4 callback accepts:

  1. The batcher atom
  2. A list of Broadway.Message structs from the processors
  3. A Broadway.BatchInfo struct, containing the batch_key
  4. A context term

Inside this callback, you are expected to process the batch of messages and then return the batch of messages. This callback is the last stop for your messages before they are ultimately acknowledged as successful or failed.

To handle batches of messages for bulk inserts, we match on the batcher atom (:insert_all) and extract the batch_key from the BatchInfo struct. Remember, when we previously set the batcher to :insert_all, we also set the batch_key to the schema module for the event type.

Then, we call our private batch_insert_all/2 function:

# lib/mystex_pizza/fulfillment_pipeline.ex

# Bulk inserts Message data into the schema table
defp batch_insert_all(schema, messages, opts \\ []) do
  entries = convert_batch_to_entries(schema, messages)

  # TODO: insert all entries
end

However, before we can perform a bulk insert operation, we need to convert the list of Message structs into a list of entries.

What are Entries?

Let’s look at the spec for Repo.insert_all/3:

insert_all(
  schema_or_source :: binary() | {binary(), module()} | module(),
  entries :: [map() | [{atom(), term() | Ecto.Query.t()}]],
  opts :: Keyword.t()
) :: {integer(), nil | [term()]}

This bit from the docs provides more context:

The second argument is a list of entries to be inserted, either as keyword lists or as maps. The keys of the entries are the field names as atoms and the value should be the respective value for the field type or, optionally, an Ecto.Query that returns a single entry with a single value.

Let’s look at how we might build our list of entries:

# lib/mystex_pizza/fulfillment_pipeline.ex

defp convert_batch_to_entries(schema, messages) do
  Enum.map(messages, fn %Message{data: %{"object" => attrs}} ->
    %Changeset{changes: changes} =
      schema
      |> struct!()
      |> schema.changeset(attrs)

    Map.merge(changes, timestamps(schema))
  end)
end

Our private convert_batch_to_entries/2 function accepts the schema module – the one that we stashed in the Message’s batch_key – and the batch of messages. We map over each message, extracting the "object" data, which contains a map of attributes. We invoke the schema module’s changeset/2 function to build an Ecto.Changeset for the entry, but we only retain the map of changes.

Why no validation?

Great catch! We are, in fact, ignoring any validation errors from the Changeset. This is by design – we believe we can trust the source of our data, at least in the sense that if any message fails due to validation errors, we expect all messages will fail, so there’s no virtue in validating them individually.

Keep this in mind when designing your pipelines: if you determine that you need to validate individual messages, you may choose to do so in handle_message/3. That way, you can call Message.failed/2 to fail the message early so it won’t be sent to the batchers.

In any case, always remember to return the whole batch of messages from handle_batch/4, otherwise Broadway won’t be able to acknowledge them.

But what’s this all about?

Map.merge(changes, timestamps(schema))

There’s another important note on Repo.insert_all/3 (emphasis mine):

When a schema module is given, the entries given will be properly dumped before being sent to the database. If the schema contains an autogenerated ID field, it will be handled either at the adapter or the storage layer. However any other autogenerated value, like timestamps, won’t be autogenerated when using insert_all/3. This is by design as this function aims to be a more direct way to insert data into the database without the conveniences of insert/2. This is also consistent with update_all/3 that does not handle timestamps as well.

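So we’re merging timestamps into each entry using our own timestamps/1 function. It may or may not work exactly as written for your schemas: DateTime.utc_now/0 returns a UTC DateTime with microsecond precision, which suits :utc_datetime_usec timestamp fields, while other timestamp types may need a NaiveDateTime or a value truncated to seconds: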

# lib/mystex_pizza/fulfillment_pipeline.ex

defp timestamps(_schema) do
  now = DateTime.utc_now()
  %{inserted_at: now, updated_at: now}
end
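
If your schemas use a different timestamp type, a variant along these lines may be a starting point. It is only a sketch: TimestampsSketch and its type argument are hypothetical, and which type applies depends on how your schema declares its timestamps:

```elixir
defmodule TimestampsSketch do
  @moduledoc "Hypothetical timestamps helper keyed on the timestamp field type."

  def timestamps(type \\ :utc_datetime_usec)

  # Microsecond precision, in UTC.
  def timestamps(:utc_datetime_usec), do: stamp(DateTime.utc_now())

  # Second precision, in UTC.
  def timestamps(:utc_datetime) do
    stamp(DateTime.truncate(DateTime.utc_now(), :second))
  end

  # Second precision, without time zone information.
  def timestamps(:naive_datetime) do
    stamp(NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second))
  end

  # Uses the same instant for both fields so a fresh row has equal timestamps.
  defp stamp(now), do: %{inserted_at: now, updated_at: now}
end
```

Each clause returns a map with the same value under :inserted_at and :updated_at, ready to be merged into an entry with Map.merge/2.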

Whew, what a ride! We’re finally ready to call Repo.insert_all/3:

  # lib/mystex_pizza/fulfillment_pipeline.ex

  # Replace the TODO in batch_insert_all/2 with this
  case Repo.insert_all(schema, entries, opts) do
    {n, _} when n == length(entries) ->
      messages

    result ->
      batch_failed(messages, {:insert_all, schema, result})
  end

The match on the return value ensures we inserted a number of entries equal to the batch size (number of messages). If not, we fail the entire batch of messages.

Note: This match works great for inserts, but pay close attention to the docs for idiosyncrasies related to return values for updates and/or upserts.

Our batch_failed/2 helper function is about as exciting as you would expect it to be:

# lib/mystex_pizza/fulfillment_pipeline.ex

defp batch_failed(messages, reason) when is_list(messages) do
  Enum.map(messages, &Message.failed(&1, reason))
end

We’re in the home stretch for the batchers, but there’s just one problem: our Order schema doesn’t quite match the PizzaBarn data for an order.

The orders table in the database has a customer_id field that is the foreign key to the customers table. This is important information, as it tells us which order goes to which customer.

But the PizzaBarn "order.created" schema looks like this:

{
  "type": "order.created",
  "object": {
    "id": "ord_000000001",
    "customer": "cus_10000000",
    "amount": 4857
  }
}

Fortunately, we just need to add a match to our convert_batch_to_entries/2 function to handle Order entries separately:

# lib/mystex_pizza/fulfillment_pipeline.ex

# Put this above the other convert_batch_to_entries/2 function
defp convert_batch_to_entries(Order, messages) do
  Enum.map(messages, fn %Message{data: %{"object" => attrs}} ->
    {customer_id, attrs} = Map.pop(attrs, "customer")

    %Changeset{changes: changes} =
      %Order{}
      |> Order.changeset(attrs)
      |> Changeset.put_change(:customer_id, customer_id)

    Map.merge(changes, timestamps(Order))
  end)
end
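
The key step in that clause is moving the value out from under "customer" so it can be stored as customer_id. Stripped of Ecto, the reshaping can be sketched as follows; OrderAttrsSketch and to_order_attrs/1 are hypothetical names, and the string key "customer_id" is used only to keep the example free of dependencies:

```elixir
defmodule OrderAttrsSketch do
  @moduledoc "Hypothetical reshaping of a PizzaBarn order payload."

  # Moves the value stored under "customer" to "customer_id".
  def to_order_attrs(%{"customer" => _} = attrs) do
    # Map.pop/2 returns the value and the map without that key.
    {customer_id, rest} = Map.pop(attrs, "customer")
    Map.put(rest, "customer_id", customer_id)
  end
end

order_attrs =
  OrderAttrsSketch.to_order_attrs(%{
    "id" => "ord_000000001",
    "customer" => "cus_10000000",
    "amount" => 4857
  })
```

Here order_attrs keeps "id" and "amount" as they were, drops "customer", and gains "customer_id" with the value "cus_10000000".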

We did it! Mystex Pizza Order Fulfillment is ready to start ingesting events from PizzaBarn. The crew deployed their changes and were just about to head out to the cast party, when someone noticed something in the logs:

14:53:23.451 [error] ** (Postgrex.Error) ERROR 23503 (foreign_key_violation) insert or update on table "orders" violates foreign key constraint "orders_customer_id_fkey"

    table: orders
    constraint: orders_customer_id_fkey

Key (customer_id)=(cus_035751878) is not present in table "customers".

Act III: Database Constraints and Eventual Consistency

The Problem with Orders

As we pointed out in Act II, the customer_id field on the Order schema is pretty important. It turns out, the database thinks so, too. So much so, that creating an Order that references a Customer when that Customer doesn’t exist just isn’t possible.

So what do we do? We need to make some changes in handle_message/3.

When we moved our work to the :insert_all batchers, we left the processors with little to do beyond decoding the original message data and pattern matching for the batchers. Let’s give them some more responsibility.

Pre-Processing for Orders

When we receive an "order.created" event, we need to ensure that a corresponding Customer entry exists before inserting the Order. But how can we do that if all we have is the "customer" field?

Let’s insert what we know about the customer when we receive the message.

To do this, we’ll add another step to our handle_message/3 callback:

# lib/mystex_pizza/fulfillment_pipeline.ex

@impl Broadway
def handle_message(_processor_name, message, _context) do
  message
  |> Message.update_data(&decode!/1)
  |> pre_process_message()
  |> process_message()
end

In our private pre_process_message/1 function, we’ll take care of whatever prerequisites a particular message might have. For "order.created", it’s going to look like this:

# lib/mystex_pizza/fulfillment_pipeline.ex

# Inserts a placeholder Customer for the Order if one does not exist.
defp pre_process_message(
       %Message{data: %{"type" => "order.created"}} = message
     ) do
  %{data: %{"object" => %{"customer" => customer_id}}} = message

  customer = %Customer{id: customer_id}
  changeset = Customer.changeset(customer, %{})

  case Repo.insert(changeset, on_conflict: :nothing) do
    {:ok, _} ->
      message

    {:error, _} ->
      Message.failed(message, "Could not preprocess customer #{customer_id}")
  end
end

# Returns other messages to continue the pipeline
defp pre_process_message(%Message{} = message), do: message

That function could probably be made a lot prettier, but the important part to call out is this:

Repo.insert(changeset, on_conflict: :nothing)

That on_conflict: :nothing is what’s going to keep us from overriding a real Customer entry if it already exists.

Inserts as Upserts

Our bulk inserts for "order.created" messages should now be able to operate properly, but we’ve introduced a problem for "customer.created": how can we insert a unique Customer if one already exists?

This problem essentially boils down to a race condition: Both the pre-processor for Orders and the batcher for Customers are going to be racing each other to try and create a customer record.

The Order pre-processor is nice enough not to stomp all over an existing entry if the Customer batcher wins the race. But if the pre-processor wins, only a placeholder exists, and we still need to get the additional Customer data into the database.

And so our final scene unfolds as follows, wherein we close the loop on the eventual consistency of our Customer data, with a return to handle_batch/4:

# lib/mystex_pizza/fulfillment_pipeline.ex

@impl Broadway
def handle_batch(:insert_all, messages, %{batch_key: Customer}, _) do
  batch_insert_all(Customer, messages,
    on_conflict: :replace_all_except_primary_key,
    conflict_target: [:id]
  )
end

def handle_batch(:insert_all, messages, %{batch_key: schema}, _) do
  batch_insert_all(schema, messages)
end

# Ensure all other batches get acknowledged
def handle_batch(_, messages, _, _), do: messages

Inserting batches of Customers is the same as inserting batches of Orders, except that we may have to update some existing entries, as well as insert new entries. Using the on_conflict and conflict_target options with Repo.insert_all/3, we move this responsibility to the database, where the inserts and updates can happen consistently and atomically, all without our intervention.

Note: Recall the previous warning about database idiosyncrasies with upserts. Matching on the number of entries returned will continue to work fine with a Postgres database, but MySQL will return unexpected values for updated entries. If you’re using MySQL, you may need to find an alternative method to ensure your upserts completed successfully.
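
To see why the order of arrival stops mattering, here is a small in-memory model of the two writes. It is a sketch only: a plain map stands in for the customers table, Map.put_new/3 plays the role of on_conflict: :nothing, Map.merge/2 plays the role of the upsert, and the module, function names, and the name field are all hypothetical:

```elixir
defmodule EventualConsistencySketch do
  @moduledoc "Hypothetical in-memory model of the placeholder/upsert race."

  # Like an insert with `on_conflict: :nothing`: keeps any existing row.
  def insert_placeholder(table, id), do: Map.put_new(table, id, %{id: id})

  # Like an upsert that replaces every field except the primary key.
  def upsert_customer(table, %{id: id} = attrs) do
    Map.update(table, id, attrs, &Map.merge(&1, Map.delete(attrs, :id)))
  end
end

customer = %{id: "cus_1", name: "Ada"}

# The order pre-processor wins the race, then the customer batch arrives.
placeholder_first =
  %{}
  |> EventualConsistencySketch.insert_placeholder("cus_1")
  |> EventualConsistencySketch.upsert_customer(customer)

# The customer batch wins the race, then the order pre-processor runs.
customer_first =
  %{}
  |> EventualConsistencySketch.upsert_customer(customer)
  |> EventualConsistencySketch.insert_placeholder("cus_1")
```

Both placeholder_first and customer_first end up holding the full customer row under "cus_1", which is the convergence the pipeline relies on.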

Curtain Call

If you want to see this pattern in action, please check out the demo repository that serves as the basis for this post: https://github.com/mcrumm/mystex_pizza_on_broadway

If you find any bugs, or see room for improvement, please submit an issue or, even better, a pull request.

I want to thank Mike Binns, Jason Goldberger, and Aaron Renner for taking the time to review this post and for providing invaluable feedback. I sincerely appreciate it!

The Show Must Go On

Before we turn on the ghost light, it’s worth mentioning that this pattern has a lot of room to grow. Partitioning your messages for efficient bulk operations takes some planning, but the benefits are well worth the effort, especially if you’re pushing data into services that are more expensive to call than your database.

Fortunately, Broadway makes this process seem almost too easy, so take another look at your data pipelines to see where you can really make them sing.
