End-to-End Machine Learning in Elixir

Newspapers folded over and stacked against each other.
Sean Moriarity

Machine Learning Advisor

Sean Moriarity

Introduction

I am routinely asked 2 questions that usually look something like this: I am building a startup/application/product/whatever,

  1. Should I use Elixir?
  2. Should I use Nx?

Usually, without hearing the use case, the best answer I can give to both of these questions is: Use whatever language you enjoy working in. Arguments over language preferences are about as productive as arguments over your favorite color or your favorite food. Generally, it’s just a matter of opinion.

The second best answer I can give to these questions is a resounding “YES.” Building applications with Elixir—especially machine-learning-powered applications—is easier than ever. In a few hours, you can build out a scalable machine-learning application with automatic retraining, labeling tools, and more. In this post, I am going to show you how. We’re going to build a simple enriched newsfeed that receives headlines in “real-time” and processes them with enrichments. We’ll also discuss how you can build on some of Elixir’s primitives to build continuous training, data labeling for providing feedback to the model, and more.

Note that this is a relatively simple example of a machine learning application; however, I believe the simplicity of the implementation holds as your application scales. One of the things that continuously impresses me about the Elixir ecosystem is how much you can accomplish with language primitives. A chief complaint I hear from developers coming from other ecosystems is the lack of library support for their specific use case. It’s true that the Elixir ecosystem lacks the breadth of library coverage of an ecosystem like Python, but that doesn’t make it any less powerful. In fact, whenever I find myself reaching for a library, I often realize the need is solved with some simple application of features built-in to the language. Even more impressive, these solutions are often simpler than the library or libraries I had envisioned using.

Now, don’t get me wrong, I do sometimes miss the convenience of pip install thekitchensink; however, I find that when building large applications this overreliance on dependencies leads to less maintainable programs in the long term. Inevitably thekitchensink doesn’t meet your niche use case, and you need to start monkey patching the library or coming up with your own primitives. Adopting Elixir has the potential to greatly simplify your application’s stack.

Getting Started

To begin, let’s create a new Phoenix application:

mix phx.new news

This application will make use of the Top-Headlines API from https://newsapi.org. To continue, you’ll need to create an account and grab an API Key. Now, you’ll need a way to interact with the API. First, install Req as your HTTP Client:

  {:req, "~> 0.3"}

Then create a new file api.ex in lib/news/api.ex and add the following code:

defmodule News.API do
  @moduledoc """
  A simple wrapper around the NewsAPI REST API.
  """
  alias News.Article

  @doc """
  Returns the top headlines in the country.
  """
  def top_headlines() do
    "top-headlines"
    |> url()
    |> Req.get(params: %{country: "us", apiKey: api_key()})
    |> handle_response() 
  end

  defp url(path) do
    base_url() <> path
  end

  defp handle_response({:ok, %{status: status, body: body}}) when status in 200..299 do
    body["articles"]
    |> Enum.reject(& &1["content"] == nil)
    |> Enum.map(&normalize_article/1)  
  end

  defp normalize_article(body) do
    %Article{
      author: body["author"],
      content: body["content"],
      description: body["description"],
      published_at: body["published_at"],
      source: get_in(body, ["source", "id"]),
      title: body["title"],
      url: body["url"],
      url_to_image: body["urlToImage"]
    }
  end

  defp base_url, do: "https://newsapi.org/v2/"

  defp api_key, do: Application.get_env(:news, __MODULE__)[:api_key]
end

This code will make a request to the News API and return the top headlines normalized as News.Article structs. Before running, you’ll need to create a separate module in lib/news/article.ex:

defmodule News.Article do
  defstruct [
    :author,
    :content,
    :description,
    :published_at,
    :source,
    :title,
    :url,
    :url_to_image
  ]
end

Next, you’ll need to add the following to your config.exs to pick up your News API key:

config :news, News.API, api_key: System.get_env("NEWS_API_KEY")

Now you can start your program and verify your API works:

iex -S mix

