Joe

Elastic D&D - Update 8 - Streamlit Changes

Last week we talked about setting up port forwarding in order to access the application remotely. If you missed it, you can check that out here!

Streamlit Changes

As I began working on the AI assistant, I quickly ran into an issue: Streamlit's chat widgets could not be added to a sidebar, tab, or other existing container. That left me two options: put the chat on the first "page" of my current code, which was impossible because that page was the logon page, or implement a true page system. Luckily, Streamlit supports multipage apps, per its documentation.

Because the code is mostly the same, I won't go in-depth on most of it.

Structure

The Streamlit application now consists of 6 parts: variables, functions, a home page, a note input page, an AI assistant page, and an account page. The home page is in the same directory that "main.py" was in, along with the functions and variables, and the pages are in a new directory named "pages".
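For readers who have not used Streamlit's multipage support: Streamlit looks for a directory named "pages" next to the entry-point script and lists the scripts inside it in the sidebar. The sketch below scaffolds a layout like the one described above in a temporary directory. The file names `Home.py`, `1_Note_Input.py`, `2_Veverbot.py`, and `3_Account.py` are placeholders chosen for illustration and are not necessarily the names used in the project.

```python
import tempfile
from pathlib import Path

# Hypothetical file names, for illustration only
ENTRY_POINT = "Home.py"
SHARED_MODULES = ["functions.py", "variables.py"]
PAGE_SCRIPTS = ["1_Note_Input.py", "2_Veverbot.py", "3_Account.py"]

def scaffold_app(root: Path) -> list[str]:
    """Creates empty placeholder files and returns their paths relative to root."""
    (root / "pages").mkdir(parents=True, exist_ok=True)
    # The home page, functions, and variables share the top-level directory
    for name in [ENTRY_POINT, *SHARED_MODULES]:
        (root / name).touch()
    # Every other page lives in the "pages" directory
    for name in PAGE_SCRIPTS:
        (root / "pages" / name).touch()
    return sorted(p.relative_to(root).as_posix() for p in root.rglob("*.py"))

with tempfile.TemporaryDirectory() as tmp:
    layout = scaffold_app(Path(tmp))
    for path in layout:
        print(path)
```

With real page scripts in place of the empty files, the app would be started by pointing `streamlit run` at the entry-point script.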

Functions and Variables

I split the variables and functions into their own files mainly for cleanliness: it keeps shared code in one place and out of the page files.

Loading them into the other scripts with wildcard imports ("from functions import *"), the way I did, is generally frowned upon, but it worked out great for me.
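One reason wildcard imports are discouraged: without an `__all__` list, `from module import *` copies every public name from the module, including the modules that file imported itself. That is how `json` ends up available in the page scripts further down, even though they never import it. The sketch below mimics that rule on a throwaway module; `demo_functions` and `star_import_names` are made-up names for illustration.

```python
import types

def star_import_names(module: types.ModuleType) -> list[str]:
    """Returns the names that `from module import *` would bind."""
    if hasattr(module, "__all__"):
        return sorted(module.__all__)
    # Without __all__, every name not starting with an underscore is imported
    return sorted(name for name in vars(module) if not name.startswith("_"))

# Builds a throwaway module standing in for functions.py
demo_functions = types.ModuleType("demo_functions")
exec(
    "import json\n"
    "def text_cleanup(text):\n"
    "    return text.replace('/', ' ')\n",
    demo_functions.__dict__,
)

# Lists both the function and the imported json module
print(star_import_names(demo_functions))

# Declaring __all__ limits what the wildcard import exposes
demo_functions.__all__ = ["text_cleanup"]
print(star_import_names(demo_functions))
```

Adding an `__all__` list to a shared module, or importing names explicitly, keeps the page namespaces predictable.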

functions.py

# Elastic D&D
# Author: thtmexicnkid
# Last Updated: 10/04/2023
# 
# Streamlit - Backend - Houses all functions used in pages of the application.

import json
import requests
import streamlit as st
import streamlit_authenticator as stauth
import time
import yaml
from elasticsearch import Elasticsearch
from PIL import Image
from variables import *
from yaml.loader import SafeLoader

### FUNCTIONS ###
def api_get_question_answer(question,query_results):
    # returns an answer to a question asked to virtual DM

    fastapi_endpoint = "/get_question_answer/"
    full_url = fastapi_url + fastapi_endpoint + question + "/" + query_results
    response = requests.get(full_url)

    try:
        answer = response.json()
    except ValueError:
        # response body was not valid JSON
        answer = None
        print(response.content)

    return answer

def api_get_vector_object(text):
    # returns vector object from supplied text

    fastapi_endpoint = "/get_vector_object/"
    full_url = fastapi_url + fastapi_endpoint + text
    response = requests.get(full_url)

    try:
        message_vector = response.json()
    except ValueError:
        # response body was not valid JSON
        message_vector = None
        print(response.content)

    return message_vector

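def text_cleanup(text):
    # replaces characters that would break the URL path of the API calls with spaces
    # passes None through so the caller can check for a missing result
    if text is None:
        return None

    punctuation = ["/", "?"]
    for symbol in punctuation:
        text = text.replace(symbol," ")

    return text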

def clear_session_state(variable_list):
    # deletes variables from streamlit session state
    for variable in variable_list:
        try:
            del st.session_state[variable]
        except:
            pass

def display_image(image_path):
    # displays an image via path relative to streamlit app script
    image = Image.open(image_path)
    st.image(image)

def elastic_ai_notes_query(vector_object):
    # queries Elastic via a KNN query to return answers to questions via virtual DM

    # creates Elastic connection
    client = Elasticsearch(
        elastic_url,
        ca_certs=elastic_ca_certs,
        api_key=elastic_api_key
    )

    # runs a KNN search across all note indices for the notes closest to the question vector
    response = client.search(index="dnd-notes-*",knn={"field":"message_vector","query_vector":vector_object,"k":10,"num_candidates":100})

    # close Elastic connection (this has to happen before the return statement to run at all)
    client.close()

    # returns the text of the best-matching note
    return response['hits']['hits'][0]['_source']["message"]

def elastic_get_quests():
    # queries Elastic for unfinished quests and returns array    
    quest_names = []

    # creates Elastic connection
    client = Elasticsearch(
        elastic_url,
        ca_certs=elastic_ca_certs,
        api_key=elastic_api_key
    )

    # gets unfinished quests
    response = client.search(index=st.session_state.log_index,size=0,query={"bool":{"must":[{"match":{"type.keyword":"quest"}}],"must_not":[{"match":{"finished":"true"}}]}},aggregations={"unfinished_quests":{"terms":{"field":"name.keyword"}}})

    for line in response["aggregations"]["unfinished_quests"]["buckets"]:
        quest_names.append(line["key"])

    # close Elastic connection (this has to happen before the return statement to run at all)
    client.close()

    return quest_names

def elastic_index_document(index,document,status_message):
    # sends a document to an Elastic index

    # creates Elastic connection
    client = Elasticsearch(
        elastic_url,
        ca_certs=elastic_ca_certs,
        api_key=elastic_api_key
    )

    # sends document to index with success or failure message
    response = client.index(index=index,document=document)

    if status_message == True:
        if response["result"] == "created":
            success_message("Note creation successful")
        else:
            error_message("Note creation failure",2)
    else:
        pass

    # close Elastic connection
    client.close()

def elastic_kibana_setup(yml_config):
    # creates empty placeholder indices and data views for each player, as well as for transcribed notes

    # builds list of index patterns and descriptive data view names from YAML configuration
    kibana_setup = {"dnd-notes-*":"All Notes","dnd-notes-transcribed":"Audio Transcription Notes","virtual_dm-questions_answers":"Virtual DM Notes"}
    for username in yml_config["credentials"]["usernames"]:
        index = "dnd-notes-" + username
        name = yml_config["credentials"]["usernames"][username]["name"] + "'s Notes"
        kibana_setup[index] = name

    # creates indices and data views from usernames
    for entry in kibana_setup:
        index = entry
        name = kibana_setup[entry]

        # creates Elastic connection
        client = Elasticsearch(
            elastic_url,
            ca_certs=elastic_ca_certs,
            api_key=elastic_api_key
        )

        # creates index if it does not already exist
        response = client.indices.exists(index=index)
        if response != True:
            try:
                client.indices.create(index=index)
            except:
                pass

        # close Elastic connection
        client.close()

        # check if data view already exists
        url = kibana_url + "/api/data_views/data_view/" + index
        auth = "ApiKey " + elastic_api_key
        headers = {"kbn-xsrf":"true","Authorization":auth}
        response = requests.get(url,headers=headers)
        # if data view doesn't exist, create it
        if response.status_code != 200:
            url = kibana_url + "/api/data_views/data_view"
            # named payload so the imported json module is not shadowed
            payload = {"data_view":{"title":index,"name":name,"id":index,"timeFieldName":"@timestamp"}}
            response = requests.post(url,headers=headers,json=payload)
            # could put some error message here, don't think I need to yet

def elastic_update_quest_status(quest_name):
    # updates the finished status on all unfinished records of the given quest

    # creates Elastic connection
    client = Elasticsearch(
        elastic_url,
        ca_certs=elastic_ca_certs,
        api_key=elastic_api_key
    )

    # gets unfinished records for the quest
    query_response = client.search(index=st.session_state.log_index,size=10000,query={"bool":{"must":[{"match":{"name.keyword":quest_name}}],"must_not":[{"match":{"finished":"true"}}]}})

    # updates each record in the index that was searched, rather than in a hardcoded index
    for line in query_response["hits"]["hits"]:
        line_id = line["_id"]
        update_response = client.update(index=st.session_state.log_index,id=line_id,doc={"finished":st.session_state.quest_finished})

    # close Elastic connection
    client.close()

def error_message(text,timeframe=False):
    # displays error message; if timeframe is a number of seconds, the message is cleared after that long
    error = st.error(text)

    if timeframe:
        time.sleep(timeframe)
        error.empty()

def initialize_session_state(variable_list):
    # creates empty variables in streamlit session state
    for variable in variable_list:
        if variable not in st.session_state:
            st.session_state[variable] = None

def load_yml():
    # loads login authentication configuration
    with open(streamlit_project_path + "auth.yml") as file:
        config = yaml.load(file, Loader=SafeLoader)

    authenticator = stauth.Authenticate(
        config['credentials'],
        config['cookie']['name'],
        config['cookie']['key'],
        config['cookie']['expiry_days'],
        config['preauthorized']
    )

    # keeps a reference to the loaded configuration so update_yml() can write it back
    st.session_state["auth_config"] = config

    return config, authenticator

def success_message(text):
    # displays success message
    success = st.success(text)
    time.sleep(2)
    success.empty()

def transcribe_audio(file):
    # transcribes an audio file to text, returns None if transcription fails

    # get file url
    headers = {'authorization':assemblyai_api_key}
    response = requests.post('https://api.assemblyai.com/v2/upload',headers=headers,data=file)
    url = response.json()["upload_url"]
    # get transcribe id
    endpoint = "https://api.assemblyai.com/v2/transcript"
    payload = {"audio_url":url}
    headers = {"authorization":assemblyai_api_key,"content-type":"application/json"}
    response = requests.post(endpoint, json=payload, headers=headers)
    transcribe_id = response.json()['id']
    # polls until the transcript is completed or has failed, pausing between requests
    endpoint = f"https://api.assemblyai.com/v2/transcript/{transcribe_id}"
    headers = {"authorization":assemblyai_api_key}
    result = {}
    while result.get("status") not in ("completed","error"):
        time.sleep(3)
        result = requests.get(endpoint, headers=headers).json()

    # "text" is only populated when the status is "completed"
    return result.get("text")

def update_yml():
    # updates login authentication configuration file with the configuration stored by load_yml()
    # (the name "config" is not defined inside this module, so it is read from session state)
    with open(streamlit_project_path + "auth.yml", 'w') as file:
        yaml.dump(st.session_state["auth_config"], file, default_flow_style=False)
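A note on "text_cleanup": it exists because "/" and "?" have special meaning in a URL, and the two API helpers above place free text directly in the URL path. An alternative, not used in the project, is to percent-encode the text with `urllib.parse.quote`, which keeps the original characters recoverable. The sketch below only builds the URL; `build_vector_url` is a hypothetical helper, and whether the API accepts an encoded slash inside a path segment depends on how the server routes requests, so treat this as an idea to test rather than a drop-in replacement.

```python
from urllib.parse import quote, unquote

FASTAPI_URL = "http://api:8000"  # same value as fastapi_url in variables.py

def build_vector_url(text: str) -> str:
    """Builds the /get_vector_object/ URL with the text percent-encoded."""
    # safe="" also encodes "/", which quote() leaves untouched by default
    return FASTAPI_URL + "/get_vector_object/" + quote(text, safe="")

url = build_vector_url("Who/what attacked us?")
print(url)

# The original text can be recovered from the encoded path segment
print(unquote(url.rsplit("/", 1)[1]))
```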

variables.py

# Elastic D&D
# Author: thtmexicnkid
# Last Updated: 10/03/2023
# 
# Streamlit - Backend - Houses variables that are loaded into pages of the application.

### VARIABLES ###
# *** change this to fit your environment ***
assemblyai_api_key = "API_KEY"
elastic_api_key = "API_KEY"

# *** DO NOT CHANGE ***
elastic_url = "https://es01:9200"
elastic_ca_certs = "certs/ca/ca.crt"
fastapi_url = "http://api:8000"
kibana_url = "http://kibana:5601"
streamlit_data_path = "data/"
streamlit_project_path = "streamlit/"
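If you would rather not keep API keys in a source file, one option is to read them from environment variables and fall back to the placeholder when they are not set. This is a sketch of that idea and not how the project is configured; `read_setting` and the environment variable names `ASSEMBLYAI_API_KEY` and `ELASTIC_API_KEY` are assumptions.

```python
import os

def read_setting(name: str, default: str = "API_KEY") -> str:
    """Returns the environment variable if it is set and non-empty, else the default."""
    value = os.environ.get(name, "").strip()
    return value or default

# Hypothetical environment variable names
assemblyai_api_key = read_setting("ASSEMBLYAI_API_KEY")
elastic_api_key = read_setting("ELASTIC_API_KEY")
```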

The Home Page

The home page consists mostly of the old logon page code. When logged in, however, it now displays a welcome message and instructions on how to use the application!

# Elastic D&D
# Author: thtmexicnkid
# Last Updated: 10/04/2023
# 
# Streamlit - Main Page - Displays a welcome message and explains how to navigate and use the application.

import streamlit as st
from functions import *
from variables import *

# displays application title
display_image(streamlit_data_path + "banner.png")

# initializes session state, loads login authentication configuration, and performs index/data view setup in Elastic
initialize_session_state(["username"])
config, authenticator = load_yml()
elastic_kibana_setup(config)

# makes user log on to view page
if not st.session_state.username:
    # displays login and registration widgets
    tab1, tab2 = st.tabs(["Login", "Register"])
    # login tab
    with tab1:
        try:
            name,authentication_status,username = authenticator.login("Login","main")
            if authentication_status:
                st.rerun()
            elif authentication_status == False:
                error_message('Username/password is incorrect',False)
            elif authentication_status == None:
                st.warning('Please enter your username and password')
        except Exception:
            pass
    # registration tab
    with tab2:
        try:
            if authenticator.register_user('Register', preauthorization=True):
                success_message('User registered successfully')
                update_yml()
        except Exception as e:
            error_message(e,False)
else:
    st.header('Welcome!',divider=True)
    welcome_message = '''
    ## Elastic D&D is an ongoing project to facilitate note-taking and other functions derived from elements of D&D (Veverbot the AI assistant, roll data, etc.)

    ### You can navigate between pages of the application with the sidebar on the left:
    ##### The Home page is where you can go to refresh your memory on how to use the Elastic D&D application.
    ##### The Note Input page is used for storing notes for viewing and use with Virtual DM functions. Currently, you can input notes via an audio file or text.
    ##### The Veverbot page is an active chat session with your own personal AI assistant! Ask Veverbot questions about your campaign and it will give you answers, hopefully.
    ##### The Account page is used for changing your password and logging off.

    ### Stay up-to-date with the progress of this project on the [Github](https://github.com/thtmexicnkid/elastic-dnd) and the [blog](https://dev.to/thtmexicnkid)!

    ## **Thanks for using Elastic D&D!**
    '''
    st.markdown(welcome_message)

The Note Input Page

The note input page consists mostly of the old note input tab code. If you try to access this page while not logged in, it displays an unauthorized message.

# Elastic D&D
# Author: thtmexicnkid
# Last Updated: 10/04/2023
# 
# Streamlit - Note Input Page - Allows the user to store audio or text notes in Elasticsearch.

import streamlit as st
from functions import *
from variables import *

# displays application title and sets page accordingly
display_image(streamlit_data_path + "banner.png")

# initializes session state and loads login authentication configuration
initialize_session_state(["username"])
config, authenticator = load_yml()

# makes user log on to view page
if not st.session_state.username:
    error_message("UNAUTHORIZED: Please login on the Home page.",False)
else:
    st.header('Note Input',divider=True)
    st.session_state["log_index"] = "dnd-notes-" + st.session_state.username
    st.session_state["note_type"] = st.selectbox("Audio or Text?", ["Audio","Text"], index=0)
    # displays the audio or text note form depending on what is selected in selectbox
    if st.session_state.note_type == "Audio":
        #list of variables to clear from session state once finished
        audio_form_variable_list = ["log_type","log_session","file","submitted","transcribed_text","log_payload","message_vector"]

        # displays note form widgets, creates note payload, sends payload to an Elastic index, and handles error / success / warning messages
        with st.form("audio_form", clear_on_submit=True):
            st.session_state["log_type"] = "audio"
            st.session_state["log_session"] = st.slider("Which session is this?", 0, 250)
            st.session_state["file"] = st.file_uploader("Choose audio file",type=[".3ga",".8svx",".aac",".ac3",".aif",".aiff",".alac",".amr",".ape",".au",".dss",".flac",".flv",".m2ts",".m4a",".m4b",".m4p",".m4r",".m4v",".mogg",".mov",".mp2",".mp3",".mp4",".mpga",".mts",".mxf",".oga",".ogg",".opus",".qcp",".ts",".tta",".voc",".wav",".webm",".wma",".wv"])
            st.session_state["submitted"] = st.form_submit_button("Upload file")
            if st.session_state.submitted and st.session_state.file is not None:
                # removes forward slash that will break the API call for AI functionality
                st.session_state["transcribed_text"] = text_cleanup(transcribe_audio(st.session_state.file))
                if st.session_state.transcribed_text is not None:
                    # gets vector object for use with AI functionality
                    st.session_state["message_vector"] = api_get_vector_object(st.session_state.transcribed_text)
                    if st.session_state.message_vector == None:
                        error_message("AI API vectorization failure",2)
                    else:
                        st.session_state["log_payload"] = json.dumps({"session":st.session_state.log_session,"type":st.session_state.log_type,"message":st.session_state.transcribed_text,"message_vector":st.session_state.message_vector})
                        elastic_index_document("dnd-notes-transcribed",st.session_state.log_payload,True)
                else:
                    error_message("Audio transcription failure",2)
            else:
                st.warning('Please upload a file and submit')

        # clears session state
        clear_session_state(audio_form_variable_list)
    elif st.session_state.note_type == "Text":
        #list of variables to clear from session state once finished
        text_form_variable_list = ["log_type","log_session","note_taker","log_index","quest_type","quest_name","quest_finished","log_message","submitted","log_payload","message_vector"]

        # displays note form widgets, creates note payload, sends payload to an Elastic index, and handles error / success / warning messages
        st.session_state["log_type"] = st.selectbox("What kind of note is this?", ["location","miscellaneous","overview","person","quest"])
        # displays note form for quest log type
        if st.session_state.log_type == "quest":
            st.session_state["quest_type"] = st.selectbox("Is this a new or existing quest?", ["New","Existing"])
            if st.session_state.quest_type == "New":
                with st.form("text_form_new_quest", clear_on_submit=True):
                    st.session_state["log_session"] = st.slider("Which session is this?", 0, 250)
                    st.session_state["quest_name"] = st.text_input("What is the name of the quest?")
                    st.session_state["quest_finished"] = st.checkbox("Did you finish the quest?")
                    # removes forward slash that will break the API call for AI functionality
                    st.session_state["log_message"] = text_cleanup(st.text_area("Input note text:"))
                    st.session_state["submitted"] = st.form_submit_button("Upload note")
                    # st.text_area returns an empty string rather than None, so check that the note is not empty
                    if st.session_state.submitted and st.session_state.log_message:
                        # gets vector object for use with AI functionality
                        st.session_state["message_vector"] = api_get_vector_object(st.session_state.log_message)
                        if st.session_state.message_vector == None:
                            error_message("AI API vectorization failure",2)
                        else:
                            st.session_state["log_payload"] = json.dumps({"finished":st.session_state.quest_finished,"message":st.session_state.log_message,"name":st.session_state.quest_name,"session":st.session_state.log_session,"type":st.session_state.log_type,"message_vector":st.session_state.message_vector})
                            elastic_index_document(st.session_state.log_index,st.session_state.log_payload,True)
                            st.rerun()
                    else:
                        st.warning('Please input note text and submit')
            else:
                quest_names = elastic_get_quests()
                with st.form("text_form_existing_quest", clear_on_submit=True):
                    st.session_state["log_session"] = st.slider("Which session is this?", 0, 250)
                    st.session_state["quest_name"] = st.selectbox("Which quest are you updating?", quest_names)
                    st.session_state["quest_finished"] = st.checkbox("Did you finish the quest?")
                    st.session_state["log_message"] = text_cleanup(st.text_area("Input note text:"))
                    st.session_state["submitted"] = st.form_submit_button("Upload note")
                    # st.text_area returns an empty string rather than None, so check that the note is not empty
                    if st.session_state.submitted and st.session_state.log_message:
                        # updates previous quest records to finished: true
                        if st.session_state.quest_finished == True:
                            elastic_update_quest_status(st.session_state.quest_name)
                        else:
                            pass
                        # gets vector object for use with AI functionality
                        st.session_state["message_vector"] = api_get_vector_object(st.session_state.log_message)
                        if st.session_state.message_vector == None:
                            error_message("AI API vectorization failure",2)
                        else:
                            st.session_state["log_payload"] = json.dumps({"finished":st.session_state.quest_finished,"message":st.session_state.log_message,"name":st.session_state.quest_name,"session":st.session_state.log_session,"type":st.session_state.log_type,"message_vector":st.session_state.message_vector})
                            elastic_index_document(st.session_state.log_index,st.session_state.log_payload,True)
                            st.rerun()
                    else:
                        st.warning('Please input note text and submit')
        # displays note form for all other log types
        else:
            with st.form("text_form_wo_quest", clear_on_submit=True):
                st.session_state["log_session"] = st.number_input("Which session is this?", 0, 250)
                st.session_state["log_message"] = text_cleanup(st.text_area("Input note text:"))
                st.session_state["submitted"] = st.form_submit_button("Upload Note")
                # st.text_area returns an empty string rather than None, so check that the note is not empty
                if st.session_state.submitted and st.session_state.log_message:
                    # gets vector object for use with AI functionality
                    st.session_state["message_vector"] = api_get_vector_object(st.session_state.log_message)
                    if st.session_state.message_vector == None:
                        error_message("AI API vectorization failure",2)
                    else:
                        st.session_state["log_payload"] = json.dumps({"message":st.session_state.log_message,"session":st.session_state.log_session,"type":st.session_state.log_type,"message_vector":st.session_state.message_vector})
                        elastic_index_document(st.session_state.log_index,st.session_state.log_payload,True)
                        st.rerun()
                else:
                    st.warning('Please input note text and submit')

        # clears session state
        clear_session_state(text_form_variable_list)
    else:
        pass
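Every page other than Home starts with the same check: if "username" is missing from the session state, the page shows the unauthorized message and renders nothing else. The logic boils down to the sketch below, written against a plain dictionary so it runs without Streamlit. `is_logged_in` and `page_body` are hypothetical names; in the app, `st.session_state` plays the role of the dictionary.

```python
def is_logged_in(session_state: dict) -> bool:
    """True when a non-empty username is stored in the session state."""
    return bool(session_state.get("username"))

def page_body(session_state: dict) -> str:
    """Returns what a guarded page would display for the given session state."""
    if not is_logged_in(session_state):
        return "UNAUTHORIZED: Please login on the Home page."
    return "Note Input for " + session_state["username"]

print(page_body({"username": None}))   # state right after initialize_session_state
print(page_body({"username": "joe"}))  # state after a successful login
```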

The AI Assistant Page

Meet Veverbot, your D&D AI assistant! This page is all brand-new code. I will get into it in depth in a couple of weeks, but here is your preview! If you try to access this page while not logged in, it displays an unauthorized message.

# Elastic D&D
# Author: thtmexicnkid
# Last Updated: 10/04/2023
# 
# Streamlit - Virtual DM Page - Allows the user to ask questions and receive answers automatically.

import streamlit as st
from functions import *
from variables import *

# displays application title and sets page accordingly
display_image(streamlit_data_path + "banner.png")

# initializes session state and loads login authentication configuration
initialize_session_state(["username"])
config, authenticator = load_yml()

# makes user log on to view page
if not st.session_state.username:
    error_message("UNAUTHORIZED: Please login on the Home page.",False)
else:
    st.header('Veverbot',divider=True)
    st.session_state["log_index"] = "dnd-notes-" + st.session_state.username
    virtual_dm_variable_list = ["question","response","question_vector","query_results","answers","answer_vector","log_payload"]

    # Initialize chat history
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # React to user input
    st.session_state["question"] = st.chat_input("Ask Veverbot a question")
    if st.session_state.question:
        st.session_state["question"] = text_cleanup(st.session_state.question)
        # Display user message in chat message container
        st.chat_message("user").markdown(st.session_state.question)
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": st.session_state.question})
        # Display assistant response in chat message container
        response = f"Veverbot searching for answer to the question -- \"{st.session_state.question}\""
        with st.chat_message("assistant"):
            st.markdown(response)
            st.session_state.messages.append({"role": "assistant", "content": response})
            # gets vector object for use with AI functionality
            st.session_state["question_vector"] = api_get_vector_object(st.session_state.question)
            if st.session_state.question_vector == None:
                error_message("AI API vectorization failure",2)
            else:
                st.session_state["query_results"] = elastic_ai_notes_query(st.session_state.question_vector)
                st.session_state["answers"] = api_get_question_answer(st.session_state.question,st.session_state.query_results)
                # api_get_question_answer returns None when the API response is not valid JSON
                if st.session_state.answers == None:
                    error_message("AI API question answering failure",2)
                else:
                    for answer in st.session_state.answers:
                        st.markdown(answer)
                        st.session_state.messages.append({"role": "assistant", "content": answer})
                        st.session_state["answer_vector"] = api_get_vector_object(answer)
                        if st.session_state.answer_vector == None:
                            error_message("AI API vectorization failure",2)
                        else:
                            st.session_state["log_payload"] = json.dumps({"question":st.session_state.question,"question_vector":st.session_state.question_vector,"answer":answer,"answer_vector":st.session_state.answer_vector})
                            elastic_index_document("virtual_dm-questions_answers",st.session_state.log_payload,False)

    clear_session_state(virtual_dm_variable_list)
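The retrieval step behind Veverbot is the kNN search in "elastic_ai_notes_query": the question is turned into a vector, Elasticsearch returns the notes whose "message_vector" is closest to it, and the top hit is passed to the API as context. The sketch below builds the same `knn` argument as a function and picks the top hit from a response-shaped dictionary. `build_knn_query`, `top_message`, and the sample response are illustrative and not part of the project.

```python
from typing import Optional

def build_knn_query(query_vector: list, k: int = 10, num_candidates: int = 100) -> dict:
    """Builds the knn argument used with client.search()."""
    # Elasticsearch rejects a kNN search where num_candidates is smaller than k
    if num_candidates < k:
        raise ValueError("num_candidates must be at least k")
    return {
        "field": "message_vector",
        "query_vector": query_vector,
        "k": k,
        "num_candidates": num_candidates,
    }

def top_message(response: dict) -> Optional[str]:
    """Returns the message of the best hit, or None when nothing matched."""
    hits = response["hits"]["hits"]
    return hits[0]["_source"]["message"] if hits else None

# Made-up response with the same shape as an Elasticsearch search response
sample_response = {"hits": {"hits": [{"_source": {"message": "The party met the blacksmith."}}]}}

print(build_knn_query([0.1, 0.2, 0.3]))
print(top_message(sample_response))
print(top_message({"hits": {"hits": []}}))
```

Returning None for an empty result, as `top_message` does here, is one way to avoid an IndexError when no notes have been indexed yet.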

The Account Page

The account page consists mostly of the old account tab code. If you try to access this page while not logged in, it displays an unauthorized message.

# Elastic D&D
# Author: thtmexicnkid
# Last Updated: 10/04/2023
# 
# Streamlit - Account Page - Allows the user to change their password and log out.

import streamlit as st
from functions import *
from variables import *

# displays application title and sets page accordingly
display_image(streamlit_data_path + "banner.png")

# initializes session state and loads login authentication configuration
initialize_session_state(["username"])
config, authenticator = load_yml()

# makes user log on to view page
if not st.session_state.username:
    error_message("UNAUTHORIZED: Please login on the Home page.",False)
else:
    st.header('Account',divider=True)
    try:
        if authenticator.reset_password(st.session_state.username, 'Reset password'):
            success_message('Password modified successfully')
            update_yml()
    except Exception as e:
        error_message(e,2)
    authenticator.logout('Logout', 'main')

Closing Remarks

I really like the way that the Streamlit application turned out. It is organized, neat, and functions great with this page structure.

Next week, I want to get into the code for the API I am actively working on. Mostly, it is handling functions for the AI assistant, so it would make sense to get into that next.

Check out the GitHub repo below. You can also find my Twitch account in the socials link, where I will be actively working on this during the week while interacting with whoever is hanging out!

GitHub Repo
Socials

Happy Coding,
Joe
