
Zander for bytewax

Originally published at bytewax.io

Using Language Models in a Streaming Context to Understand Financial Markets

For those who are eager to dive into the code, it's available:

bytewax / news-analyzer

Analyze financial news in real-time with Machine Learning




Effective news analysis is crucial for understanding the world, especially financial markets. Quickly identifying significant events, such as a major corporation being hacked and sensitive customer data being compromised, lets you respond rapidly and either capitalize on opportunities or minimize losses. In this blog post, we'll look at how Bytewax and large language models can be used to analyze financial news in real time, so that you can respond to breaking news more effectively. To implement our little project successfully, we need to answer at least three questions:

  • Where do we get the data?
  • How do we analyze it?
  • How do we access the data source and perform analysis in real-time?

Data Source

For the data source in this demo, we will use the Alpaca news API, which provides websocket access to news articles from Benzinga. To set up an account and create an API key and secret, follow the Alpaca documentation. You can use any websocket as a data source. A future follow-up will look at how we can build our own real-time news aggregation pipeline for analysis.

Content Analysis

We're going to use large language models (LLMs) to analyze the news articles, and the first place that comes to mind when looking for LLMs is Hugging Face. Hugging Face is a company that provides a hub where researchers can release models and datasets, which other researchers and developers can then use via hosted model endpoints and the Transformers library. First, we need to perform sentiment analysis on the headline, which can quickly provide valuable insights. For this, we'll use a fine-tuned BERT model called FinancialBERT. Then we will summarize the content of the article with a fine-tuned BART model. Both can be found on huggingface.co. We'll also cover how to use the Transformers library to run the models.

Real-Time Data Processing with Bytewax

If you are not familiar with Bytewax, it is a stateful stream processor that can be used to analyze data in real time, with support for stateful operators like windowing and aggregation. Bytewax is especially suitable for workflows that leverage the Python ecosystem of tools, from data crunching tools like Pandas to machine learning-focused tools like Hugging Face Transformers. It also supports a variety of data sources, including websockets.

Let's get started analyzing the news in real time. First things first: dependencies.

!pip install bytewax transformers torch sentencepiece websocket-client

Constructing Our Dataflow

A Bytewax dataflow is a sequence of steps that transform data from an input source and then write it to an output. At each step, an operator controls the flow of data: whether it should be filtered, aggregated, or accumulated. Developers write Python code that performs the data transformation at each step.
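To make the idea of chained steps concrete, here is a minimal sketch in plain Python. It deliberately does not use the Bytewax API; `map_step`, `filter_step`, and `run_pipeline` are hypothetical names used only for illustration, and the headlines are invented.

```python
from typing import Callable, Iterable, Iterator, List


def map_step(items: Iterable[str], fn: Callable[[str], str]) -> Iterator[str]:
    """Transform every item, one at a time, as it arrives."""
    for item in items:
        yield fn(item)


def filter_step(items: Iterable[str], keep: Callable[[str], bool]) -> Iterator[str]:
    """Pass along only the items for which keep() is true."""
    for item in items:
        if keep(item):
            yield item


def run_pipeline(source: Iterable[str]) -> List[str]:
    """Chain the steps: input -> map -> filter -> output."""
    upper = map_step(source, str.upper)
    long_enough = filter_step(upper, lambda s: len(s) > 4)
    return list(long_enough)


# Invented headlines standing in for an input source.
headlines = ["tesla up", "fed", "apple falls"]
result = run_pipeline(headlines)
print(result)
```

Because each step is a generator, items flow through one at a time rather than being collected first, which is the same shape of processing a streaming dataflow applies to an unbounded input.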

Input

To begin the dataflow, we'll create an input using the Alpaca websocket, which we'll use to subscribe to articles on multiple tickers. It's important to note that you'll require an Alpaca API key and secret, and it's recommended to store them as environment variables.

import os
import json

from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig, distribute

from websocket import create_connection

API_KEY = os.getenv("API_KEY")
API_SECRET = os.getenv("API_SECRET")

ticker_list = ["*"]


def input_builder(worker_index, worker_count, resume_state):
    state = resume_state or None
    worker_tickers = list(distribute(ticker_list, worker_index, worker_count))
    print({"subscribing to": worker_tickers})

    def news_input(worker_tickers, state):
        ws = create_connection("wss://stream.data.alpaca.markets/v1beta1/news")
        print(ws.recv())
        ws.send(json.dumps({"action":"auth","key":f"{API_KEY}","secret":f"{API_SECRET}"}))
        print(ws.recv())
        ws.send(json.dumps({"action":"subscribe","news":worker_tickers}))
        print(ws.recv())

        while True:
            # To run without API credentials, uncomment the next line and comment out the ws.recv() line below it.
            # articles = [{"T":"n","id":31248067,"headline":"Tesla Vehicles Could Be Banned From Leaving During A Hurricane In This State","summary":"A lawmaker in one American state could make it hard for owners of electric vehicles to get out of the state in the event of a hurricane. Here’s the potential law and why it’s important.","author":"Chris Katje","created_at":"2023-03-07T22:58:40Z","updated_at":"2023-03-07T22:58:40Z","url":"https://www.benzinga.com/news/23/03/31248067/tesla-vehicles-could-be-banned-from-leaving-during-a-hurricane-in-this-state","content":"\u003cp\u003eA lawmaker in one American state could make it hard for owners of electric vehicles to get out of the state in the event of a hurricane. Here\u0026rsquo;s the potential law and why it\u0026rsquo;s important.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cstrong\u003eWhat Happened:\u003c/strong\u003e States have passed laws aimed at banning the sale of gas-powered vehicles in the future. One state took it a step further by seeking to ban electric vehicle \u003ca href=\"https://www.benzinga.com/news/23/01/30424292/taking-on-elon-musk-this-state-legislature-could-ban-electric-vehicle-sales-by-2035\"\u003esales in the future.\u003c/a\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003eOne of the leading states for electric vehicle purchases could now see a temporary ban on using electric vehicles during the time of a crisis.\u003c/p\u003e\r\n\r\n\u003cp\u003eFlorida Republican state Sen.\u0026nbsp;\u003cstrong\u003eJonathan Martin\u003c/strong\u003e is considering legislation to ban electric vehicles like those from \u003cstrong\u003eTesla Inc\u003c/strong\u003e (NASDAQ:\u003ca class=\"ticker\" href=\"https://www.benzinga.com/stock/TSLA#NASDAQ\"\u003eTSLA\u003c/a\u003e) to be used during hurricane evacuations in the state, according to \u003ca href=\"https://electrek.co/2023/03/06/florida-lawmaker-wants-to-ban-evs-from-hurricane-evacuations/\"\u003eElectrek\u003c/a\u003e.\u0026nbsp;\u003c/p\u003e\r\n\r\n\u003cp\u003eMartin told the state\u0026rsquo;s Department of Transportation that electric vehicles could block traffic during evacuations if they run out of battery charge.\u003c/p\u003e\r\n\r\n\u003cp\u003eMartin serves on the Committee on Environment and Natural Resources and the Select Committee on Resiliency.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe Select Committee on Resiliency met with the Florida Department of Transportation executive director of transportation technologies in Florida.\u003c/p\u003e\r\n\r\n\u003cp\u003eAmong the topics discussed were the $198 million the state is going to get from the Bipartisan Infrastructure Law for electric vehicle charging infrastructure from the current administration led by \u003cstrong\u003ePresident Joe Biden.\u003c/strong\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003eThe legislation requires electric vehicle charging stations to be 50 miles apart and serve all electric vehicles.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u0026ldquo;With a couple of guys behind you, you can\u0026rsquo;t get out of the car and push it to the side of the road. Traffic backs up. And what might look like a two-hour trip might turn into an eight-hour trip once you\u0026rsquo;re on the road,\u0026rdquo; Martin said.\u003c/p\u003e\r\n\r\n\u003cp\u003eMartin said his concern is with the electric vehicle infrastructure available in the state of Florida.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cem\u003eRelated Link: \u003ca href=\"https://www.benzinga.com/trading-ideas/22/06/27568560/4-stocks-to-watch-this-hurricane-season\"\u003e4 Stocks To Watch This Hurricane Season\u0026nbsp;\u003c/a\u003e\u003c/em\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cstrong\u003eWhy It\u0026rsquo;s Important:\u003c/strong\u003e The Florida Department of Transportation told Martin it isn\u0026rsquo;t a fan of banning electric vehicles during hurricane evacuations and that it is looking into portable EV chargers.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u0026ldquo;We have our emergency assistance vehicles that we deploy during a hurricane evacuation that have gas \u0026hellip; we need to provide that same level of service to electrical vehicles,\u0026rdquo; Department of Transportation director of transportation technologies \u003cstrong\u003eTrey Tillander \u003c/strong\u003esaid.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe Tampa Bay Times \u003ca href=\"https://www.tampabay.com/hurricane/2023/02/24/florida-lawmaker-suggests-limiting-electric-vehicles-during-hurricane-evacuations/\"\u003ereported\u003c/a\u003e\u0026nbsp;around 1% of the vehicles in Florida are electric vehicles. One of the owners of an EV is state Sen.\u0026nbsp;\u003cstrong\u003eTina Polsky.\u003c/strong\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u0026ldquo;I don\u0026rsquo;t think you can ban an electric vehicle from evacuating because that may be the only car someone has,\u0026rdquo; Polsky said.\u003c/p\u003e\r\n\r\n\u003cp\u003eIn December 2022, there were 203,094 electric vehicles registered in the state of Florida.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe increased funding for charging infrastructure could help ease concerns over charging.\u003c/p\u003e\r\n\r\n\u003cp\u003eUltimately, once people are on the road headed out of the state, they likely won\u0026rsquo;t be able to stop at a charging station, similar to people not being able to quickly stop at a gas station.\u003c/p\u003e\r\n\r\n\u003cp\u003eJust like people prepare for the evacuation by filling up their vehicle with gas, owners of electric vehicles will likely need to fully charge their vehicle before evacuating the state.\u003c/p\u003e\r\n\r\n\u003cp\u003eThe comments from the state senator may have Florida residents thinking about owning at least one non-electric vehicle or a hybrid to ensure they have the best chance to exit the state without future restrictions and without the potential of running out of charge and not finding stations prevalent.\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cem\u003eRead Next:\u0026nbsp;\u003ca href=\"https://www.benzinga.com/analyst-ratings/analyst-color/23/03/31172188/tesla-analysts-praise-vertical-integration-after-investor-day-but-want-more-from-el\"\u003eTesla Analysts Praise Vertical Integration After Investor Day, But Want More From Elon Musk: \u0026#39;Long On Vision, Short On Specifics\u0026#39;\u003c/a\u003e\u003c/em\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cem\u003ePhoto:\u0026nbsp;\u003ca href=\"https://www.shutterstock.com/g/hsaduraphotos\"\u003eHenryk Sadura\u003c/a\u003e\u0026nbsp;via Shutterstock\u003c/em\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cbr /\u003e\r\n\u0026nbsp;\u003c/p\u003e\r\n","symbols":["TSLA"],"source":"benzinga"}]
            articles = json.loads(ws.recv())
            for article in articles:
                # Key each article by its source so later stateful steps can group on it
                yield state, (article["source"], article)

    return news_input(worker_tickers, state)


flow = Dataflow()
flow.input("inp", ManualInputConfig(input_builder))
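The input builder splits `ticker_list` across workers so that each worker subscribes to its own share. As a rough illustration of that idea, the sketch below assumes a simple round-robin split; `round_robin_share` is a hypothetical helper written for this post, not the library's implementation, and the ticker symbols are just examples.

```python
from typing import List, Sequence


def round_robin_share(items: Sequence[str], worker_index: int, worker_count: int) -> List[str]:
    """Give worker `worker_index` every `worker_count`-th item (hypothetical helper)."""
    if not 0 <= worker_index < worker_count:
        raise ValueError("worker_index must be in [0, worker_count)")
    return [item for i, item in enumerate(items) if i % worker_count == worker_index]


# Example tickers split across two workers.
tickers = ["AAPL", "TSLA", "MSFT", "AMZN", "GOOG"]
shares = [round_robin_share(tickers, w, 2) for w in range(2)]
print(shares)

# A single wildcard entry can only land on one worker under this scheme.
wildcard_shares = [round_robin_share(["*"], w, 2) for w in range(2)]
print(wildcard_shares)
```

Under this assumption, a one-element list such as `["*"]` would be handled by a single worker while the others receive an empty subscription list, which is worth keeping in mind if you run the dataflow with more than one worker.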

The data returned from the news API looks like the JSON shown here.

[{"T":"n","id":31248067,"headline":"Tesla Vehicles Could Be Banned From Leaving During A Hurricane In This State","summary":"A lawmaker in one American state could make it hard for owners of electric vehicles to get out of the state in the event of a hurricane. Here’s the potential law and why it’s important.","author":"Chris Katje","created_at":"2023-03-07T22:58:40Z","updated_at":"2023-03-07T22:58:40Z","url":"https://www.benzinga.com/news/23/03/31248067/tesla-vehicles-could-be-banned-from-leaving-during-a-hurricane-in-this-state","content":"\u003cp\u003eA lawmaker in one American state could make it hard for owners of electric vehicles ... ertical Integration After Investor Day, But Want More From Elon Musk: \u0026#39;Long On Vision, Short On Specifics\u0026#39;\u003c/a\u003e\u003c/em\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cem\u003ePhoto:\u0026nbsp;\u003ca href=\"https://www.shutterstock.com/g/hsaduraphotos\"\u003eHenryk Sadura\u003c/a\u003e\u0026nbsp;via Shutterstock\u003c/em\u003e\u003c/p\u003e\r\n\r\n\u003cp\u003e\u003cbr /\u003e\r\n\u0026nbsp;\u003c/p\u003e\r\n","symbols":["TSLA"],"source":"benzinga"}]

We will use this in the next steps in our dataflow to analyze the sentiment and provide a summary.
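As a small illustration of working with this payload, the sketch below parses one message and keeps only a few fields. The message is abbreviated from the sample above with a placeholder body; `extract_fields` is a hypothetical helper, and treating `"T": "n"` as the marker for a news item is an assumption based on that sample.

```python
import json
from typing import Any, Dict, List

# Abbreviated message in the same shape as the sample above; the body is a placeholder.
raw_message = json.dumps([
    {
        "T": "n",
        "id": 31248067,
        "headline": "Tesla Vehicles Could Be Banned From Leaving During A Hurricane In This State",
        "symbols": ["TSLA"],
        "source": "benzinga",
        "content": "<p>Example body</p>",
    }
])


def extract_fields(message: str) -> List[Dict[str, Any]]:
    """Parse one websocket message and keep only a handful of fields."""
    articles = json.loads(message)
    return [
        {
            "id": article["id"],
            "headline": article["headline"],
            "symbols": article.get("symbols", []),
            "source": article["source"],
        }
        for article in articles
        if article.get("T") == "n"  # assumption: "n" marks a news item
    ]


parsed = extract_fields(raw_message)
print(parsed[0]["headline"], parsed[0]["symbols"])
```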

Managing Duplicates and Updates

When working with news stories from RSS/Atom feeds or news APIs, it's common to receive duplicates as stories are created and then updated. To avoid analyzing these duplicates multiple times and incurring the overhead of running ML models on the same story, we'll use the Bytewax operator stateful_map to create a simplified storage layer. We'll store a list of unique identifiers for each news article we encounter. If an article has been seen before, we'll mark it as an update. Otherwise, we'll add the article's ID to the stateful object. To filter out the updates and avoid reclassifying and summarizing them, we'll use the filter operator. Think of this process as the equivalent of checking a database for a unique ID.

def update_articles(articles, news):
    if news['id'] in articles:
        news['update'] = True
    else:
        articles.append(news['id'])
        news['update'] = False
    return articles, news

flow.stateful_map("source_articles", lambda: list(), update_articles)

flow.filter(lambda x: not x[1]['update'])
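To see the deduplication logic in isolation, here is a minimal sketch that runs the same `update_articles` function over a small in-memory stream. The `simulate` function is a hypothetical stand-in for the stateful_map and filter steps (one state list per key); it is not how Bytewax manages state internally, and the stories are invented.

```python
from typing import Any, Dict, List, Tuple


def update_articles(articles: List[int], news: Dict[str, Any]) -> Tuple[List[int], Dict[str, Any]]:
    """Same logic as the step above: flag an article as an update if its ID was seen."""
    if news["id"] in articles:
        news["update"] = True
    else:
        articles.append(news["id"])
        news["update"] = False
    return articles, news


def simulate(stream: List[Tuple[str, Dict[str, Any]]]) -> List[Tuple[str, Dict[str, Any]]]:
    """Hypothetical stand-in for stateful_map + filter: one state list per key."""
    state_by_key: Dict[str, List[int]] = {}
    fresh = []
    for key, news in stream:
        state = state_by_key.get(key, [])
        state_by_key[key], news = update_articles(state, dict(news))
        if not news["update"]:
            fresh.append((key, news))
    return fresh


# Invented stream: the third item repeats the first article's ID.
stream = [
    ("benzinga", {"id": 1, "headline": "First story"}),
    ("benzinga", {"id": 2, "headline": "Second story"}),
    ("benzinga", {"id": 1, "headline": "First story (updated)"}),
]
fresh_ids = [news["id"] for _, news in simulate(stream)]
print(fresh_ids)
```

A list keeps the example close to the original step; for a long-running stream, a set would make the membership check cheaper, at the cost of changing the state type.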

Sentiment Analysis

Sentiment analysis is the next step in our process. We'll use a fine-tuned Hugging Face model to analyze the sentiment of the article's headline. The model is based on BERT, which stands for Bidirectional Encoder Representations from Transformers and was developed by Google. For a detailed understanding of how this model works and how it was trained, you can refer to the model card on Hugging Face or the accompanying research paper. Since we want to analyze each news article independently, the sentiment classification will take place in a map operator. Despite the extensive research that goes into designing novel model architectures and creating training datasets, implementing sentiment analysis is remarkably straightforward. Note that if you're following along in a notebook, the model will take some time to download initially.

from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline, AutoModelForSeq2SeqLM

sent_tokenizer = AutoTokenizer.from_pretrained("ahmedrachid/FinancialBERT-Sentiment-Analysis")
sent_model = AutoModelForSequenceClassification.from_pretrained("ahmedrachid/FinancialBERT-Sentiment-Analysis")
sent_nlp = pipeline("sentiment-analysis", model=sent_model, tokenizer=sent_tokenizer)

def sentiment_analysis(ticker__news):
    ticker, news = ticker__news
    sentiment = sent_nlp([news["headline"]])
    news['sentiment'] = sentiment[0]
    print(sentiment[0])
    return (ticker, news)

flow.map(sentiment_analysis)
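The pipeline returns a dictionary with a `label` and a `score` for each input. If a single number is more convenient downstream, one option is to fold the two together, as in the sketch below. It assumes the model reports `positive`, `neutral`, or `negative` labels; `signed_score` is a hypothetical helper, and the example results are hand-written rather than real model output.

```python
from typing import Dict, Union

# Assumption: the model reports one of these three labels (case may vary).
LABEL_SIGN = {"positive": 1.0, "neutral": 0.0, "negative": -1.0}


def signed_score(result: Dict[str, Union[str, float]]) -> float:
    """Collapse {'label': ..., 'score': ...} into a single number in [-1, 1]."""
    label = str(result["label"]).lower()
    if label not in LABEL_SIGN:
        raise ValueError(f"unexpected label: {label!r}")
    return LABEL_SIGN[label] * float(result["score"])


# Hand-written example results, not real model output.
examples = [
    {"label": "negative", "score": 0.9},
    {"label": "neutral", "score": 0.75},
    {"label": "positive", "score": 0.5},
]
scores = [signed_score(r) for r in examples]
print(scores)
```

Raising on an unknown label, rather than silently returning zero, makes it obvious if a different model with different label names is swapped in.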

Article Summarization

After analyzing the headline sentiment, we will use a BART (Bidirectional and Auto-Regressive Transformers) model to summarize the article's content. BART, developed at Facebook AI, pairs a BERT-style bidirectional encoder with a GPT-style autoregressive decoder. Despite the significant effort that goes into creating the model, using it with the Hugging Face Transformers library is relatively easy. We can create a summarization pipeline and apply it in a map step. To obtain better results, we also add an extra step to this map function that cleans the text before summarizing it.

import re

# Let's create a summarization pipeline
sum_tokenizer = AutoTokenizer.from_pretrained("facebook/bart-large-cnn")
sum_model = AutoModelForSeq2SeqLM.from_pretrained("facebook/bart-large-cnn")
summarizer = pipeline("summarization", tokenizer=sum_tokenizer, model=sum_model)
# Matches HTML comments and tags so they can be stripped from the article body
tag_re = re.compile(r'(<!--.*?-->|<[^>]*>)')

def summarize(ticker__news):
    ticker, news = ticker__news
    article = news['content']
    # Strip HTML tags, then turn line breaks into spaces so paragraphs don't run together
    article_no_tags = tag_re.sub('', article)
    article_no_tags = article_no_tags.replace("\r", " ").replace("\n", " ")
    # truncation=True keeps long articles within the model's maximum input length
    summary = summarizer(article_no_tags, max_length=130, min_length=30, do_sample=False, truncation=True)
    news['bart_summary'] = summary[0]['summary_text']
    print(f"bart summary:{summary[0]['summary_text']}")
    return (ticker, news)

flow.map(summarize)
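The cleaning step can be taken a little further. The article bodies in the sample payload also contain HTML entities such as `&rsquo;` and `&nbsp;`, which survive tag stripping. The sketch below is one possible, more thorough cleaner using only the standard library; `clean_article` is a hypothetical helper, not part of the original dataflow, and the sample string is shortened from the payload above.

```python
import html
import re

TAG_RE = re.compile(r"(<!--.*?-->|<[^>]*>)", re.DOTALL)
WHITESPACE_RE = re.compile(r"\s+")


def clean_article(raw: str) -> str:
    """Strip tags, decode entities such as &rsquo;, and collapse whitespace."""
    no_tags = TAG_RE.sub(" ", raw)      # a space keeps adjacent paragraphs apart
    decoded = html.unescape(no_tags)    # &rsquo; -> ’, &nbsp; -> non-breaking space
    return WHITESPACE_RE.sub(" ", decoded).strip()


sample = (
    "<p>Here&rsquo;s the potential law.</p>\r\n\r\n"
    "<p><strong>What Happened:</strong> States have passed laws.</p>"
)
cleaned = clean_article(sample)
print(cleaned)
```

Replacing each tag with a space avoids gluing paragraphs together, but it can leave a stray space around inline tags (for example inside parentheses), which is usually harmless for summarization.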

Output

With our news analyzed, we can add a capture step to output the modified news object and then run our dataflow. Here we write the output to stdout so we can easily view it, but in a production system we could write the results to a downstream Kafka topic or database for further analysis.

If you are following along in a notebook, remember that you must be authenticated for this to work, so set your Alpaca API key and secret first.

from bytewax.execution import run_main
from bytewax.outputs import StdOutputConfig

flow.capture(StdOutputConfig())

if __name__ == '__main__':
    run_main(flow)
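If the results were headed to a downstream system rather than stdout, each item would typically be serialized first. The sketch below shows one way to do that with the standard library; `to_json_line` is a hypothetical helper, the choice of fields is an assumption, and the example item is hand-written in the shape produced by the steps above.

```python
import json
from typing import Any, Dict, Tuple


def to_json_line(ticker__news: Tuple[str, Dict[str, Any]]) -> str:
    """Keep a few fields and serialize them as one JSON line (hypothetical helper)."""
    key, news = ticker__news
    record = {
        "key": key,
        "id": news["id"],
        "headline": news["headline"],
        "sentiment": news.get("sentiment"),
        "summary": news.get("bart_summary"),
    }
    return json.dumps(record, ensure_ascii=False)


# Hand-written example item, not real pipeline output.
item = (
    "benzinga",
    {
        "id": 31248067,
        "headline": "Tesla Vehicles Could Be Banned From Leaving During A Hurricane In This State",
        "sentiment": {"label": "negative", "score": 0.9},
        "bart_summary": "Example summary text.",
        "content": "<p>long HTML body omitted</p>",
    },
)
line = to_json_line(item)
print(line)
```

Dropping the raw `content` field keeps each record small, since the summary already carries the gist of the article.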

Wrapping up

While our example is simplified, it showcases the power of Bytewax and Hugging Face's language models. We can analyze financial news articles in real time, identify significant events, and make informed decisions. Using the Alpaca news API as our data source, we constructed a dataflow that deduplicates stories, classifies the sentiment of each headline, and summarizes the content of each article.

The ease of implementation with Python-native Bytewax and the Hugging Face Transformers library makes these state-of-the-art language models accessible to data engineers and researchers for use in their own projects. We hope this blog post serves as a useful guide for anyone looking to leverage real-time news analysis in their financial decision-making process.
