Riottecboi

Travel Recommender with FastAPI, Kafka, MongoDB and OpenAI

Introduction

In the bustling realm of modern travel, personalized recommendations play a pivotal role in enhancing user experiences and fostering memorable journeys. Our project, the Smart Travel Recommender System, aims to revolutionize the way travelers explore new destinations by providing tailored recommendations for activities based on country and season. By integrating cutting-edge technologies such as FastAPI, Kafka, MongoDB, and OpenAI, we aspire to deliver a seamless and scalable solution that empowers users to discover the essence of every destination.

Structure of Project

Travel Recommender/
├── app/
│   ├── background_worker.py
│   ├── main.py
│   ├── api/
│   │   ├── openai_api.py
│   │   └── routes/
│   │       ├── recommendations_route.py
│   │       └── status_route.py
│   ├── models/
│   │   ├── __init__.py
│   │   └── models.py
│   ├── schemas/
│   │   └── schema.py
│   ├── core/
│   │   └── config.py
│   ├── db/
│   │   └── mongodb.py
│   └── utils/
│       └── kafka.py
├── test/
│   └── sample_test.py
├── requirements.txt
├── README.md
├── Dockerfile
├── docker-compose.yml

More information is available on my GitHub.

Installation & Usage

sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update 
sudo apt install python3.11 -y

Use the package manager pip to install the driver and any other required libraries.

sudo apt-get install python3-pip -y

Initialize a virtual environment in your project and activate it.

python3.11 -m venv <virtual-environment-name>
source <virtual-environment-name>/bin/activate

Install all required libraries for this project from requirements.txt:

python3.11 -m pip install -r requirements.txt

Set Up All Services

I have created a docker-compose file to make it easy to install all of the components we need for this project.

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: bitnami/kafka:latest
    ports:
        - 9092:9092
        - 9093:9093
    environment:
        - KAFKA_BROKER_ID=1
        - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
        - ALLOW_PLAINTEXT_LISTENER=yes
        - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT
        - KAFKA_CFG_LISTENERS=CLIENT://:9092
        - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092
        - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
        - zookeeper

  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - 8080:8080
    depends_on:
      - kafka
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

  mongodb:
    image: mongo:5.0
    ports:
      - 27017:27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: root
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongo localhost:27017/test --quiet
      interval: 10s
      timeout: 5s
      retries: 5

  mongo-express:
    image: mongo-express:latest
    depends_on:
      mongodb:
        condition: service_healthy
    ports:
      - 8888:8081
    environment:
      ME_CONFIG_MONGODB_SERVER: mongodb

  fastapi-app:
    build:
      context: .
      args:
        NO_CACHE: "true"
    depends_on:
      - kafka
      - mongodb
    ports:
      - "8000:8000"
    environment:
      OPENAI_KEY: "xxxx"
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      KAFKA_TOPIC: recommendations_topic
      MONGODB_URI: mongodb://root:root@mongodb:27017
      MONGODB_DATABASE: recommendations

The Dockerfile will look like this:

FROM tiangolo/uvicorn-gunicorn-fastapi:python3.11

RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    python3-pip \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt /app/requirements.txt
COPY app/api /app/api
COPY app/core /app/core
COPY app/db /app/db
COPY app/schemas /app/schemas
COPY app/utils /app/utils
COPY app/background_worker.py /app/background.py
COPY app/main.py /app/main.py

RUN pip3 install --no-cache-dir -r requirements.txt

EXPOSE 8000
# Run the FastAPI service and background process
CMD ["sh", "-c", "uvicorn main:app --host 0.0.0.0 --port 8000 & python background.py"]

Containers will be pulled and run as separate processes, connected through the project's default bridge network.

Explanation

We will have two routes in total (recommendations_route and status_route).

  • recommendations_route will accept two query parameters:
  1. country: The country for which the recommendations are to be fetched.
  2. season: The season in which the recommendations are desired (e.g., "summer", "winter").

  Both parameters are required, and season must be one of the four seasons.
import pycountry
from pydantic import BaseModel, Field, model_validator


class RecommendationsRequest(BaseModel):
    country: str = Field(..., description="The country for which recommendations are to be fetched.")
    season: str = Field(..., description="The season in which the recommendations are desired.")

    @model_validator(mode='after')
    def validate_season(self):
        # Fuzzy-match the country name against the ISO country database.
        try:
            pycountry.countries.search_fuzzy(self.country)
        except LookupError:
            raise ValueError("Invalid country.")
        valid_seasons = ["spring", "summer", "autumn", "winter"]
        if self.season not in valid_seasons:
            raise ValueError(f"Invalid season. Must be one of {', '.join(valid_seasons)}")

        return self
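The season rule can be exercised in isolation. A minimal standalone sketch of the same check (the pycountry country lookup is left out here, so only the season whitelist is shown):

```python
# Standalone sketch of the season rule used by RecommendationsRequest;
# the country check via pycountry is omitted.
VALID_SEASONS = ("spring", "summer", "autumn", "winter")

def check_season(season: str) -> str:
    """Return the season unchanged if valid, otherwise raise ValueError."""
    if season not in VALID_SEASONS:
        raise ValueError(f"Invalid season. Must be one of {', '.join(VALID_SEASONS)}")
    return season
```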

When a request is made to the endpoint, a unique identifier (UID) is generated for the request and returned to the user immediately.

try:
    RecommendationsRequest(country=country, season=season)
    uid = str(uuid.uuid4())
    request_data = {'country': country, 'season': season}
    await kafka_producer(request_data, uid)
    return RecommendationSubmitResponse(uid=uid)
except ValidationError as e:
    raise HTTPException(
        status_code=422,
        detail=ErrorResponse(
            error="Invalid country/season",
            message="The input of country or season is invalid. Please try again."
        ).model_dump()
    )

Offload the processing of the request to a background component. At this stage, we will use Kafka to send the request as a message to a Kafka topic and consume it with a separate worker.

async def kafka_producer(request_data, uid):
    producer = AIOKafkaProducer(
        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS
    )
    await producer.start()
    await producer.send(
        settings.KAFKA_TOPIC,
        f"{uid}:{request_data}".encode("utf-8"), partition=0
    )
    await producer.stop()

This Python code block defines an asynchronous function named kafka_producer that is responsible for sending data to a Kafka topic.

producer = AIOKafkaProducer(
        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS
    )

This initializes a Kafka producer using the AIOKafkaProducer class from the aiokafka library. The producer is configured to connect to the Kafka cluster specified in settings.KAFKA_BOOTSTRAP_SERVERS.

await producer.start()
await producer.send(
    settings.KAFKA_TOPIC,
    f"{uid}:{request_data}".encode("utf-8"), partition=0
)

This step prepares the producer to send messages to the Kafka cluster and sends a message to the specified Kafka topic (settings.KAFKA_TOPIC). The message content is a string composed of the uid, a colon (:), and the request_data, encoded as UTF-8 before sending.
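Because the payload is framed as uid:request_data and the dict repr after the UID contains colons of its own, the consumer has to split on the first colon only. A small sketch of the round trip (the example country and season are arbitrary):

```python
import uuid

# Frame the message the same way the producer does: "<uid>:<request_data>".
uid = str(uuid.uuid4())
request_data = {'country': 'Japan', 'season': 'spring'}
payload = f"{uid}:{request_data}".encode("utf-8")

# Split on the FIRST colon only (maxsplit=1), because the dict repr
# that follows the UID contains colons itself.
decoded_uid, decoded_data = payload.decode("utf-8").split(":", 1)
```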

When the message is published to the Kafka topic, background_worker.py picks it up.

async def handle_request(uid, request_data):
    try:
        recommendations = await get_openai_recommendation(request_data)
    except Exception as e:
        recommendations = []
    result = await save_recommendations(uid, request_data, recommendations)
    print(f"Recommendations saved with ID: {result}")

async def main():
    while True:
        uid, request_data = await kafka_consumer()
        await handle_request(uid, request_data)

At this stage, the worker consumes a message with the code block below. It defines an asynchronous function that acts as a Kafka consumer: it receives messages from the specified Kafka topic, processes them, and commits the offsets to track the progress of message consumption.

consumer = AIOKafkaConsumer(
    settings.KAFKA_TOPIC,
    bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
    group_id=settings.KAFKA_TOPIC,
    auto_offset_reset='earliest'
)
await consumer.start()
try:
    async for msg in consumer:
        uid, request_data = msg.value.decode("utf-8").split(":", 1)
        print(f"Processed recommendation request: {request_data}")
        await consumer.commit()
        # ast.literal_eval (requires `import ast`) parses the dict repr
        # safely, unlike eval(), which would execute arbitrary code.
        return uid, ast.literal_eval(request_data)
except Exception as e:
    print(f"Consumer error: {e}")
finally:
    await consumer.stop()

About auto_offset_reset='earliest': it specifies the behavior when there is no initial offset or the current offset no longer exists on the server (e.g., because the data has been deleted). Set to 'earliest', the consumer will start reading from the earliest available message.

After committing the offset of the consumed message, background_worker.py makes the OpenAI API call.

try:
    client = OpenAI(api_key=settings.OPENAI_KEY)
    chat_completion = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": f"Provide three recommendations for things to do in {request_data['country']} during {request_data['season']}.",
            }
        ],
        model="gpt-3.5-turbo",
    )
    return [chat_completion.choices[0].message.content]
except Exception as e:
    raise Exception(str(e))
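The prompt is a plain f-string built from the request data. A minimal sketch of assembling the messages list, with no API call made (the example country and season are arbitrary):

```python
request_data = {'country': 'Italy', 'season': 'autumn'}

# Build the chat messages from the request data, without calling the OpenAI API.
messages = [
    {
        "role": "user",
        "content": f"Provide three recommendations for things to do in "
                   f"{request_data['country']} during {request_data['season']}.",
    }
]
```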

Saving stores the recommendations in MongoDB with the UID as the key, structuring the document to include the (country, season) pair as request_data along with the recommendations.

async def save_recommendations(uid, request_data, recommendations):
    recommendation_doc = {
        "uid": uid,
        "request_data": request_data,
        "recommendations": recommendations
    }
    result = await loop.run_in_executor(None, recommendations_collection.insert_one, recommendation_doc)
    return result.inserted_id

  • status_route accepts the UID as a query parameter and checks for a result in MongoDB.
  1. If a result exists, the status is "completed" and the recommendations are returned.
  2. If processing hasn't finished, the status is "pending" to indicate the data is not yet available.
if recommendations is None:
    raise HTTPException(
        status_code=404,
        detail=ErrorResponse(
            error="UID not found",
            message="The provided UID does not exist. Please check the UID and try again."
        ).model_dump()
    )

if recommendations:
    return RecommendationResponse(
        uid=uid, country=country, season=season,
        message="The recommendations are ready",
        recommendations=recommendations, status="completed"
    )
return RecommendationCheckResponse(
    uid=uid, status="pending",
    message="The recommendations are not yet available. Please try again later."
)
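The status mapping in this route can be condensed into a small pure function. A simplified sketch (the real route returns Pydantic response models and raises HTTPException for the 404 case):

```python
def status_for(recommendations):
    """Map a MongoDB lookup result to a status string.

    None  -> no document exists for this UID (the route raises 404)
    []    -> a document was saved but holds no recommendations yet -> "pending"
    [...] -> recommendations are stored                            -> "completed"
    """
    if recommendations is None:
        return "not_found"
    if recommendations:
        return "completed"
    return "pending"
```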

The response models are defined as:

class RecommendationSubmitResponse(BaseModel):
    uid: str

class RecommendationCheckResponse(RecommendationSubmitResponse):
    message: Optional[str] = None
    status: str

class RecommendationResponse(RecommendationCheckResponse):
    country: Optional[str] = None
    season: Optional[Literal["spring", "summer", "autumn", "winter"]] = None
    recommendations: Optional[List[str]] = None

class ErrorResponse(BaseModel):
    error: str
    message: str
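Because the response models inherit from one another, a completed response carries the UID and status fields of the simpler responses. A quick usage sketch (assuming Pydantic is installed; the field values are arbitrary examples):

```python
from typing import List, Literal, Optional
from pydantic import BaseModel

class RecommendationSubmitResponse(BaseModel):
    uid: str

class RecommendationCheckResponse(RecommendationSubmitResponse):
    message: Optional[str] = None
    status: str

class RecommendationResponse(RecommendationCheckResponse):
    country: Optional[str] = None
    season: Optional[Literal["spring", "summer", "autumn", "winter"]] = None
    recommendations: Optional[List[str]] = None

# A completed response populates fields from every level of the hierarchy.
resp = RecommendationResponse(
    uid="abc-123", status="completed",
    country="Japan", season="spring",
    recommendations=["Visit Kyoto during cherry blossom season"],
)
```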

Final Demo


Conclusion

By leveraging FastAPI, Kafka, MongoDB, and OpenAI, we endeavor to deliver a sophisticated yet user-friendly platform that empowers travelers to embark on unforgettable journeys tailored to their preferences and interests. With scalability, efficiency, and personalization at its core, our system strives to redefine the way travelers explore the world, one recommendation at a time.
