DEV Community

Mohamed Sambo
Mohamed Sambo

Posted on • Updated on

3- Your First FastApi+JWT token

Objectives the blog will discuss

  • Hashing
  • OAuth2 with Password
  • SQLMODEL
  • Bearer with JWT tokens

GITHUB REPO: https://github.com/sambo2021/python-dev/tree/master/fastapi-auth-project

We'll proceed to installing the necessary dependencies needed in this guide. Copy the below content to requirements.txt.
pip install -r requirements.txt

Hashing:

Our database will handle users signing in for now, but you do not want to store the password as it is plaintext, converting the original plain-text passwords
adrian123

into irreversible, fixed-length hash codes
$2b$12$fNiX.PSSs4XQg0YYC5PEF.t5.aDjEvhIVYHIN5UxLXO2.9LIRHnO6

This way, even if the hashed data is somehow obtained by an attacker, they cannot reverse-engineer it to reveal the original passwords.

We implement it by installing the necessary dependencies.
pip install "passlib[bcrypt]"

Python package to handle password hashes with the recommended algorithm is “Bcrypt”.

from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password:str):
    return pwd_context.hash(password)
if __name__ == "__main__":
    print(hash_password("adrian123"))
Enter fullscreen mode Exit fullscreen mode

output:

python3 main.py 
python3 main.py 
(trapped) error reading bcrypt version
Traceback (most recent call last):
  File "/home/sambo/.local/lib/python3.10/site-packages/passlib/handlers/bcrypt.py", line 620, in _load_backend_mixin
    version = _bcrypt.__about__.__version__
AttributeError: module 'bcrypt' has no attribute '__about__'
$2b$12$fNiX.PSSs4XQg0YYC5PEF.t5.aDjEvhIVYHIN5UxLXO2.9LIRHnO6
Enter fullscreen mode Exit fullscreen mode

note:
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto"): This creates an instance of the CryptContext class, specifying that the bcrypt hashing algorithm should be used.
The deprecated="auto" parameter means that if bcrypt becomes deprecated in the future, passlib will automatically choose a more secure scheme.

OAuth2 with Password and Bearer:

as mentioned in the docs-> https://fastapi.tiangolo.com/tutorial/security/simple-oauth2/

We are going to use FastAPI security utilities to get the username and password.
OAuth2 specifies that when using the "password flow" (that we are using) the client/user must send a username and password fields as form data.
And the spec says that the fields have to be named like that. So user-name or email wouldn't work.
But don't worry, you can show it as you wish to your final users in the frontend.
And your database models can use any other names you want.
But for the login path operation, we need to use these names to be compatible with the spec (and be able to, for example, use the integrated API documentation system).
The spec also states that the username and password must be sent as form data (so, no JSON here).

but before going into that part lets dive into

Create Database and Tables on startup:

I am using sqlmodel to connent to database, create table and do the sql operations to store users into the table and use them into other endpoints especially /token for generating jwt token

database.py

from sqlmodel import Field, SQLModel, create_engine

class User(SQLModel, table=True):
    __table_args__ = (UniqueConstraint("email"),)
    username: str = Field( primary_key=True)
    fullname: str
    email: str
    hashed_password: str
    join: datetime = Field(default=datetime.utcnow())
    disabled: bool = Field(default=False)

sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
    SQLModel.metadata.create_all(engine)

adrianholland = User(
        username = "adrianholland", 
        fullname = "Adrian Holland",
        email = "Adrian.Holland@gmail.com",
        hashed_password = hash_password("a1dri2an5@6holl7and"))

def initiate_admin():
    admin = get_user("adrianholland")
    if not admin:
        add_user(adrianholland)
Enter fullscreen mode Exit fullscreen mode

For that, we will import SQLModel (plus other things we will also use) and create a class User that inherits from SQLModel and represents the table model for our users, and on start up of the main app we will use:

main.py

@app.on_event("startup")
def on_startup():
    create_db_and_tables()
    initiate_admin()
Enter fullscreen mode Exit fullscreen mode

but of course we can not do it before adding main database functions we need, so:

database.py