iex> News.API.top_headlines()
[
  %News.Article{
    author: "Nouran Salahieh, Holly Yan",
    content: "Editors Note: Affected by the storm? Use CNNs lite site for low bandwidth.\r\nHurricane Idalia has been linked to at least two deaths in Florida as it thrashes northern parts of the state and south Geo… [+7447 chars]",
    description: "Hurricane Idalia has been linked to at least two deaths in Florida as it thrashes northern parts of the state and south Georgia – whipping winds up to 90 mph, dumping heavy rain and hurling seawater into flooded cities.",
    published_at: nil,
    source: "cnn",
    title: "Hurricane Idalia turns deadly, pummeling Florida with record-breaking storm surge and catastrophic flooding. And more dangers loom - CNN",
    url: "https://www.cnn.com/2023/08/30/weather/florida-hurricane-idalia-wednesday/index.html",
    url_to_image: "https://media.cnn.com/api/v1/images/stellar/prod/230830081414-02-idalia-tarpon-springs-0830.jpg?c=16x9&q=w_800,c_fill"
  },
  ...
]

This API is “pull-based.” That is to say updates aren’t real-time. Unfortunately, since Twitter killed the free streaming API, there aren’t many freely available streaming news and text APIs out there anymore. Fortunately, we can simulate real-time updates a bit using a GenServer. Create a new module News.Stream in lib/news/stream.ex and add the following code:

defmodule News.Stream do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def connect(pid) do
    GenServer.call(__MODULE__, {:connect, pid})
  end

  @impl true
  def init(_opts) do
    {:ok, :unused_state}
  end

  @impl true
  def handle_call({:connect, from}, _from, _unused_state) do
    schedule_stream(from)
    headlines = News.API.top_headlines()

    {:reply, Enum.take_random(headlines, 3), :unused_state}
  end

  @impl true
  def handle_info({:stream, pid}, _unused_state) do
    schedule_stream(pid)

    headlines = News.API.top_headlines()
    send(pid, {:stream, Enum.take_random(headlines, 3)})

    {:noreply, :unused_state}
  end

  defp schedule_stream(pid) do
    # Send news articles to pid at a random interval
    rand_minutes = :rand.uniform(10) * 1000 * 60
    Process.send_after(self(), {:stream, pid}, rand_minutes)
  end
end

This simulates a real-time news stream that continuously sends updates to a calling process at random intervals. Now you’ll want to start your GenServer in your application.ex:

      # Start "News Stream"
      News.Stream,

Now, it’s time to set up some machine-learning enrichments to run on these news streams.

Enriching Articles

If you worked for a hedge fund, you’d likely want to perform some machine-learning enrichments on incoming headlines to identify sentiments, involved parties, etc. We can do just that using Elixir and Bumblebee—without even needing to train a model. We’ll start with a simple model that uses named-entity recognition to extract proper nouns such as places and things from a headline. First, add Nx, EXLA, and Bumblebee to your dependencies:

{:nx, "~> 0.6"},
{:exla, "~> 0.6"},
{:bumblebee, "~> 0.3"}

Next, create a new module in lib/news/enrichments/ner.ex and add the following code:

defmodule News.Enrichments.NER do
  @moduledoc """
  Bumblebee-based NER on headlines
  """
  alias News.Article

  def predict(%Article{title: title} = article) do
    %{entities: entities} = Nx.Serving.batched_run(__MODULE__, title)
    %{article | entities: entities}
  end

  def predict(articles) when is_list(articles) do
    preds = Nx.Serving.batched_run(__MODULE__, Enum.map(articles, & &1.title))
    Enum.zip_with(articles, preds, fn article, %{entities: entities} ->
      %{article | entities: entities}
    end)
  end

  def serving() do
    {:ok, model} = Bumblebee.load_model({:hf, "dslim/bert-base-NER"})
    {:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "bert-base-uncased"})

    Bumblebee.Text.token_classification(model, tokenizer,
      aggregation: :same,
      defn_options: [compiler: EXLA],
      compile: [batch_size: 8, sequence_length: 128]
    )
  end
end

This creates a new %Nx.Serving{} struct that will perform token classification with a named entity recognition model to extract named entities from the given article. It will then attach the entities to the article. In order for this to work, you’ll need to update your article struct:

