DEV Community

Aneesh Arora
Aneesh Arora

Posted on

Why we use Server Sent Events and how to implement them in FastAPI

I, Aneesh Arora, am the cofounder and CTO of Afterword and have written this post to describe the advantages of using Server Sent Events for generative AI applications and how to implement them using FastAPI.

When it came to implementing SEE using FastAPI back in July 2022 I could find only two articles on the internet and neither of them were very helpful. There might be more articles today but all of them fail to explain certain basic concepts clearly. Hence I am attempting to mitigate that gap.

You can also read this on our website.

Challenges in AI Model Integration

  • Speed and Content Limitation: When building an application with AI models, a key challenge is the speed of content creation and the limit on the amount of content that can be processed at once.

Enhancing User Experience in Afterword

  • Dual Tasks: Afterword must scrape web articles and condense their content, both time-consuming tasks.
  • Managing User Expectations: Users expect quick results. It's vital to:
    • Provide partial summaries during processing.
    • Update users on the process status.
  • Purpose: This ensures users know the application is active and not stalled.

Technical Solutions for Timely Updates

  • WebSockets vs. Server-Sent Events (SSE):
    • WebSockets: Used in bidirectional communication (e.g., WhatsApp), but resource-intensive.
    • Server-Sent Events (SSE): Offers a unidirectional communication path.
      • Server broadcasts updates.
      • Client receives updates passively, different from traditional HTTP requests where client has to make a new request for every update.

Why Choose SSE?

  • Efficient Broadcasting: Chosen for its efficiency in broadcasting updates.
  • Industry Validation: Similar approach used by OpenAI for ChatGPT.
  • Early Adoption: We implemented SSE in July 2022, predating ChatGPT's launch.

Implementing SSE in Afterword

Having settled on Server-Sent Events (SSE) for Afterword, our next step was to implement it practically. For Afterword's backend, we chose FastAPI, a leading Python framework, aligning with our decision to use Python for our AI functionalities. I'll delve into Afterword's complete tech stack in a subsequent, more detailed blog post.

SSE implementation involves two main components: the server and the client. On the server side, we needed to integrate SSE within the FastAPI framework. Regarding the client side, the implementation of SSE in JavaScript will be the same no matter which frontend framework you choose to work with unless you use HTMX (the frontend framework for javascript haters. Just kidding, it’s really cool do check it out).

FastAPI, built upon the Starlette framework, can leverage a third-party library specifically designed for Server-Sent Events (SSE), namely sse-starlette. Implementing SSE in FastAPI using this library is straightforward. Instead of returning a typical response from a FastAPI route, you use EventSourceResponse(generator_function()).

To understand this better, let's clarify what a generator function is. In Python, a generator function is used to return a sequence of values over time, each time it's iterated, by employing the yield statement. This differs from a normal function, which returns a single value using the return statement.

For instance, a standard function is structured as follows:

def normal_function():
    #Perform some action
    return final_value
Enter fullscreen mode Exit fullscreen mode

In contrast, a generator function looks like this:

def generator_function():
    #Perform some action
    yield first_value
    #Perform more action
    yield second_value
    #Perform even more action
    yield final_value
Enter fullscreen mode Exit fullscreen mode

Each yield in the generator function produces a value that can be sent to the client as part of an SSE stream. This allows for real-time data transmission to the client, making it an ideal approach for applications that require continuous data updates, like Afterword.

Standard vs SSE route

In FastAPI, a standard route for a simple request-response cycle looks quite straightforward. Here's an example of such a route:

@app.get("/normal")
def hello_world():
    return {"Hello": "World"}
Enter fullscreen mode Exit fullscreen mode

This route responds with a simple JSON object upon a GET request.

However, for a route that implements Server-Sent Events (SSE), the structure changes to accommodate the SSE pattern. This is achieved by returning an EventSourceResponse with a generator function. Here's how the SSE route would look:

@app.get("/SSE")
def server_sent_events():
    return EventSourceResponse(generator_function())
Enter fullscreen mode Exit fullscreen mode

With the generator function you've described, this SSE route will send three separate updates to the client (browser) before ceasing transmission. Each yield in your generator function corresponds to a separate update sent to the client.

Message format for SSE

When using SSE, it's crucial that the messages sent to the browser adhere to the structure specified in the HTML format. Each message consists of various fields, each on a new line, defined as fieldname: value. The key fields include:

  1. event: Specifies the event type. If this is set, it triggers a browser event for which listeners can be added using addEventListener(). If not set, the onmessage handler is used.
  2. data: Sets the event ID.
  3. id: Contains the actual message data.
  4. retry: Defines the reconnection time in milliseconds, instructing the browser how long to wait before attempting to reconnect if the connection is lost.

All other field names are ignored by the browser.

