Tuning Broadway RabbitMQ Pipelines for Latency

Elle Imhoff

Engineer

That’s not very concurrent…

While working on a client project using broadway, we noticed something odd. One of our pipelines was processing jobs serially, even though we had set concurrency: 25. To figure out why, I had to dig into the library code and get a better mental model of how it works.

Our use case for this pipeline was essentially this:

  • We get jobs from RabbitMQ.
  • Each job requires processing which takes tens of seconds.
  • The processing involves network requests, so it’s IO-bound. This means we should be able to do a lot of it concurrently.

Yet somehow we were only processing one job at a time, at least under the light load we were seeing in production. Why?

First, I’ll explain how our usage of Broadway was giving us poor concurrency. Then, I’ll walk you through how I figured this out.

What Went Wrong

Broadway is built on top of gen_stage, which helps us build data processing pipelines. In a pipeline, consumers ask for work, and producers try to produce work to satisfy that demand. broadway provides producers which “produce” work by getting messages from an external queue. Our pipelines use broadway_rabbitmq to get work from RabbitMQ and to handle things like acknowledging messages we process and rejecting those that fail.

Following the documentation at the time, we set up the pipeline with multiple processors (consumers) and multiple producers. Our processors used the default values of max_demand: 10, min_demand: 5.

This seemed reasonable, but there was an important aspect of Broadway that we didn’t understand. Every producer will try to satisfy the demand of every processor. If a processor asks for two messages and there are two producers, the processor may get four messages. Since it can only process them one at a time, the others will sit in its process mailbox until it’s ready for them.

Under light load, this meant that a small number of processors were given all of our available messages, which sat in their inboxes to be processed serially. Meanwhile, our other processors were sitting idle.
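
To make that shape concrete, here is a minimal sketch of a pipeline configured roughly the way ours was (the module name, queue name, and work function are made up for illustration):

defmodule SlowJobs do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        # two producers, each trying to satisfy every processor's demand
        module: {BroadwayRabbitMQ.Producer, queue: "slow_jobs"},
        concurrency: 2
      ],
      processors: [
        # 25 processors, each using the default max_demand of 10
        default: [concurrency: 25]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # tens of seconds of IO-bound work per message
    do_network_requests(message.data)
    message
  end

  defp do_network_requests(_data), do: :ok
end

With two producers each willing to hand a processor its full max_demand of 10, a single slow processor can end up with as many as 20 messages waiting in its mailbox.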

What We Changed

Although it’s counterintuitive, the most important change we made to improve concurrency was to only use one producer. Getting messages from the queue was not our bottleneck; processing them was. Using a single producer meant that each processor got only as many messages as it asked for, leaving the rest for other processors to handle.

The second change that helped us was to reduce the processor max_demand to 1. Again, getting messages from the producer was not the slowest step for us; processing them was. So, rather than have a few processors hoard jobs in their mailboxes, we wanted those jobs to go to whichever processor was available next. By having processors only demand one job at a time, we left the others for other processors to demand.

These two changes might apply to any Broadway pipeline where producing work is fast and processing is slow.
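
In Broadway.start_link terms, those two changes look roughly like this (a sketch; the full configuration we ended up with appears later in the post):

Broadway.start_link(InternalJobs,
  name: InternalJobs,
  producer: [
    module: {BroadwayRabbitMQ.Producer, queue: "internal_jobs"},
    # a single producer sees all processor demand, so no processor is handed
    # more messages than it asked for
    concurrency: 1
  ],
  processors: [
    default: [
      concurrency: 25,
      # ask for one message at a time; a busy processor can't hoard jobs
      max_demand: 1
    ]
  ]
)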

Our final change was specific to RabbitMQ. RabbitMQ subscribers don’t actually fetch messages; RabbitMQ pushes messages to them. To enable back pressure, RabbitMQ has a prefetch setting, which broadway_rabbitmq supports. If you set it to 10, RabbitMQ will initially deliver up to 10 messages, but will pause there. Once three of those are acknowledged, it will try to deliver three more to replenish the buffer.

If a Broadway pipeline has a prefetch of 50, it will never be possible for more than 50 processors to be busy at a time. To ensure that all processors in a given pipeline can receive messages, the value should be set to at least max_demand * <number of processors>. In pipelines with batchers, it needs to be even higher.
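
In broadway_rabbitmq, the prefetch is set through the producer’s :qos option. Here is just the producer section of a configuration, sized for our eventual settings of 25 processors with max_demand: 1 and no batchers:

producer: [
  module: {
    BroadwayRabbitMQ.Producer,
    queue: "internal_jobs",
    # at least max_demand * processor concurrency = 1 * 25
    qos: [prefetch_count: 25]
  },
  concurrency: 1
]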

By changing these settings, I was able to get much better concurrency. Armed with this knowledge, Nathan Long made a series of PRs to broadway, broadway_rabbitmq, amqp, and the RabbitMQ website to clarify their documentation.

Between this post and those PRs, we hope we can help you avoid having the same problems.

Now that you understand the fix, I’ll walk you through how I figured it out.

How I debugged this

In our project, we actually have two Broadway pipelines which run serially. Before we were aware of this issue, we had added some simple SVG timing graphs to the UI to help us visualize their performance. Here’s what we saw.

The time for the entire group to process, including both pipelines, is on top in gray. The gray bars in each picture are the same size, so that you can visually compare the timing.

In the first pipeline, jobs launched as a group are batched together (yellow bar) as they are submitted to the "external_jobs" queue for an external service where batching is more efficient.

Broadway.start_link(ExternalJobs,
  name: ExternalJobs,
  producer: [
    module: {BroadwayRabbitMQ.Producer, queue: "external_jobs"},
    concurrency: 2
  ],
  processors: [
    default: [
      concurrency: 25
    ]
  ],
  batchers: [
    {Manual, batch_size: 100, batch_timeout: 1_000, concurrency: 1},
    {Automatic, batch_size: 100, batch_timeout: 1_000, concurrency: 1}
  ]
)

That external service will generate more jobs for the second queue, "internal_jobs", which is processed by the second pipeline.

Broadway.start_link(InternalJobs,
  name: InternalJobs,
  producer: [
    module: {BroadwayRabbitMQ.Producer, queue: "internal_jobs"},
    concurrency: 2
  ],
  processors: [
    default: [
      concurrency: 25
    ]
  ]
)

These child jobs all run inside Elixir, so we’re able to parallelize them with Tasks rather than a further Broadway pipeline. We see a rainbow stack for each set of internal jobs related to one external job. We didn’t expect the group of internal jobs belonging to one external job to run to completion before the internal jobs related to a different external job would run; we expected all internal jobs to run in parallel, because our processor concurrency is 25.

It’s a problem in our code

Although I am the creator of the IntelliJ Elixir plugin, which has a very nice graphical debugger, I knew that any debugging technique that slows down or pauses execution (such as the graphical debugger or IEx.pry) would alter the timing too much to be a valid test. That left print-based debugging.

My first suspicion was that maybe the graphs were lying. Maybe the two external jobs weren’t really being completed at the same time because there was some post-processing before we submitted them to the "internal_jobs" queue.

2021-02-19 21:50:53.311 [debug] -
 [ExternalJobs.run] -
 Launching external batch
 jobs: ["job1", "job2"]

The external service’s output was parsed and sent to the “internal_jobs” queue.

2021-02-19 21:51:23.334 [debug] -
  [ExternalJobs][publish_to_internal_jobs_queue/1]
  Job: "job1"
2021-02-19 21:51:23.336 [debug] -
  [ExternalJobs][publish_to_internal_jobs_queue/1]
  Job: "job2"

At this point the internal jobs are being sent to the queue with only a 0.002 second (2 ms) gap, so effectively in parallel; however, by the time they are taken off the "internal_jobs" queue by the InternalJobs Broadway pipeline, they’re serialized:

2021-02-19 21:51:23.337 [debug] -
 [InternalJobs][handle_message/3]
2021-02-19 21:51:44.988 [debug] -
 [InternalJobs][handle_message/3] 

That’s a gap of (44.988 - 23.337) 21.651 seconds! There’s no way that’s reasonable “load” delay, because this is a local development machine and there are no other internal jobs processing when this happens. The only interpretation is that InternalJobs’ 25 processors aren’t seeing the second job until after the first job completes!

By adding the pid to the log output of InternalJobs.handle_message/3, we can see that the same process is being used (#PID<0.744.0>):

2021-02-22 08:29:54.991 [debug] -
  [InternalJobs][handle_message/3] -
  in pid (#PID<0.744.0>) 
2021-02-22 08:30:16.693 [debug] -
  [InternalJobs][handle_message/3] -
  in pid (#PID<0.744.0>)
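
The instrumentation itself is nothing special; inside the callback it is roughly this (a sketch, with the real work hidden behind a made-up run_internal_job/1 and require Logger assumed at the top of the module):

@impl true
def handle_message(_processor, message, _context) do
  Logger.debug("[InternalJobs][handle_message/3] - in pid (#{inspect(self())})")

  # stand-in for the real work, which takes tens of seconds
  run_internal_job(message.data)

  message
end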

Since the same pid is being used, it makes sense that the work is being done serially. The concurrency count isn’t the problem; the problem is that the work is being given to the same processor. Next, I decided to investigate why Broadway does this.

It’s a problem in the library code

I did Find Usages for InternalJobs.handle_message/3, but since I haven’t extended that to Behaviour callbacks yet in IntelliJ Elixir, I had to drop down to Find In Path. I hypothesized that handle_message/3 would be called in broadway and not gen_stage or broadway_rabbitmq, because handle_message/3 is defined in Broadway.

To investigate this, I started adding logging to the library code. Since mix compile assumes that library code does not change, I used mix deps.compile broadway to compile my changes.

Adding logging where I found calls to functions named handle_message* in broadway, I found that Broadway.Topology.ProcessorStage.handle_messages/4 is called immediately before InternalJobs.handle_message/3.

2021-02-22 08:49:34.757 [debug] -
  [Broadway.Topology.ProcessorStage][handle_messages/4]
2021-02-22 08:49:34.759 [debug] -
  [InternalJobs][handle_message/3] -
  in pid (#PID<0.2109.0>)
2021-02-22 08:49:56.477 [debug] - 
 [Broadway.Topology.ProcessorStage][handle_messages/4]
2021-02-22 08:49:56.478 [debug] - 
 [InternalJobs][handle_message/3] 
 in pid (#PID<0.2109.0>)

This makes sense since there is a direct call to module.handle_message(processor_key, message, context).

  defp handle_messages([message | messages], successful, failed, state) do
    %{
      module: module,
      context: context,
      processor_key: processor_key,
      batchers: batchers,
      name: name
    } = state

    start_time = System.monotonic_time()
    emit_message_start_event(start_time, processor_key, name, message)

    try do
      message =
        module.handle_message(processor_key, message, context)
        |> validate_message(batchers)

With logging in BroadwayRabbitMQ.Producer.handle_info/2 and the previous logging in Broadway.Topology.ProcessorStage.handle_messages/4, we can see the gap between when the producer produces a message and when the processor processes it:

Produced:

2021-02-22 09:46:50.606 [debug] -
 [BroadwayRabbitMQ.Producer][handle_info/2] -
 message = %Broadway.Message{..., data: "external_job_1_info"}

Processed:

2021-02-22 09:47:12.364 [debug] -
 [Broadway.Topology.ProcessorStage][handle_messages/4] -
 message = %Broadway.Message{..., data: "external_job_1_info"}

The elapsed time between produced and processed for external_job_1_info is 21.758 seconds.

Produced:

2021-02-22 09:46:50.605 [debug] -
 [BroadwayRabbitMQ.Producer][handle_info/2] -
 message = %Broadway.Message{..., data: "external_job_2_info"}

Processed:

2021-02-22 09:46:50.606 [debug] -
 [Broadway.Topology.ProcessorStage][handle_messages/4] -
 message = %Broadway.Message{..., data: "external_job_2_info"}

However, for the other message, external_job_2_info, the gap is only 0.001 seconds!

So, it is possible for a processor to get a produced message within 0.001 seconds, or ~1 millisecond (approximate, because 0.001 seconds is the resolution shown in the log, and the true gap depends on how close 50.605 was to 50.606).

This variability, from effectively no time at all to the time it takes to run all the internal jobs, further confirmed my suspicion that serialization was occurring, but I still couldn’t explain why.

It seemed like the work wasn’t being divvied up correctly between all the processors. Reading the docs for Broadway, I had seen that Broadway supports partitioning, but as shown above, we’re not using the partition_by option to Broadway.start_link/2. I needed to find out how Broadway sends work to processors when NOT partitioned.

In Broadway.Topology.build_producers_specs/2, I found where the producers are assigned a dispatcher:

    # The partition of the producer depends on the processor, so we handle it here.
    dispatcher =
      case processor_config[:partition_by] do
        nil ->
          GenStage.DemandDispatcher

        func ->
          n_processors = processor_config[:concurrency]
          hash_func = fn msg -> {msg, rem(func.(msg), n_processors)} end
          {GenStage.PartitionDispatcher, partitions: 0..(n_processors - 1), hash: hash_func}
      end

Since we’re not using :partition_by, we must be using GenStage.DemandDispatcher.

It’s a problem in the library’s dependency’s code

So, that meant looking for a bug in GenStage.DemandDispatcher. I added logging to GenStage.DemandDispatcher.dispatch_demand/3:

    defp dispatch_demand(events, length, [{counter, pid, ref} | demands]) do
      Logger.debug("self = #{inspect(self())}; events = #{inspect(events)}; length = #{inspect(length)}; counter = #{inspect}; pid = #{inspect(pid)}; ref = #{inspect(ref)}; demands = #{inspect(demands)}")
      {deliver_now, deliver_later, length, counter} = split_events(events, length, counter)
      Process.send(pid, {:"$gen_consumer", {self(), ref}, deliver_now}, [:noconnect])
      demands = add_demand(counter, pid, ref, demands)
      dispatch_demand(deliver_later, length, demands)
    end

GenStage.DemandDispatcher dispatches both messages to #PID<0.2109.0>, so it is serialized on that Process.

2021-02-22 15:17:23.269 [debug] -
 [GenStage.DemandDispatcher][dispatch_demand/3] -
 self = #PID<0.2104.0>;
 events = [%Broadway.Message{..., data: "messag1"}];
 length = 1;
 counter = 10;
 pid = #PID<0.2109.0>; ref = #Reference<0.1611094919.4259840002.14116>;
 demands = [{10, #PID<0.2110.0>, #Reference<0.1611094919.4259840009.13280>}, {10, #PID<0.2111.0>, #Reference<0.1611094919.4259840010.14487>}, ...]
2021-02-22 15:17:23.274 [debug] - 
 [InternalJobs][handle_message/3] - 
 pid = #PID<0.2109.0>;
 term = "external_job_1_info"

The elapsed time between dispatched and processed is 0.003 seconds.

2021-02-22 15:17:23.272 [debug] -
 [GenStage.DemandDispatcher][dispatch_demand/3] -
 self = #PID<0.2100.0>;
 events = [%Broadway.Message{..., data: "messag2"}];
 length = 1;
 counter = 10;
 pid = #PID<0.2109.0>; 
 ref = #Reference<0.1611094919.4259840002.14115>; 
 demands = [{10, #PID<0.2110.0>, #Reference<0.1611094919.4259840009.13279>}, {10, #PID<0.2111.0>, #Reference<0.1611094919.4259840010.14488>}, ...]
2021-02-22 15:17:46.751 [debug] - 
 [InternalJobs][handle_message/3] - 
 pid = #PID<0.2109.0>;
 term = "external_job_2_info" 

The elapsed time between dispatched and processed is 23.477 seconds. However, the elapsed time between the two dispatches is only 0.003 seconds, so all the delay is on the processor’s side, not the dispatcher’s.

In both cases, when dispatch_demand/3 is called, the counter has been reset to 10, which is the default max_demand. What I needed to figure out next was how the counter resets to 10 instead of another processor getting the next event. Looking at the GenStage.Dispatcher behaviour, I noticed ask/3:

  @doc """
  Called every time a consumer sends demand.
  The demand will always be a positive integer (more than 0).
  This callback must return the `actual_demand` as part of its
  return tuple. The returned demand is then sent to producers.
  It is guaranteed the reference given in `from` points to a
  reference previously given in subscribe.
  """
  @callback ask(demand :: pos_integer, from :: {pid, reference}, state :: term) ::
              {:ok, actual_demand :: non_neg_integer, new_state}
            when new_state: term

So, was the first processor somehow asking for too much demand and thereby resetting its counter?

Simplify the test case

As I stated in the beginning, we had a processor concurrency of 25, so when I showed the log as:

demands = [{10, #PID<0.2110.0>, #Reference<0.1611094919.4259840009.13279>}, {10, #PID<0.2111.0>, #Reference<0.1611094919.4259840010.14488>}, ...]

It was much worse than that - the ... represented 22 more entries. The full list was:

[{10, #PID<0.2110.0>, #Reference<0.1611094919.4259840009.13280>}, 
 {10, #PID<0.2111.0>, #Reference<0.1611094919.4259840010.14487>}, 
 {10, #PID<0.2112.0>, #Reference<0.1611094919.4259840010.14490>}, 
 {10, #PID<0.2113.0>, #Reference<0.1611094919.4259840010.14491>}, 
 {10, #PID<0.2114.0>, #Reference<0.1611094919.4259840009.13286>}, 
 {10, #PID<0.2115.0>, #Reference<0.1611094919.4259840009.13288>}, 
 {10, #PID<0.2116.0>, #Reference<0.1611094919.4259840009.13290>}, 
 {10, #PID<0.2117.0>, #Reference<0.1611094919.4259840009.13291>}, 
 {10, #PID<0.2118.0>, #Reference<0.1611094919.4259840009.13294>}, 
 {10, #PID<0.2119.0>, #Reference<0.1611094919.4259840009.13296>}, 
 {10, #PID<0.2120.0>, #Reference<0.1611094919.4259840009.13297>}, 
 {10, #PID<0.2121.0>, #Reference<0.1611094919.4259840009.13299>}, 
 {10, #PID<0.2122.0>, #Reference<0.1611094919.4259840008.17507>}, 
 {10, #PID<0.2123.0>, #Reference<0.1611094919.4259840002.14161>}, 
 {10, #PID<0.2124.0>, #Reference<0.1611094919.4259840002.14164>}, 
 {10, #PID<0.2125.0>, #Reference<0.1611094919.4259840002.14166>}, 
 {10, #PID<0.2126.0>, #Reference<0.1611094919.4259840002.14168>}, 
 {10, #PID<0.2127.0>, #Reference<0.1611094919.4259840002.14169>}, 
 {10, #PID<0.2128.0>, #Reference<0.1611094919.4259840002.14171>}, 
 {10, #PID<0.2129.0>, #Reference<0.1611094919.4259840002.14177>}, 
 {10, #PID<0.2130.0>, #Reference<0.1611094919.4259840008.17532>}, 
 {10, #PID<0.2131.0>, #Reference<0.1611094919.4259840002.14179>}, 
 {10, #PID<0.2132.0>, #Reference<0.1611094919.4259840008.17539>}, 
 {10, #PID<0.2133.0>, #Reference<0.1611094919.4259840002.14186>}]

Before adding the ask/3 logging, I needed a simpler test case, so I reduced the processor concurrency to 3 and max_demand to 1, to try to force each processor process to get one internal job apiece.
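
In Broadway.start_link terms, the simplified settings looked roughly like this (note that the producer concurrency was still 2 at this point):

Broadway.start_link(InternalJobs,
  name: InternalJobs,
  producer: [
    module: {BroadwayRabbitMQ.Producer, queue: "internal_jobs"},
    concurrency: 2
  ],
  processors: [
    default: [
      # 3 processors, each asking for one message at a time
      concurrency: 3,
      max_demand: 1
    ]
  ]
)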

Then I added logging in GenStage.handle_info/2:

  def handle_info(
        {:"$gen_producer", {consumer_pid, ref} = from, {:ask, counter}},
        %{consumers: consumers} = stage
      )
      when is_integer(counter) do
    case consumers do
      %{^ref => _} ->
        %{dispatcher_state: dispatcher_state} = stage
        Logger.debug("consumer_pid (#{inspect(consumer_pid)}) is asking  for #{counter}")
        dispatcher_callback(:ask, [counter, from, dispatcher_state], stage)

      %{} ->
        msg = {:"$gen_consumer", {self(), ref}, {:cancel, :unknown_subscription}}
        send_noconnect(consumer_pid, msg)

        {:noreply, stage}
    end
  end

Processor #PID<0.2109.0>

It is started:

2021-02-23 12:19:15.639 [info] -
[supervisor][report_progress/2] -
Child InternalJobs.Broadway.Processor_default_0 of Supervisor InternalJobs.Broadway.ProcessorSupervisor started
Pid: #PID<0.2109.0>
Start Call: Broadway.Topology.ProcessorStage.start_link(
    [name: InternalJobs.Broadway.Processor_default_0, 
     module: InternalJobs,
     ...
     processor_config: [
       hibernate_after: 15000, 
       concurrency: 3, 
       max_demand: 1
     ]], 
    ...
  )
Restart: :permanent
Shutdown: 30000
Type: :worker

0.146 seconds after starting, it asks twice for 1 within 0.001 seconds without processing any messages:

2021-02-23 12:19:15.785 [debug] -
[GenStage][handle_info/2] -
consumer_pid (#PID<0.2109.0>) is asking for 1
2021-02-23 12:19:15.786 [debug] -
[GenStage][handle_info/2] -
consumer_pid (#PID<0.2109.0>) is asking for 1

49.671 seconds later it processes its first message:

2021-02-23 12:20:05.457 [debug] -
[InternalJobs][handle_message/3] -
pid = #PID<0.2109.0>

21.963 seconds later it asks immediately before it starts processing:

2021-02-23 12:20:27.420 [debug] -
[GenStage][handle_info/2] -
consumer_pid (#PID<0.2109.0>) is asking for 1
2021-02-23 12:20:27.421 [debug] -
[InternalJobs][handle_message/3] -
pid = #PID<0.2109.0>

The 4th ask from #PID<0.2109.0> is 52.175 seconds later, likely after the message above was processed completely.

2021-02-23 12:21:19.595 [debug] -
[GenStage][handle_info/2] -
consumer_pid (#PID<0.2109.0>) is asking for 1

Processor #PID<0.2110.0>

It is started:

2021-02-23 12:19:15.640 [info] -
[supervisor][report_progress/2] -
Child InternalJobs.Broadway.Processor_default_1 of Supervisor InternalJobs.Broadway.ProcessorSupervisor started
Pid: #PID<0.2110.0>
Start Call: Broadway.Topology.ProcessorStage.start_link(
    [name: InternalJobs.Broadway.Processor_default_1,
     module: InternalJobs,
     ...
     processor_config: [
       hibernate_after: 15000, 
       concurrency: 3, 
       max_demand: 1
     ]],
     ... 
  )
Restart: :permanent
Shutdown: 30000
Type: :worker

0.152 seconds after starting, it asks twice for 1 without any measurable time elapsing

2021-02-23 12:19:15.792 [debug] -
[GenStage][handle_info/2] -
consumer_pid (#PID<0.2110.0>) is asking for 1
2021-02-23 12:19:15.792 [debug] -
[GenStage][handle_info/2] -
consumer_pid (#PID<0.2110.0>) is asking for 1

49.665 seconds later it starts processing.

2021-02-23 12:20:05.457 [debug] -
[InternalJobs][handle_message/3] -
pid = #PID<0.2110.0>

The 3rd ask from #PID<0.2110.0> is 80.242 seconds later, likely after the message above was processed completely.

2021-02-23 12:21:25.699 [debug] -
[GenStage][handle_info/2] -
consumer_pid (#PID<0.2110.0>) is asking for 1

Processor #PID<0.2111.0>

It is started:

2021-02-23 12:19:15.640 [info] -
[supervisor][report_progress/2] -
Child InternalJobs.Broadway.Processor_default_2 of Supervisor InternalJobs.Broadway.ProcessorSupervisor started
Pid: #PID<0.2111.0>
Start Call: Broadway.Topology.ProcessorStage.start_link(
    [name: InternalJobs.Broadway.Processor_default_2,
     module: InternalJobs,
     ...
     processor_config: [
       hibernate_after: 15000,
       concurrency: 3,
       max_demand: 1
     ]],
     ...
  )
Restart: :permanent
Shutdown: 30000
Type: :worker

0.152 seconds after starting, it asks twice for 1 without any measurable time elapsing:

2021-02-23 12:19:15.792 [debug] -
[GenStage][handle_info/2] -
consumer_pid (#PID<0.2111.0>) is asking for 1
2021-02-23 12:19:15.792 [debug] -
[GenStage][handle_info/2] -
consumer_pid (#PID<0.2111.0>) is asking for 1

… it is never given any work, even though its asks come in before #PID<0.2109.0> is given its second chunk of work.

Timeline

I needed a timeline to wrap my head around what was going on and if the processing gaps made sense.

Time                                           Process
Absolute                 Delta      Elapsed    #PID<0.2109.0>  #PID<0.2110.0>  #PID<0.2111.0>
2021-02-23 12:19:15.639  00:00.000             Start
2021-02-23 12:19:15.640  00:00.001  00:00.001                  Start
2021-02-23 12:19:15.640  00:00.000  00:00.001                                  Start
2021-02-23 12:19:15.785  00:00.145  00:00.146  Ask 1
2021-02-23 12:19:15.786  00:00.001  00:00.147  Ask 1
2021-02-23 12:19:15.792  00:00.006  00:00.153                  Ask 1
2021-02-23 12:19:15.792  00:00.000  00:00.153                  Ask 1
2021-02-23 12:19:15.792  00:00.000  00:00.153                                  Ask 1
2021-02-23 12:19:15.792  00:00.000  00:00.153                                  Ask 1
2021-02-23 12:20:05.457  00:49.665  00:49.818  Handle 1
2021-02-23 12:20:05.457  00:00.000  00:49.818                  Handle 1
2021-02-23 12:20:27.420  00:21.963  01:11.781  Ask 1
2021-02-23 12:20:27.421  00:00.001  01:11.782  Handle 1
2021-02-23 12:21:19.595  00:52.174  02:03.956  Ask 1
2021-02-23 12:21:25.699  00:06.104  02:10.060                  Ask 1

Asking for seconds when some haven’t gotten their first helping

Looking at the timeline table, it’s more obvious that #PID<0.2111.0> is being starved while there is work to be done; that work waits until #PID<0.2109.0> is done with its first job, and then goes to #PID<0.2109.0> again.

Who am I talking to?

I needed to explain the double asks. I added more logging to GenStage.handle_info/2:

  def handle_info(
        {:"$gen_producer", {consumer_pid, ref} = from, {:ask, counter}},
        %{consumers: consumers} = stage
      )
      when is_integer(counter) do
    case consumers do
      %{^ref => _} ->
        %{dispatcher_state: dispatcher_state} = stage
        {:registered_name, consumer_registered_name} = Process.info(consumer_pid, :registered_name)
        {:registered_name, registered_name} = Process.info(self(), :registered_name)
        Logger.debug("consumer_pid (#{inspect(consumer_pid)} #{registered_name}) is asking for #{counter} with stage (#{stage}) in GenStage (#{inspect(self())} #{registered_name})")
        dispatcher_callback(:ask, [counter, from, dispatcher_state], stage)

      %{} ->
        msg = {:"$gen_consumer", {self(), ref}, {:cancel, :unknown_subscription}}
        send_noconnect(consumer_pid, msg)

        {:noreply, stage}
    end
  end

InternalJobs.Broadway.Processor_default_0 asks for 1 from InternalJobs.Broadway.Producer_1:

2021-02-23 22:11:19.724 [debug] -
 [GenStage][handle_info/2] -
 consumer_pid (#PID<0.2109.0> InternalJobs.Broadway.Processor_default_0) is asking for 1 with stage (...)
  in GenStage (#PID<0.2104.0> InternalJobs.Broadway.Producer_1)

InternalJobs.Broadway.Processor_default_0 asks for 1 from InternalJobs.Broadway.Producer_0:

2021-02-23 22:11:19.724 [debug] -
 [GenStage][handle_info/2] -
 consumer_pid (#PID<0.2109.0> InternalJobs.Broadway.Processor_default_0) is asking for 1 with stage (...)
   in GenStage (#PID<0.2100.0> InternalJobs.Broadway.Producer_0)

InternalJobs.Broadway.Processor_default_1 asks for 1 from InternalJobs.Broadway.Producer_1:

2021-02-23 22:11:19.728 [debug] -
 [GenStage][handle_info/2] -
 consumer_pid (#PID<0.2110.0> InternalJobs.Broadway.Processor_default_1) is asking for 1 with stage (...)
   in GenStage (#PID<0.2104.0> InternalJobs.Broadway.Producer_1)

InternalJobs.Broadway.Processor_default_2 asks for 1 from InternalJobs.Broadway.Producer_1:

2021-02-23 22:11:19.731 [debug] -
 [GenStage][handle_info/2] -
  consumer_pid (#PID<0.2111.0> InternalJobs.Broadway.Processor_default_2) is asking for 1 with stage (...)
    in GenStage (#PID<0.2104.0> InternalJobs.Broadway.Producer_1)

InternalJobs.Broadway.Processor_default_1 asks for 1 from InternalJobs.Broadway.Producer_0:

2021-02-23 22:11:19.732 [debug] -
 [GenStage][handle_info/2] -
 consumer_pid (#PID<0.2110.0> InternalJobs.Broadway.Processor_default_1) is asking for 1 with stage (...)
  in GenStage (#PID<0.2100.0> InternalJobs.Broadway.Producer_0)

InternalJobs.Broadway.Processor_default_2 asks for 1 from InternalJobs.Broadway.Producer_0:

2021-02-23 22:11:19.741 [debug] -
 [GenStage][handle_info/2] -
 consumer_pid (#PID<0.2111.0> InternalJobs.Broadway.Processor_default_2) is asking for 1 with stage (...)
   in GenStage (#PID<0.2100.0> InternalJobs.Broadway.Producer_0)

InternalJobs.Broadway.Processor_default_0 handles a message:

2021-02-23 22:16:33.532 [debug] -
 [InternalJobs][handle_message/3] -
 pid = #PID<0.2109.0>; registered_name = InternalJobs.Broadway.Processor_default_0; term = ...

InternalJobs.Broadway.Processor_default_1 handles a message:

2021-02-23 22:16:33.535 [debug] -
 [InternalJobs][handle_message/3] -
 pid = #PID<0.2110.0>; registered_name = InternalJobs.Broadway.Processor_default_1; term = ...

InternalJobs.Broadway.Processor_default_0 asks for 1 from InternalJobs.Broadway.Producer_1:

2021-02-23 22:16:55.341 [debug] -
 [GenStage][handle_info/2] -
 consumer_pid (#PID<0.2109.0> InternalJobs.Broadway.Processor_default_0) is asking for 1 with stage (...)
   in GenStage (#PID<0.2104.0> InternalJobs.Broadway.Producer_1)

InternalJobs.Broadway.Processor_default_0 handles a message:

2021-02-23 22:16:55.341 [debug] -
 [InternalJobs][handle_message/3] -
 pid = #PID<0.2109.0>; registered_name = InternalJobs.Broadway.Processor_default_0; term ...

InternalJobs.Broadway.Processor_default_1 asks for 1 from InternalJobs.Broadway.Producer_1:

2021-02-23 22:17:20.908 [debug] -
 [GenStage][handle_info/2] -
 consumer_pid (#PID<0.2110.0> InternalJobs.Broadway.Processor_default_1) is asking for 1 with stage (...) 
   in GenStage (#PID<0.2104.0> InternalJobs.Broadway.Producer_1)

InternalJobs.Broadway.Processor_default_0 asks for 1 from InternalJobs.Broadway.Producer_0:

2021-02-23 22:17:46.585 [debug] -
 [GenStage][handle_info/2] -
 consumer_pid (#PID<0.2109.0> InternalJobs.Broadway.Processor_default_0) is asking for 1 with stage (...)
   in GenStage (#PID<0.2100.0> InternalJobs.Broadway.Producer_0)

With the addition of the in GenStage (PID REGISTERED_NAME) at the end of the consumer_pid ... is asking messages, we can see more clearly that the “repeated” :asks from the prior runs were actually :asks to two different producers. As the Broadway.start_link call in InternalJobs shows, two producers are set up:

  Broadway.start_link(__MODULE__,
      ...
      producer: [
        module: {producer, config},
        concurrency: 2
      ]
  )

One is the loneliest number

The next trial reduced the producer concurrency to 1 and it worked!

Broadway.start_link(InternalJobs,
  name: InternalJobs,
  producer: [
    module: {BroadwayRabbitMQ.Producer, queue: "internal_jobs"},
    concurrency: 1
  ],
  processors: [
    default: [
      concurrency: 25,
      max_demand: 1
    ]
  ]
)

All three processors run their internal jobs concurrently without a job getting stuck queued to a processor that is busy.

Why less producer concurrency leads to better processor concurrency

This bug showed up due to:

  1. Testing locally under low-load conditions.
  2. An incomplete understanding of how Broadway.start_link options affect how work is distributed.

The new mental model for how producer concurrency and processor max_demand work together is that the maximum number of messages a single processor can get in its mailbox is producer[:concurrency] * processors[:default][:max_demand], NOT processors[:default][:max_demand] alone, because each producer tracks demand fulfillment separately. I said producer[:concurrency] * processors[:default][:max_demand] is the maximum, but because we’re using RabbitMQ as the producer, and RabbitMQ round-robins each message in the queue to each client, the producers will alternate and we’ll hit exactly that maximum. The combination of RabbitMQ round-robining single messages, each producer tracking demand independently for each processor, and the DemandDispatcher round-robining entire buckets leads to the max-fill scenario.
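
A quick back-of-the-envelope check with our numbers (2 producers, and the default max_demand of 10):

# Worst case, the number of messages parked in a single processor's mailbox is
#   producer concurrency * processor max_demand
original_settings = 2 * 10
#=> 20 messages can queue up behind one slow job

fixed_settings = 1 * 1
#=> 1 message per processor; everything else waits at the producer for
#   whichever processor asks for it next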

To make the processor concurrency of 25 actually mean that we will always parallelize 25 jobs whenever we have 1-25 available, we need to eliminate the queuing, which means setting the processor max_demand to 1 AND the producer concurrency to 1, so that we don’t multiply that 1 into multiple slots.

This bug and its fix only apply to scenarios where load is low enough that you can’t fill all the queues, but you’re suffering from high latency on the first jobs.

The rest of the story

If you’d like to see a demonstration of this problem yourself, Nathan Long made a demo repo that shows this and a later problem we found with tuning prefetch_count, where it must be no less than the processor concurrency.

processor_concurrency = 25

Broadway.start_link(InternalJobs,
  name: InternalJobs,
  producer: [
    module: {BroadwayRabbitMQ.Producer, queue: "internal_jobs", qos: [prefetch_count: processor_concurrency]},
    concurrency: 1
  ],
  processors: [
    default: [
      concurrency: processor_concurrency,
      max_demand: 1
    ]
  ]
)
