DEV Community

Building a Task Management API with Apache Cassandra and FastAPI

In this tutorial, we'll create a scalable task management API using Apache Cassandra as our NoSQL database and FastAPI for the web framework. Cassandra is an excellent choice for applications requiring high availability, horizontal scalability, and consistent performance with large datasets.

Why Apache Cassandra?

While MongoDB and Redis are popular NoSQL choices, Cassandra offers unique benefits:

  • Linear scalability with no single point of failure
  • Tunable consistency levels
  • Optimized for write-heavy workloads
  • Excellent support for time-series data
  • Built-in partitioning and replication

Prerequisites

  • Python 3.8+
  • Docker and Docker Compose
  • Basic understanding of REST APIs
  • Familiarity with async programming

Project Setup

First, let's create our project structure:

task-manager/
├── docker-compose.yml
├── requirements.txt
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── models.py
│   ├── database.py
│   └── routes/
│       └── tasks.py
└── README.md
Enter fullscreen mode Exit fullscreen mode

Let's set up our dependencies in requirements.txt:

fastapi==0.104.1
uvicorn==0.24.0
cassandra-driver==3.28.0
pydantic==2.4.2
python-dotenv==1.0.0
Enter fullscreen mode Exit fullscreen mode

Create a docker-compose.yml for Cassandra:

version: '3'

services:
  cassandra:
    image: cassandra:latest
    ports:
      - "9042:9042"
    environment:
      - CASSANDRA_CLUSTER_NAME=TaskManagerCluster
    volumes:
      - cassandra_data:/var/lib/cassandra

volumes:
  cassandra_data:
Enter fullscreen mode Exit fullscreen mode

Database Setup

Let's create our database connection handler in database.py:

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.cqlengine import connection
from contextlib import asynccontextmanager