defmodule News.Article do
  defstruct [
    :author,
    :content,
    :description,
    :published_at,
    :source,
    :title,
    :url,
    :url_to_image,
    :entities
  ]
end

Next, you’ll need to add your serving to your application.ex:

  {Nx.Serving,
    serving: News.Enrichments.NER.serving(),
    name: News.Enrichments.NER,
    batch_size: 8,
    sequence_length: 128}

The next enrichment you’ll implement is sentiment analysis. Sentiment analysis measures the overall sentiment (positive or negative) of a text. Sentiment scores can have ranges, but in this case we’ll just look at the articles as having positive, negative, or neutral impact. You can create a new serving for sentiment analysis by adding the following code in a new module lib/news/enrichments/sentiment.ex:

defmodule News.Enrichments.Sentiment do
  @moduledoc """
  Bumblebee based financial sentiment analysis.
  """
  alias News.Article

  def predict(%Article{title: title} = article) do
    %{predictions: preds} = Nx.Serving.batched_run(__MODULE__, title)
    %{label: max_label} = Enum.max_by(preds, & &1.score)
    %{article | sentiment: max_label}
  end

  def predict(articles) when is_list(articles) do
    preds = Nx.Serving.batched_run(__MODULE__, Enum.map(articles, & &1.title))
    Enum.zip_with(articles, preds, fn article, %{predictions: pred} ->
      %{label: max_label} = Enum.max_by(pred, & &1.score)
      %{article | sentiment: max_label}
    end)
  end

  def serving() do
    {:ok, model} = Bumblebee.load_model({:hf, "ahmedrachid/FinancialBERT-Sentiment-Analysis"})
    {:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "ahmedrachid/FinancialBERT-Sentiment-Analysis"})

    Bumblebee.Text.text_classification(model, tokenizer,
      defn_options: [compiler: EXLA],
      compile: [batch_size: 8, sequence_length: 128]
    )
  end
end

You’ll also need to update your Article struct:

defmodule News.Article do
  defstruct [
    :author,
    :content,
    :description,
    :published_at,
    :source,
    :title,
    :url,
    :url_to_image,
    :entities,
    :sentiment
  ]
end

And finally you’ll want to add this to your application.ex:

      {Nx.Serving,
        serving: News.Enrichments.Sentiment.serving(),
        name: News.Enrichments.Sentiment,
        batch_size: 8},

With your enrichments set up, it’s time to create a simple Newsfeed!

A NewsFeed with LiveView

First, create a new route for your newsfeed:

live "/", FeedLive.Index, :index

Next, create a new file lib/news_web/live/feed_live/index.ex and add the following:

defmodule NewsWeb.FeedLive.Index do
  use NewsWeb, :live_view

  @impl true
  def mount(_params, _session, socket) do
    {:ok, socket}
  end

  @impl true
  def render(assigns) do
  ~H"""
    <div class="max-w-2xl mx-auto my-8">
      <h1 class="text-xl">News Feed</h1>
    </div>
  """
  end
end

If you navigate to the homepage of your app, you’ll see a simple header “News Feed”. Now, let’s set up your LiveView to connect to the News “stream”. First, adjust mount to connect:

  def mount(_params, _session, socket) do
    articles = News.Stream.connect(self())
    {:ok, assign(socket, :articles, articles)}
  end

Next, implement a handle_info callback to handle incoming articles:

  @impl true
  def handle_info({:stream, new_articles}, socket) do
    {:noreply, update(socket, :articles, fn articles ->
      new_articles ++ articles
    end)}
  end

Now, update your News feed to render new articles as they come in:

  @impl true
  def render(assigns) do
  ~H"""
    <div class="max-w-2xl mx-auto my-8">
      <h1 class="text-xl">News Feed</h1>
      <ul class="divide-y divide-gray-100">
        <li :for={article <- @articles}>
          <a href={article.url} target="_window">
            <div class="px-4 py-4">
              <h2 class="text-md font-medium"><%= article.title %></h2>
              <p class="text-sm"><%= article.description %></p>
            </div>
          </a>
        </li>
      </ul>
    </div>
  """
  end