from sqlmodel import Field, SQLModel, Session, create_engine,select
from passlib.context import CryptContext
from datetime import datetime
from sqlalchemy import UniqueConstraint
from fastapi import HTTPException
from email_validator import EmailNotValidError, validate_email
from disposable_email_domains import blocklist

sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)


def create_db_and_tables():
    SQLModel.metadata.create_all(engine)

class User(SQLModel, table=True):
    __table_args__ = (UniqueConstraint("email"),)
    username: str = Field( primary_key=True)
    fullname: str
    email: str
    hashed_password: str
    join: datetime = Field(default=datetime.utcnow())
    disabled: bool = Field(default=False)

# to hash the passowrd as we did before    
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password: str):
    return pwd_context.hash(password)

##https://github.com/s-azizkhan/fastapi-email-validation-server/blob/main/main.py
def validate_email_data(email: str):
    try:
        # Validate against general email rules
        v = validate_email(email, check_deliverability=True)

        # Check if the domain is in the disposable email blocklist
        domain = email.split("@")[1]
        if domain in blocklist:
            raise HTTPException(
                status_code=400,
                detail=f"Disposable email addresses are not allowed: {email}",
            )

        return True
    except EmailNotValidError as e:
        return False
    except Exception as e:
        return False

def validate_data(user: User):
    return validate_email_data(user.email) and type(user.username) == str

def get_user(username: str):
    with Session(engine) as session:
        user = session.get(User, username)
        return user

def add_user(user: User):
    exist_user = get_user(user.username)
    if not exist_user:
        with Session(engine) as session:
            session.add(user,_warn=True)
            session.commit()
    else:
        raise HTTPException(status_code=409, detail=f"user {exist_user.fullname} exists and username is {exist_user.username}")


def get_all_users():
    with Session(engine) as session:
        statement = select(User)
        users = session.exec(statement).fetchall()
        return users

adrianholland = User(
        username = "adrianholland", 
        fullname = "Adrian Holland",
        email = "Adrian.Holland@gmail.com",
        hashed_password = hash_password("a1dri2an5@6holl7and"))

def initiate_admin():
    admin = get_user("adrianholland")
    if not admin:
        add_user(adrianholland)

Enter fullscreen mode Exit fullscreen mode

so now we have our main skeleton to build our app, get_user, add_user, and get_users, and of course, helper functions like hash_passowrd to store the password you entered as hashed one, and the validator function to validate mainly the email

now we get back to the main.py to add all endpoint we need

main.py

from datetime import timedelta
from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.security import OAuth2PasswordRequestForm
from models import Token, User, authenticate_user, create_access_token, get_current_active_user
from database import create_db_and_tables, get_user, add_user, get_all_users, hash_password, initiate_admin, validate_data
import json


app = FastAPI()

@app.on_event("startup")
def on_startup():
    create_db_and_tables()
    initiate_admin()

@app.get("/",tags=["root"])
async def read_root(current_user: User = Depends(get_current_active_user)):
    return {"message":"Welcome inside first FastApi api",
            "owner": current_user}


# when u login u redirected to /token to generate token by username and password
# u enter the username
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"})
    access_token_expires = timedelta(minutes=30)
    access_token = create_access_token(
        data={"username" : user.username, "email": user.email, "fullname": user.fullname }, expires_delta=access_token_expires)
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me", tags=["my-user-data"])
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return {
            "owner": current_user
            }


@app.get("/users/{user_name}",tags=["get-one-user"])
async def read_item(user_name: str , request: Request , current_user: User = Depends(get_current_active_user)):
    user = get_user(user_name)
    try:
        return {"user_name": user.username,
                "user_fullname": user.fullname,
                "user_email": user.email,
                "query_params": request.query_params,
                "request_headers": request.headers,
                "owner": current_user
                }
    except:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                            detail=f"user {user_name} doesnot exist", headers={"WWW-Authenticate": "Bearer"})




@app.get("/users",tags=["get-all-users"])
async def read_item(request: Request , current_user: User = Depends(get_current_active_user)):
    return {
            "users": get_all_users(),
            "request_headers": request.headers,
            "owner": current_user
            }