class CassandraConnection:
    def __init__(self):
        self.cluster = None
        self.session = None

    async def connect(self):
        self.cluster = Cluster(['localhost'])
        self.session = self.cluster.connect()

        # Create keyspace if not exists
        self.session.execute("""
            CREATE KEYSPACE IF NOT EXISTS task_manager
            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
        """)

        self.session.set_keyspace('task_manager')

        # Create tasks table
        self.session.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                task_id uuid,
                title text,
                description text,
                status text,
                due_date timestamp,
                created_at timestamp,
                updated_at timestamp,
                PRIMARY KEY (task_id)
            )
        """)

        return self.session

    async def disconnect(self):
        if self.cluster:
            self.cluster.shutdown()

db = CassandraConnection()

@asynccontextmanager
async def get_db():
    try:
        session = await db.connect()
        yield session
    finally:
        await db.disconnect()
Enter fullscreen mode Exit fullscreen mode

Models

Define our data models in models.py:

from datetime import datetime
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
from typing import Optional

class TaskBase(BaseModel):
    title: str
    description: Optional[str] = None
    status: str = "pending"
    due_date: Optional[datetime] = None

class TaskCreate(TaskBase):
    pass

class Task(TaskBase):
    task_id: UUID = Field(default_factory=uuid4)
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)

    class Config:
        from_attributes = True
Enter fullscreen mode Exit fullscreen mode

API Routes

Create our task routes in routes/tasks.py:

from fastapi import APIRouter, Depends, HTTPException
from cassandra.cluster import Session
from datetime import datetime
import uuid
from typing import List
from ..models import Task, TaskCreate
from ..database import get_db

router = APIRouter()

@router.post("/tasks/", response_model=Task)
async def create_task(task: TaskCreate, db: Session = Depends(get_db)):
    task_id = uuid.uuid4()
    now = datetime.now()

    query = """
        INSERT INTO tasks (task_id, title, description, status, due_date, created_at, updated_at)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """

    db.execute(query, (
        task_id,
        task.title,
        task.description,
        task.status,
        task.due_date,
        now,
        now
    ))

    return Task(
        task_id=task_id,
        title=task.title,
        description=task.description,
        status=task.status,
        due_date=task.due_date,
        created_at=now,
        updated_at=now
    )

@router.get("/tasks/{task_id}", response_model=Task)
async def read_task(task_id: uuid.UUID, db: Session = Depends(get_db)):
    query = "SELECT * FROM tasks WHERE task_id = %s"
    result = db.execute(query, [task_id]).one()

    if not result:
        raise HTTPException(status_code=404, detail="Task not found")

    return Task(**result)

@router.get("/tasks/", response_model=List[Task])
async def list_tasks(db: Session = Depends(get_db)):
    query = "SELECT * FROM tasks"
    results = db.execute(query)
    return [Task(**row) for row in results]

@router.put("/tasks/{task_id}", response_model=Task)
async def update_task(task_id: uuid.UUID, task: TaskCreate, db: Session = Depends(get_db)):
    # Check if task exists
    exists_query = "SELECT * FROM tasks WHERE task_id = %s"
    existing_task = db.execute(exists_query, [task_id]).one()

    if not existing_task:
        raise HTTPException(status_code=404, detail="Task not found")

    update_query = """
        UPDATE tasks 
        SET title = %s, description = %s, status = %s, due_date = %s, updated_at = %s
        WHERE task_id = %s
    """

    now = datetime.now()
    db.execute(update_query, (
        task.title,
        task.description,
        task.status,
        task.due_date,
        now,
        task_id
    ))

    return Task(
        task_id=task_id,
        title=task.title,
        description=task.description,
        status=task.status,
        due_date=task.due_date,
        created_at=existing_task.created_at,
        updated_at=now
    )

@router.delete("/tasks/{task_id}")
async def delete_task(task_id: uuid.UUID, db: Session = Depends(get_db)):
    # Check if task exists
    exists_query = "SELECT * FROM tasks WHERE task_id = %s"
    existing_task = db.execute(exists_query, [task_id]).one()

    if not existing_task:
        raise HTTPException(status_code=404, detail="Task not found")

    query = "DELETE FROM tasks WHERE task_id = %s"
    db.execute(query, [task_id])

    return {"message": "Task deleted successfully"}
Enter fullscreen mode Exit fullscreen mode

Main Application

Set up our FastAPI application in main.py:

from fastapi import FastAPI
from .routes import tasks

app = FastAPI(title="Task Manager API")

app.include_router(tasks.router, tags=["tasks"])

@app.get("/")
async def root():
    return {"message": "Welcome to Task Manager API"}
Enter fullscreen mode Exit fullscreen mode

Running the Application

  1. Start Cassandra:
docker-compose up -d
Enter fullscreen mode Exit fullscreen mode
  1. Install dependencies:
pip install -r requirements.txt
Enter fullscreen mode Exit fullscreen mode
  1. Run the API:
uvicorn app.main:app --reload
Enter fullscreen mode Exit fullscreen mode

Testing the API

Here are some example curl commands to test our API:

# Create a task
curl -X POST "http://localhost:8000/tasks/" \
     -H "Content-Type: application/json" \
     -d '{"title":"Complete project","description":"Finish the API implementation","status":"pending"}'

# List all tasks
curl "http://localhost:8000/tasks/"

# Get a specific task
curl "http://localhost:8000/tasks/{task_id}"

# Update a task
curl -X PUT "http://localhost:8000/tasks/{task_id}" \
     -H "Content-Type: application/json" \
     -d '{"title":"Complete project","description":"API implementation done","status":"completed"}'

# Delete a task
curl -X DELETE "http://localhost:8000/tasks/{task_id}"
Enter fullscreen mode Exit fullscreen mode

Performance Considerations

When working with Cassandra, keep in mind:

  1. Data Modeling: Design tables around your query patterns
  2. Partition Keys: Choose partition keys that distribute data evenly
  3. Consistency Levels: Configure based on your use case
  4. Batch Operations: Use them sparingly and only for related data
  5. Secondary Indexes: Use them carefully as they can impact performance

Conclusion

This implementation demonstrates how to build a robust task management API using Apache Cassandra. The combination of Cassandra's scalability and FastAPI's performance makes this stack suitable for production applications requiring high availability and consistency.

Future enhancements could include:

  • Authentication and authorization
  • More complex querying capabilities
  • Batch operations
  • Advanced task filtering
  • Automated testing

The complete source code is available on GitHub: [Link to your repository]

Resources

Top comments (0)