DEV Community

Cover image for Quick tip: Analysing Stock Tick Data in SingleStoreDB using LangChain and OpenAI's Whisper
Akmal Chaudhri for SingleStore

Posted on • Updated on

Quick tip: Analysing Stock Tick Data in SingleStoreDB using LangChain and OpenAI's Whisper

Disclaimer

The stock data used in this article is entirely fictitious. It is purely for demo purposes. Please do not use this data for making any financial decisions.

Abstract

This article will show how to connect a Kafka broker, streaming example stock tick data, to SingleStoreDB. We'll then query the data using English sentences through LangChain, which provides a basic question-and-answer capability for the tick data. We'll build a Python application, through several design iterations, to use OpenAI's Whisper to ask questions through speech and use speech synthesis to reply.

The notebook file, SQL and Python code are available on GitHub.

Introduction

The ability to ask questions about a database system using natural language is familiar. However, it has become much easier to implement with modern tools like LangChain and OpenAI's Whisper. In this article, we'll see how.

Create a SingleStoreDB Cloud account

A previous article showed the steps to create a free SingleStoreDB Cloud account. We'll use the following settings:

  • Workspace Group Name: Whisper Demo Group
  • Cloud Provider: AWS
  • Region: US East 1 (N. Virginia)
  • Workspace Name: whisper-demo
  • Size: S-00
  • Settings:
    • None selected

Once the workspace is available, we'll make a note of our password, host and port. The host and port will be available from whisper-demo > Connect > SQL IDE > Host. We'll need this information later for a Python application. We'll temporarily allow access from anywhere by configuring the firewall under Whisper Demo Group > Firewall.

Create a Database and Tables

From the left navigation pane, we'll select DEVELOP > Data Studio > Open SQL Editor to create a timeseries_db database, tick and stock_sentiment tables, as follows:

DROP DATABASE IF EXISTS timeseries_db;
CREATE DATABASE IF NOT EXISTS timeseries_db;

USE timeseries_db;

DROP TABLE IF EXISTS tick;
CREATE TABLE IF NOT EXISTS tick (
    symbol VARCHAR(10),
    ts     DATETIME SERIES TIMESTAMP,
    open   NUMERIC(18, 2),
    high   NUMERIC(18, 2),
    low    NUMERIC(18, 2),
    price  NUMERIC(18, 2),
    volume INT,
    KEY(ts)
);

DROP TABLE IF EXISTS stock_sentiment;
CREATE TABLE IF NOT EXISTS stock_sentiment (
    headline  VARCHAR(250),
    positive  FLOAT,
    negative  FLOAT,
    neutral   FLOAT,
    url       TEXT,
    publisher VARCHAR(30),
    ts        DATETIME,
    symbol    VARCHAR(10)
);
Enter fullscreen mode Exit fullscreen mode

Create a Pipeline

Pipelines allow us to create streaming ingest feeds from various sources, such as Kafka, S3 and HDFS, using a single command. With pipelines, we can also perform ETL operations.

For our use case, we'll create a simple pipeline in SingleStoreDB as follows:

CREATE PIPELINE tick
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/stockticker'
BATCH_INTERVAL 45000
INTO TABLE tick
FIELDS TERMINATED BY ','
(symbol,ts,open,high,low,price,volume);
Enter fullscreen mode Exit fullscreen mode

We'll control the rate of data ingestion using the BATCH_INTERVAL. Initially, we'll set this to 45000 milliseconds.

We'll configure the pipeline to start from the earliest offset, as follows:

ALTER PIPELINE tick SET OFFSETS EARLIEST;
Enter fullscreen mode Exit fullscreen mode

and we'll test the pipeline before we start running it, as follows:

TEST PIPELINE tick LIMIT 1;
Enter fullscreen mode Exit fullscreen mode

The output should be similar to the following:

+--------+---------------------+-------+-------+-------+-------+--------+
| symbol | ts                  | open  | high  | low   | price | volume |
+--------+---------------------+-------+-------+-------+-------+--------+
| AIV    | 2023-09-05 06:47:53 | 44.89 | 44.89 | 44.88 | 44.89 |    719 |
+--------+---------------------+-------+-------+-------+-------+--------+
Enter fullscreen mode Exit fullscreen mode