Events are optional but they are helpful. For example by setting an event name - β€œend” we can tell the browser when to stop listening for more messages. Of course the same can be achieved based on certain input received through the data field as well.

In your generator function, the messages are structured to include these key fields. For instance:

def generator_function():
    yield {
        "event": "event_name",
        "id": event_id, #any unique id
        "retry": 15000, #15s
        "data": json.dumps({"message": "message text"}) #actual data being sent
    }
Enter fullscreen mode Exit fullscreen mode

Now the reason we use json.dumps is because Python data structures cannot be directly sent over a network. json.dumps serializes these structures into a string representation, which can be transmitted over the network.

Handling SSE in browser using javascript

Receiving Server-Sent Events (SSE) in the browser using JavaScript is relatively straightforward and involves setting up an EventSource object to listen for messages from the server. Here's a breakdown of how it works based on your provided code:

1. Initialize the EventSource:

const evtSource = new EventSource(URL);
Enter fullscreen mode Exit fullscreen mode

This line creates a new EventSource instance, connecting to the specified URL (where your FastAPI server sends SSE).

2. Listen for Specific Events:

evtSource.addEventListener("event_name", function(event) {
    let data = JSON.parse(event.data);
    // Use the data
});
Enter fullscreen mode Exit fullscreen mode

Here, you're adding an event listener for a specific event type, "event_name". When the event is received, the callback function is executed. The event data, which is a JSON string, is parsed back into a JavaScript object.

3. Listen for a Termination Event:

evtSource.addEventListener("end", function(event) {
    evtSource.close();
});
Enter fullscreen mode Exit fullscreen mode

This listener waits for an "end" event, signaling that no more messages will be sent. Upon receiving this event, it closes the EventSource connection.

4. Error Handling:

evtSource.onerror = function(event) {
    // Handle error
};
Enter fullscreen mode Exit fullscreen mode

This function will be called if an error occurs with the EventSource connection. You can implement specific error handling logic here.

This setup ensures that your browser client is continuously listening for messages sent from your FastAPI server via SSE. It processes each message as it arrives and responds accordingly, whether that's updating the UI, triggering further actions, or closing the connection. This method is efficient for real-time applications, as it reduces the need for repeated polling and provides a more interactive user experience.

Complete working example

Now that we know how to send and receive SSE let us look at a full example.

In this we will assume that we are looping over an array and generating data and sending to the browser.

1. Install all dependencies:

pip install fastapi
pip install uvicorn[standard]
pip install sse-starlette
Enter fullscreen mode Exit fullscreen mode

2. Create a python file 'sse.py' and copy the below code to it:

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import time
import json
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# Set up CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

@app.get("/SSE")
async def SSE():
    return EventSourceResponse(generator_function())

RETRY_TIMEOUT = 15000 #15s

async def generator_function():
    #We are sleeping to simulate CPU processing
    time.sleep(2) #sleep for 2 seconds
    yield {"event": "message","id": 1,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":"1st SSE"})}
    time.sleep(1)
    yield {"event": "message","id": 2,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":"2nd SSE"})}
    #Loop over a list and send SSE
    messages = ["data 1","data 2","data 3"]
    for message in messages:
        time.sleep(1)
        yield {"event": "message","id": 3,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":message})}
    time.sleep(1)
    yield {"event": "end","id": 4,"retry": RETRY_TIMEOUT,"data": json.dumps({"message":"last SSE"})}
Enter fullscreen mode Exit fullscreen mode

3. CORS Middleware Configuration:

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)
Enter fullscreen mode Exit fullscreen mode

This sets up CORS to allow requests from any origin, which is crucial for API accessibility from different domains. Since we will be making requests using javascript.

4. Start FastAPI server:

uvicorn sse:app
Enter fullscreen mode Exit fullscreen mode

5. Setup a svelte project: You can use any framework you like

npm create svelte@latest sse
cd sse
npm install
Enter fullscreen mode Exit fullscreen mode

6. Overwrite content of sse/src/routes/+page.svelte with the code below:

<script>
let message = "";
async function SSE()
{
  const evtSource = new EventSource("http://localhost:8000/SSE");
  evtSource.addEventListener("message", function(event) {
      let data = JSON.parse(event.data);
      message = data.message;
  });
  evtSource.addEventListener("end", function(event) {
      let data = JSON.parse(event.data);
      message = data.message;
      evtSource.close();
  });
}
</script>
<h1>SSE</h1>
<p><a on:click={SSE} href="#">Click</a> to start SSE</p>
<p>{message}</p>
Enter fullscreen mode Exit fullscreen mode

7. Start svelte server:

npm run dev -- --open
Enter fullscreen mode Exit fullscreen mode

9. Open http://localhost:5173 and click to start SSE: You should be able to see the messages in your browser

Hope you found this post helpful. Do check out Afterword to take control of your reading and tackle information overload.

Top comments (0)