After refreshing, you will see a newsfeed that repeatedly adds new articles to the beginning of the feed. Now, we want to apply our enrichments to each of these articles; however, we don’t want the prediction to block rendering. We can implement some asynchronous processing such that our enrichments run concurrently and our feed updates accordingly. First, add a new function process_enrichments to your LiveView:

  defp process_enrichments(socket, articles) do
    socket
    |> do_ner_enrich(articles)
    |> do_sentiment_enrich(articles)
  end

Now implement the actual enrichment functions:

  defp do_ner_enrich(socket, articles) do
    ner_task = Task.async(fn ->
      {ignore, enrich} = Enum.split_with(articles, & &1.entities != nil)
      ignore ++ News.Enrichments.NER.predict(enrich)
    end)

    assign(socket, :ner_task, ner_task)
  end

  defp do_sentiment_enrich(socket, articles) do
    sentiment_task = Task.async(fn ->
      {ignore, enrich} = Enum.split_with(articles, & &1.sentiment != nil)
      ignore ++ News.Enrichments.Sentiment.predict(enrich)
    end)

    assign(socket, :sentiment_task, sentiment_task)
  end

Now add process_enrichments to run on mount:

  @impl true
  def mount(_params, _session, socket) do
    articles = News.Stream.connect(self())

    socket =
      socket
      |> process_enrichments(articles)
      |> assign(:articles, articles)

    {:ok, socket}
  end

Next, add it to run every time you receive new articles:

  @impl true
  def handle_info({:stream, new_articles}, socket) do
    socket =
      socket
      |> process_enrichments(new_articles)
      |> update(:articles, fn articles -> new_articles ++ articles end)

    {:noreply, socket}
  end

Finally, we just have to handle the message that the LiveView receives when a task completes, in this case we just have to update the articles:

def handle_info({_task, articles}, socket) do
  {:noreply, assign(socket, :articles, articles)}
end

And we should also handle this case:

def handle_info({:DOWN, _, _, _, _}, socket) do
  {:noreply, socket}
end

And finally, let’s change the styles so that we can actually see our entities and sentiment:

  @impl true
  def render(assigns) do
  ~H"""
    <div class="max-w-2xl mx-auto my-8">
      <h1 class="text-xl">News Feed</h1>
      <ul class="divide-y divide-gray-100">
        <li :for={article <- @articles}>
          <a href={article.url} target="_window">
            <div class={[class_for_sentiment(article.sentiment), "px-4 py-4"]}>
              <h2 class="text-md font-medium"><%= article.title %></h2>
              <p class="text-sm"><%= article.description %></p>
              <div class="inline-flex space-x-2">
                <%= if article.entities do %>
                  <span
                    :for={entity <- article.entities}
                    class="p-[0.5] bg-gray-50 rounded-md text-xs"
                  ><%= entity.phrase %>-<%= entity.label %></span>
                <% end %>
              </div>
            </div>
          </a>
        </li>
      </ul>
    </div>
  """
  end

  defp class_for_sentiment("positive"), do: "bg-green-100"
  defp class_for_sentiment("negative"), do: "bg-red-100"
  defp class_for_sentiment(_class), do: ""

And that’s all you need! Now you’ll have a continuously updating enriched news feed!

What next?

This article demonstrated some very simple concepts, but I hope it convinces you how easy it is to build machine-learning applications that take advantage of Elixir’s concurrency model. If you’re an experienced Elixir programmer, you might already be thinking about how this could be much better extended with tools like Broadway. Broadway is a data processing library in Elixir, that can be used to scale pipelines to millions of events. Theoretically, you could build these enrichments directly on top of Broadway, and it would scale as much as you’d need.

Additionally, you can combine your application with a job processor like Oban, and implement continuous training of machine learning models. As you receive more data, you send it off for labeling, and trigger training runs as you reach a certain amount of data or as you see fit. With LiveView, it’s very easy to build labeling applications that rival and completely replace enterprise labeling solutions that would otherwise cost thousands of dollars to license. The possibilities are endless.

Until next time!

Newsletter

Stay in the Know

Get the latest news and insights on Elixir, Phoenix, machine learning, product strategy, and more—delivered straight to your inbox.

Narwin holding a press release sheet while opening the DockYard brand kit box