Remember, this is fictitious data

We'll now start the pipeline:

START PIPELINE tick;
Enter fullscreen mode Exit fullscreen mode

Load Stock Sentiment Data

We'll now load the data into the stock_sentiment table. The data are derived from Kaggle. First, we'll unpack the zip file that contains the SQL file. Next, we'll load the SQL file into SingleStoreDB Cloud using a MySQL client, as follows:

mysql -u admin -p<password> -h <host> timeseries_db < /path/to/stock_sentiment.sql
Enter fullscreen mode Exit fullscreen mode

The <password> and <host> being replaced with the values we obtained from SingleStoreDB Cloud earlier. The /path/to/ replaced with the actual path to where the SQL file is located.

Load the Notebook

We'll use the notebook file available on GitHub. From the left navigation pane in SingleStoreDB Cloud, we'll select Data Studio. In the top right of the web page will be New Notebook with a pulldown that has two options:

  1. New Notebook
  2. Import From File

We'll select the second option, locate the notebook file we downloaded from GitHub and load it into SingleStoreDB Cloud.

Run the Notebook

We'll start by selecting the Connection (whisper-demo) and Database (timeseries_db) using the drop-down menus above the notebook.

Analysing Time Series Data

Part 1 of the notebook contains a set of Time Series operations on the data in the tick table. These operations were described in greater detail in a previous article.

LangChain OnlinePDFLoader

Part 2 of the notebook loads the contents of a PDF document and vector embeddings into a table called fintech_docs. These operations were described in greater detail in a previous article. Replace <PDF document URL> with the hyperlink of your chosen FinTech document:

loader = OnlinePDFLoader("<PDF document URL>")

data = loader.load()
Enter fullscreen mode Exit fullscreen mode

The Fully Qualified Domain Name (FQDN) where the PDF file is located must be added to the firewall by selecting the Data Studio > Firewall option.

We'll use ChatGPT to answer questions on the PDF file, such as:

"What are the best investment opportunities in Blockchain?"
Enter fullscreen mode Exit fullscreen mode

LangChain SQL Agent

Part 3 of the notebook contains LangChain agent operations on the data in the tick and stock_sentiment tables. This will be the main focus of this article.

We'll configure the LangChain toolkit and agent, as follows:

db = SQLDatabase.from_uri(connection_url, include_tables = ["tick", "stock_sentiment"])

llm = ChatOpenAI(model = "gpt-4o-mini", temperature = 0, verbose = False)

toolkit = SQLDatabaseToolkit(db = db, llm = llm)

agent_executor = create_sql_agent(
    llm = llm,
    toolkit = toolkit,
    max_iterations = 15,
    max_execution_time = 60,
    top_k = 3,
    verbose = False
)
Enter fullscreen mode Exit fullscreen mode

We'll now test the code using an example query, as follows:

query1 = (
    "From the tick table, which stock symbol saw the\n"
    "least volatility in share trading in the dataset?"
)

result1 = run_agent_query(query1, agent_executor, error_string)
print(result1)
Enter fullscreen mode Exit fullscreen mode

Here is some example output:

The stock symbol with the least volatility in share trading is FTR, with a volatility of 0.55.
Enter fullscreen mode Exit fullscreen mode

We'll make this a little more interactive using the following code:

query2 = input("Please enter your question:")

result2 = run_agent_query(query2, agent_executor, error_string)
print(result2)
Enter fullscreen mode Exit fullscreen mode

We'll test this with an example query:

How many records are in the tick table?
Enter fullscreen mode Exit fullscreen mode

Here is some example output:

There are 21,188,124 records in the tick table.
Enter fullscreen mode Exit fullscreen mode

To see the chain output, we'll set verbose to True, as follows:

agent_executor = create_sql_agent(
    llm = llm,
    toolkit = toolkit,
    max_iterations = 15,
    max_execution_time = 60,
    top_k = 3,
    verbose = True
)
Enter fullscreen mode Exit fullscreen mode

