Introduction
I am routinely asked two questions that usually look something like this: I am building a startup/application/product/whatever,
- Should I use Elixir?
- Should I use Nx?
Usually, without hearing the use case, the best answer I can give to both of these questions is: Use whatever language you enjoy working in. Arguments over language preferences are about as productive as arguments over your favorite color or your favorite food. Generally, it’s just a matter of opinion.
The second best answer I can give to these questions is a resounding “YES.” Building applications with Elixir—especially machine-learning-powered applications—is easier than ever. In a few hours, you can build out a scalable machine-learning application with automatic retraining, labeling tools, and more. In this post, I am going to show you how. We’re going to build a simple enriched newsfeed that receives headlines in “real-time” and processes them with enrichments. We’ll also discuss how you can build on some of Elixir’s primitives to build continuous training, data labeling for providing feedback to the model, and more.
Note that this is a relatively simple example of a machine learning application; however, I believe the simplicity of the implementation holds as your application scales. One of the things that continuously impresses me about the Elixir ecosystem is how much you can accomplish with language primitives. A chief complaint I hear from developers coming from other ecosystems is the lack of library support for their specific use case. It’s true that the Elixir ecosystem lacks the breadth of library coverage of an ecosystem like Python, but that doesn’t make it any less powerful. In fact, whenever I find myself reaching for a library, I often realize the need is solved with some simple application of features built into the language. Even more impressive, these solutions are often simpler than the library or libraries I had envisioned using.
Now, don’t get me wrong, I do sometimes miss the convenience of pip install thekitchensink; however, I find that when building large applications, this overreliance on dependencies leads to less maintainable programs in the long term. Inevitably, thekitchensink doesn’t meet your niche use case, and you need to start monkey patching the library or coming up with your own primitives. Adopting Elixir has the potential to greatly simplify your application’s stack.
Getting Started
To begin, let’s create a new Phoenix application:
mix phx.new news
This application will make use of the Top-Headlines API from https://newsapi.org. To continue, you’ll need to create an account and grab an API key. Now, you’ll need a way to interact with the API. First, install Req as your HTTP client:
{:req, "~> 0.3"}
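If it’s been a while since you added a dependency: this line goes in the deps/0 function of your mix.exs. The version requirement above simply reflects what was current at the time of writing; a sketch of the deps function might look like this:

```elixir
# mix.exs
defp deps do
  [
    # ...the deps generated by mix phx.new...
    {:req, "~> 0.3"}
  ]
end
```

Run mix deps.get afterwards to fetch it.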
Then create a new file api.ex in lib/news/api.ex and add the following code:
defmodule News.API do
  @moduledoc """
  A simple wrapper around the NewsAPI REST API.
  """
  alias News.Article

  @doc """
  Returns the top headlines in the country.
  """
  def top_headlines() do
    "top-headlines"
    |> url()
    |> Req.get(params: %{country: "us", apiKey: api_key()})
    |> handle_response()
  end

  defp url(path) do
    base_url() <> path
  end

  defp handle_response({:ok, %{status: status, body: body}}) when status in 200..299 do
    body["articles"]
    |> Enum.reject(&(&1["content"] == nil))
    |> Enum.map(&normalize_article/1)
  end

  # Fail loudly on non-2xx responses and transport errors
  defp handle_response({:ok, %{status: status}}) do
    raise "NewsAPI request failed with status #{status}"
  end

  defp handle_response({:error, reason}) do
    raise "NewsAPI request failed: #{inspect(reason)}"
  end

  defp normalize_article(body) do
    %Article{
      author: body["author"],
      content: body["content"],
      description: body["description"],
      # NewsAPI uses camelCase keys in its JSON responses
      published_at: body["publishedAt"],
      source: get_in(body, ["source", "id"]),
      title: body["title"],
      url: body["url"],
      url_to_image: body["urlToImage"]
    }
  end

  defp base_url, do: "https://newsapi.org/v2/"
  defp api_key, do: Application.get_env(:news, __MODULE__)[:api_key]
end
This code will make a request to the News API and return the top headlines normalized as News.Article structs. Before running, you’ll need to create a separate module in lib/news/article.ex:
defmodule News.Article do
  defstruct [
    :author,
    :content,
    :description,
    :published_at,
    :source,
    :title,
    :url,
    :url_to_image
  ]
end
Next, you’ll need to add the following to your config.exs to pick up your News API key:
config :news, News.API, api_key: System.get_env("NEWS_API_KEY")
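One caveat worth noting: config.exs is evaluated at compile time, so the environment variable must be set when you compile. If you deploy with releases, reading the key in config/runtime.exs instead picks it up at boot:

```elixir
# config/runtime.exs
import Config

config :news, News.API, api_key: System.get_env("NEWS_API_KEY")
```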
Now you can start your program and verify your API works:
iex -S mix
iex> News.API.top_headlines()
[
  %News.Article{
    author: "Nouran Salahieh, Holly Yan",
    content: "Editors Note: Affected by the storm? Use CNNs lite site for low bandwidth.\r\nHurricane Idalia has been linked to at least two deaths in Florida as it thrashes northern parts of the state and south Geo… [+7447 chars]",
    description: "Hurricane Idalia has been linked to at least two deaths in Florida as it thrashes northern parts of the state and south Georgia – whipping winds up to 90 mph, dumping heavy rain and hurling seawater into flooded cities.",
    published_at: nil,
    source: "cnn",
    title: "Hurricane Idalia turns deadly, pummeling Florida with record-breaking storm surge and catastrophic flooding. And more dangers loom - CNN",
    url: "https://www.cnn.com/2023/08/30/weather/florida-hurricane-idalia-wednesday/index.html",
    url_to_image: "https://media.cnn.com/api/v1/images/stellar/prod/230830081414-02-idalia-tarpon-springs-0830.jpg?c=16x9&q=w_800,c_fill"
  },
  ...
]
This API is “pull-based.” That is to say, updates aren’t real-time. Unfortunately, since Twitter killed the free streaming API, there aren’t many freely available streaming news and text APIs out there anymore. Fortunately, we can approximate real-time updates using a GenServer. Create a new module News.Stream in lib/news/stream.ex and add the following code:
defmodule News.Stream do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def connect(pid) do
    GenServer.call(__MODULE__, {:connect, pid})
  end

  @impl true
  def init(_opts) do
    {:ok, :unused_state}
  end

  @impl true
  def handle_call({:connect, from}, _from, _unused_state) do
    schedule_stream(from)
    headlines = News.API.top_headlines()
    {:reply, Enum.take_random(headlines, 3), :unused_state}
  end

  @impl true
  def handle_info({:stream, pid}, _unused_state) do
    schedule_stream(pid)
    headlines = News.API.top_headlines()
    send(pid, {:stream, Enum.take_random(headlines, 3)})
    {:noreply, :unused_state}
  end

  defp schedule_stream(pid) do
    # Send news articles to pid at a random interval (1 to 10 minutes)
    rand_minutes = :rand.uniform(10) * 1000 * 60
    Process.send_after(self(), {:stream, pid}, rand_minutes)
  end
end
This simulates a real-time news stream that continuously sends updates to a calling process at random intervals. Now you’ll want to start your GenServer in your application.ex:
# Start "News Stream"
News.Stream,
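You can sanity-check the stream from iex before wiring up any UI. connect/1 returns an initial batch immediately, and {:stream, articles} messages arrive later at random intervals (struct contents elided here):

```elixir
iex> News.Stream.connect(self())
[%News.Article{...}, %News.Article{...}, %News.Article{...}]

# ...some minutes later...
iex> flush()
{:stream, [%News.Article{...}, %News.Article{...}, %News.Article{...}]}
:ok
```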
Now, it’s time to set up some machine-learning enrichments to run on these news streams.
Enriching Articles
If you worked for a hedge fund, you’d likely want to perform some machine-learning enrichments on incoming headlines to identify sentiments, involved parties, etc. We can do just that using Elixir and Bumblebee—without even needing to train a model. We’ll start with a simple model that uses named-entity recognition to extract proper nouns such as places and things from a headline. First, add Nx, EXLA, and Bumblebee to your dependencies:
{:nx, "~> 0.6"},
{:exla, "~> 0.6"},
{:bumblebee, "~> 0.3"}
Next, create a new module in lib/news/enrichments/ner.ex and add the following code:
defmodule News.Enrichments.NER do
  @moduledoc """
  Bumblebee-based NER on headlines.
  """
  alias News.Article

  def predict(%Article{title: title} = article) do
    %{entities: entities} = Nx.Serving.batched_run(__MODULE__, title)
    %{article | entities: entities}
  end

  def predict(articles) when is_list(articles) do
    preds = Nx.Serving.batched_run(__MODULE__, Enum.map(articles, & &1.title))

    Enum.zip_with(articles, preds, fn article, %{entities: entities} ->
      %{article | entities: entities}
    end)
  end

  def serving() do
    {:ok, model} = Bumblebee.load_model({:hf, "dslim/bert-base-NER"})
    # Load the tokenizer that ships with the (cased) NER checkpoint
    {:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "dslim/bert-base-NER"})

    Bumblebee.Text.token_classification(model, tokenizer,
      aggregation: :same,
      defn_options: [compiler: EXLA],
      compile: [batch_size: 8, sequence_length: 128]
    )
  end
end
This creates a new %Nx.Serving{} struct that will perform token classification with a named entity recognition model to extract named entities from the given article. It will then attach the entities to the article. In order for this to work, you’ll need to update your article struct:
defmodule News.Article do
  defstruct [
    :author,
    :content,
    :description,
    :published_at,
    :source,
    :title,
    :url,
    :url_to_image,
    :entities
  ]
end
Next, you’ll need to add your serving to your application.ex:
{Nx.Serving,
 serving: News.Enrichments.NER.serving(),
 name: News.Enrichments.NER,
 batch_size: 8,
 sequence_length: 128}
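With the serving supervised, you can try the enrichment from iex. The output below is illustrative; the exact entities, spans, and scores depend on the headline and the model:

```elixir
iex> [article | _] = News.API.top_headlines()
iex> News.Enrichments.NER.predict(article).entities
[
  %{label: "LOC", phrase: "Florida", score: 0.998, start: 52, end: 59},
  %{label: "ORG", phrase: "CNN", score: 0.995, start: 131, end: 134}
]
```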
The next enrichment you’ll implement is sentiment analysis. Sentiment analysis measures the overall sentiment (positive or negative) of a text. Sentiment scores can have ranges, but in this case we’ll just classify articles as having positive, negative, or neutral impact. You can create a new serving for sentiment analysis by adding the following code in a new module lib/news/enrichments/sentiment.ex:
defmodule News.Enrichments.Sentiment do
  @moduledoc """
  Bumblebee-based financial sentiment analysis.
  """
  alias News.Article

  def predict(%Article{title: title} = article) do
    %{predictions: preds} = Nx.Serving.batched_run(__MODULE__, title)
    %{label: max_label} = Enum.max_by(preds, & &1.score)
    %{article | sentiment: max_label}
  end

  def predict(articles) when is_list(articles) do
    preds = Nx.Serving.batched_run(__MODULE__, Enum.map(articles, & &1.title))

    Enum.zip_with(articles, preds, fn article, %{predictions: pred} ->
      %{label: max_label} = Enum.max_by(pred, & &1.score)
      %{article | sentiment: max_label}
    end)
  end

  def serving() do
    {:ok, model} = Bumblebee.load_model({:hf, "ahmedrachid/FinancialBERT-Sentiment-Analysis"})
    {:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "ahmedrachid/FinancialBERT-Sentiment-Analysis"})

    Bumblebee.Text.text_classification(model, tokenizer,
      defn_options: [compiler: EXLA],
      compile: [batch_size: 8, sequence_length: 128]
    )
  end
end
You’ll also need to update your Article struct:
defmodule News.Article do
  defstruct [
    :author,
    :content,
    :description,
    :published_at,
    :source,
    :title,
    :url,
    :url_to_image,
    :entities,
    :sentiment
  ]
end
And finally, you’ll want to add this to your application.ex:
{Nx.Serving,
 serving: News.Enrichments.Sentiment.serving(),
 name: News.Enrichments.Sentiment,
 batch_size: 8},
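As with NER, you can exercise this serving from iex; the label you get back will of course depend on the headline:

```elixir
iex> [article | _] = News.API.top_headlines()
iex> News.Enrichments.Sentiment.predict(article).sentiment
"neutral"
```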
With your enrichments set up, it’s time to create a simple Newsfeed!
A NewsFeed with LiveView
First, create a new route for your newsfeed:
live "/", FeedLive.Index, :index
Next, create a new file lib/news_web/live/feed_live/index.ex and add the following:
defmodule NewsWeb.FeedLive.Index do
  use NewsWeb, :live_view

  @impl true
  def mount(_params, _session, socket) do
    {:ok, socket}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <div class="max-w-2xl mx-auto my-8">
      <h1 class="text-xl">News Feed</h1>
    </div>
    """
  end
end
If you navigate to the homepage of your app, you’ll see a simple header “News Feed”. Now, let’s set up your LiveView to connect to the News “stream”. First, adjust mount to connect:
def mount(_params, _session, socket) do
  articles = News.Stream.connect(self())
  {:ok, assign(socket, :articles, articles)}
end
Next, implement a handle_info callback to handle incoming articles:
@impl true
def handle_info({:stream, new_articles}, socket) do
  {:noreply,
   update(socket, :articles, fn articles ->
     new_articles ++ articles
   end)}
end
Now, update your News feed to render new articles as they come in:
@impl true
def render(assigns) do
  ~H"""
  <div class="max-w-2xl mx-auto my-8">
    <h1 class="text-xl">News Feed</h1>
    <ul class="divide-y divide-gray-100">
      <li :for={article <- @articles}>
        <a href={article.url} target="_window">
          <div class="px-4 py-4">
            <h2 class="text-md font-medium"><%= article.title %></h2>
            <p class="text-sm"><%= article.description %></p>
          </div>
        </a>
      </li>
    </ul>
  </div>
  """
end
After refreshing, you will see a newsfeed that repeatedly adds new articles to the beginning of the feed. Now, we want to apply our enrichments to each of these articles; however, we don’t want the prediction to block rendering. We can implement some asynchronous processing such that our enrichments run concurrently and our feed updates accordingly. First, add a new function process_enrichments to your LiveView:
defp process_enrichments(socket, articles) do
  socket
  |> do_ner_enrich(articles)
  |> do_sentiment_enrich(articles)
end
Now implement the actual enrichment functions:
defp do_ner_enrich(socket, articles) do
  ner_task =
    Task.async(fn ->
      {ignore, enrich} = Enum.split_with(articles, &(&1.entities != nil))
      ignore ++ News.Enrichments.NER.predict(enrich)
    end)

  assign(socket, :ner_task, ner_task)
end

defp do_sentiment_enrich(socket, articles) do
  sentiment_task =
    Task.async(fn ->
      {ignore, enrich} = Enum.split_with(articles, &(&1.sentiment != nil))
      ignore ++ News.Enrichments.Sentiment.predict(enrich)
    end)

  assign(socket, :sentiment_task, sentiment_task)
end
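The Enum.split_with/2 call is what makes these functions safe to run over a mixed list: articles that were already enriched pass through untouched, and only the rest are sent to the serving. In isolation:

```elixir
iex> articles = [%{entities: [:some_entity]}, %{entities: nil}]
iex> Enum.split_with(articles, & &1.entities != nil)
{[%{entities: [:some_entity]}], [%{entities: nil}]}
```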
Now add process_enrichments to run on mount:
@impl true
def mount(_params, _session, socket) do
  articles = News.Stream.connect(self())

  socket =
    socket
    |> process_enrichments(articles)
    |> assign(:articles, articles)

  {:ok, socket}
end
Next, add it to run every time you receive new articles:
@impl true
def handle_info({:stream, new_articles}, socket) do
  # Enrich the full list; split_with in the enrichment helpers
  # skips articles that were already enriched.
  articles = new_articles ++ socket.assigns.articles

  socket =
    socket
    |> process_enrichments(articles)
    |> assign(:articles, articles)

  {:noreply, socket}
end
Finally, we have to handle the message that the LiveView receives when a task completes; in this case, we simply update the articles:
def handle_info({ref, articles}, socket) when is_reference(ref) do
  {:noreply, assign(socket, :articles, articles)}
end
We should also handle the :DOWN message that the task’s monitor delivers when the task process exits:
def handle_info({:DOWN, _, _, _, _}, socket) do
  {:noreply, socket}
end
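If this pair of callbacks looks mysterious, recall how Task.async communicates: it monitors the spawned process, and the caller receives two messages, a {ref, result} tuple when the function returns and a :DOWN tuple when the task process exits. You can observe both in a plain iex session:

```elixir
task = Task.async(fn -> :done end)

# First message: the task's reply, tagged with its monitor ref
receive do
  {ref, result} when ref == task.ref -> IO.inspect(result) # prints :done
end

# Second message: the monitor's :DOWN notification
receive do
  {:DOWN, ref, :process, _pid, :normal} when ref == task.ref ->
    IO.puts("task exited normally")
end
```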
And finally, let’s change the styles so that we can actually see our entities and sentiment:
@impl true
def render(assigns) do
  ~H"""
  <div class="max-w-2xl mx-auto my-8">
    <h1 class="text-xl">News Feed</h1>
    <ul class="divide-y divide-gray-100">
      <li :for={article <- @articles}>
        <a href={article.url} target="_window">
          <div class={[class_for_sentiment(article.sentiment), "px-4 py-4"]}>
            <h2 class="text-md font-medium"><%= article.title %></h2>
            <p class="text-sm"><%= article.description %></p>
            <div class="inline-flex space-x-2">
              <%= if article.entities do %>
                <span
                  :for={entity <- article.entities}
                  class="p-[0.5] bg-gray-50 rounded-md text-xs"
                ><%= entity.phrase %>-<%= entity.label %></span>
              <% end %>
            </div>
          </div>
        </a>
      </li>
    </ul>
  </div>
  """
end

defp class_for_sentiment("positive"), do: "bg-green-100"
defp class_for_sentiment("negative"), do: "bg-red-100"
defp class_for_sentiment(_class), do: ""
And that’s all you need! Now you’ll have a continuously updating enriched news feed!
What next?
This article demonstrated some very simple concepts, but I hope it convinces you how easy it is to build machine-learning applications that take advantage of Elixir’s concurrency model. If you’re an experienced Elixir programmer, you might already be thinking about how this could be extended with tools like Broadway. Broadway is a data processing library for Elixir that can be used to scale pipelines to millions of events. Theoretically, you could build these enrichments directly on top of Broadway, and it would scale as much as you’d need.
Additionally, you can combine your application with a job processor like Oban and implement continuous training of machine learning models. As new data arrives, you can send it off for labeling and trigger training runs once you’ve accumulated enough examples, or on whatever schedule you see fit. With LiveView, it’s very easy to build labeling applications that rival, and can outright replace, enterprise labeling solutions that would otherwise cost thousands of dollars to license. The possibilities are endless.
Until next time!