@app.post("/users",tags=["add-new-user"])
async def read_item(request: Request , current_user: User = Depends(get_current_active_user)):
    request_body  = await request.body()
    json_str = request_body.decode('utf-8')
    json_data = json.loads(json_str)
    new_user=User(
        username = json_data["username"], 
        fullname = json_data["fullname"],
        email = json_data["email"],
        hashed_password = hash_password(json_data["password"])
    )
    if validate_data(new_user):
        add_user(new_user)
        return {
                "request_headers": request.headers,
                "owner": current_user
                }
    else:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                            detail=f"{new_user.username} or {new_user.email} may be not valid", headers={"WWW-Authenticate": "Bearer"})

Enter fullscreen mode Exit fullscreen mode

Bearer with JWT tokens

but you still need some functionality to generate the jwt token and verify it against any login so have a look on :

models.py
and you will find everything described, but for SECRET_KEY used in encoding the token you need to generate your own local and use it
$ openssl rand -hex 32

from fastapi import Depends,HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext # we use it for password hasher
from database import get_user,add_user,User


#openssl rand -hex 32
#we gonna use it to encode and decode the token

SECRET_KEY = "2cfea755f57d42cc34ce427475f5891aa92361bff9af79a360d1dafd296853d9"

# lets consider this as out data base already which have users and thier hashed password
# what is the actual password -> adrian123
# how u hashed it -> print(get_password_hash("adrian123"))



class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str or None = None


#This parameter specifies how to handle deprecated hashing algorithms. "auto" means 
#that FastAPI will automatically handle deprecated hashing schemes and update them as needed.
#"bcrypt" is chosen. Bcrypt is a popular and secure password hashing algorithm
#This parameter specifies how to handle deprecated hashing algorithms. "auto" means that 
#FastAPI will automatically handle deprecated hashing schemes and update them as needed.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


#OAuth2PasswordBearer: This is a class provided by the fastapi.security module for handling OAuth2 password bearer authentication. 
#OAuth2 is a protocol that allows secure authorization in a standardized way.
#tokenUrl="token": This parameter specifies the URL where clients can request a token. 
#In this case, it's set to "token," meaning that when clients want to authenticate using OAuth2 password flow, 
#they should send their credentials to the "/token" endpoint.
#With this code, you've created an instance of the OAuth2PasswordBearer class named oauth2_scheme, and you can use it as a dependency in 
#your FastAPI routes. When a route depends on oauth2_scheme, FastAPI will expect clients to include an OAuth2 token in the "Authorization" 
#header of their requests. The token will be validated and can be used to identify and authenticate the user.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


def authenticate_user(username: str, password: str):
    user = get_user(username)
    # if the user u entered is not exist 
    if not user:
        return False
    # if the user u entered exists but lets compare the password text u entered with the hashed one
    if not pwd_context.verify(password, user.hashed_password):
        return False

    return user


def create_access_token(data: dict, expires_delta: timedelta or None = None):
    to_encode = data.copy()
    print(f"data: {to_encode}")
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)

    to_encode.update({"exp": expire})
    print(f"data: {to_encode}")

    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
    print(f"jwt: {encoded_jwt}")
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credential_exception = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                                         detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"})

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        print(f"payload: {payload}")
        username = payload.get("username")
        print(f"username: {username}")
        if username is None:
            raise credential_exception

        token_data = TokenData(username=username)
        print(f"token_data: {token_data}")
    except JWTError:
        raise credential_exception

    user = get_user(username=token_data.username)
    if user is None:
        raise credential_exception

    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    return current_user

Enter fullscreen mode Exit fullscreen mode

finally your app structure shall be something like

app
|__ database.py
|__ models.py
|__ main.py

once u run your app uvicorn main:app --port 9095 --reload
you can go through your brower directly http://localhost:9095/docs
you should have endpoints for user login and remaining endpoints

fastapi/docs

and once you go into /token endpoint and enter the username and password

token endpoint

you will have a valid token

jwt token

you can see a lock on each endpoint because we already used
current_user: User = Depends(get_current_active_user)

so you can not use any of them if you dont have a valid token at first point

next enhancement:
1- add more endpoints delete-user, update-user, block-user
2- Using Postgres sql database
3- containerize our app and database, and make them microservices

Top comments (0)