An example of chain output from another database was shown in a previous article.

Bonus: Build a Visual Python Application

We'll now build a simple Python application that uses OpenAI's Whisper to ask questions about the database system. An excellent article (🔗❌) inspired this application.

Install the Required Software

For this article, this is the software that was required in a clean install of Ubuntu 22.04.2 running in a VMware Fusion Virtual Machine:

sudo apt install ffmpeg
sudo apt install libespeak1
sudo apt install portaudio19-dev
sudo apt install python3-tk
sudo apt install python3-pil python3-pil.imagetk
Enter fullscreen mode Exit fullscreen mode

as well as the following packages:

langchain
langchain-community
langchain-openai
matplotlib
openai
openai-whisper
pyaudio
pyttsx3
sqlalchemy-singlestoredb
wave
Enter fullscreen mode Exit fullscreen mode

These can be found in the requirements.txt file on GitHub. Run the file as follows:

pip install -r requirements.txt
Enter fullscreen mode Exit fullscreen mode

openai-whisper may take a while to install.

We'll need to provide an OpenAI API Key in our environment. For example:

export OPENAI_API_KEY="<OpenAI API Key>"
Enter fullscreen mode Exit fullscreen mode

Replace <OpenAI API Key> with your key.

In each of the following applications, we have the following code:

s2_user = "<username>"
s2_password = "<password>"
s2_host = "<host>"
s2_port = <port>
s2_db = "<database>"
db = SQLDatabase.from_uri(
        f"singlestoredb://{s2_user}:{s2_password}@{s2_host}:{s2_port}/{s2_db}"
        "?ssl_ca=/path/to/singlestore_bundle.pem",
        include_tables = ["tick", "stock_sentiment"]
)
Enter fullscreen mode Exit fullscreen mode

We'll fill-in the values from our environment. The singlestore_bundle.pem file is required if using the Free Shared Tier, otherwise this line can be removed:

"?ssl_ca=/path/to/singlestore_bundle.pem"
Enter fullscreen mode Exit fullscreen mode

First Iteration

Let's start with a simple visual Python application using Tkinter. We'll also add voice recognition using OpenAI's Whisper. The application enables up to 20 seconds of recorded speech. It can be run as follows:

python3 record-transcribe.py
Enter fullscreen mode Exit fullscreen mode

Example output is shown in Figure 1.

Figure 1. First Iteration.

Figure 1. First Iteration.

Second Iteration

In the next iteration, we'll add an Audio Waveform. We'll run the program as follows:

python3 record-transcribe-visualise.py
Enter fullscreen mode Exit fullscreen mode

Example output is shown in Figure 2.

Figure 2. Second Iteration.

Figure 2. Second Iteration.

Third Iteration

In the third and final iteration, we'll remove a text-based response by the application and replace it with speech synthesis. We'll run the program as follows:

python3 record-transcribe-visualise-speak.py
Enter fullscreen mode Exit fullscreen mode

Example output is shown in Figure 3.

Figure 3. Third Iteration.

Figure 3. Third Iteration.

In the code for all three iterations, we can call the OpenAI Whisper API instead of using the local Whisper installation. To do this, we'd uncomment these lines of code in the transcribe_audio function:

            # from openai import OpenAI
            # client = OpenAI()
            # transcript = client.audio.transcriptions.create(
            #     model = "whisper-1",
            #     file = audio_file,
            #     response_format = "text",
            #     language = "en"
            # )
            # return transcript.strip()
Enter fullscreen mode Exit fullscreen mode

and comment out these lines of code:

            transcript = model.transcribe(filename)
            return transcript["text"].strip()
Enter fullscreen mode Exit fullscreen mode

However, calling the OpenAI Whisper API would incur additional costs. The local Whisper installation already provides excellent results.

Summary

In this article, we've seen that without creating vector embeddings, we've been able to access our data quite effectively using LangChain. However, our queries need to be more focused, and we need to understand the database schema before asking questions. Integrating a speech capability enables our applications to be more widely used.

Top comments (0)