DEV Community

Cover image for Building Flask User Authentication with JWT
Dhruv Prajapati
Dhruv Prajapati

Posted on

Building Flask User Authentication with JWT

  • This is the last part of Flask Series.
  • Part 1 in which i have explained what is REST API and HTTPS and then implemented RESTAPI with Flask.
  • Part 2 we implemented simple Login Registration with Flask.
  • Today we will implement Token based authentication using JWT in Flask.

  • Modules which i have used to implement JWT based authentication are given below

Flask -> Building Web Application
Flask-JWT-Extended -> To work with JWT Tokens
Flask-RESTful -> To Build RestAPIs
Flask-SqlAlchemy -> Provide support for SQLAlchemy to your application
passlib -> Password Hasing Library
PyJWT -> To Encode and Decode Json Web Token
SQLAlchemy -> Provide ORM for our Application
Werkzeug -> WSGI Web Application Library, it will provide security to encrypt password and matching Password
Enter fullscreen mode Exit fullscreen mode
  • To install modules use pip install <module-name>

What is JWT?

  • JWT stands for JSON WEB TOKEN
  • It is used to transfer information securely between parties as a JSON object.
  • This information can be verfied at both the end because it is digitally signed.
  • We can use Public key cryptography or Private key cryptography to signed JWTs.

JSON WEB TOKEN Structure

  • JWT is made up from Three parts
  • Header
  • Paylod
  • Signature

Header

  • Header consist two parts:The Type of JWT Token and Hashing Algorithm For Example:
{
  "alg": "RS256",
  "typ": "JWT"
}
Enter fullscreen mode Exit fullscreen mode

Payload

  • Payload is nothing but User Data such as Username, subject, birthdate etc... For Example:
{
  "sub": "123456789",
  "name": "Dhruv Prajapati",
  "admin": true,
}
Enter fullscreen mode Exit fullscreen mode

Signature

  • To create the signature part you have to take the encoded header, the encoded payload, a secret, the algorithm specified in the header, and sign that. For Example:
HMACSHA256(
  base64UrlEncode(header) + "." +
  base64UrlEncode(payload),
  secret)
Enter fullscreen mode Exit fullscreen mode

You can understand more about JWT from here

  • So let's start building over project.

  • Create Directory named as Flask-JWT-Authentication. Our Folder Structure is as given below.

Alt Text

  • First we will create basic structure of our application such as creating Flask Application, API Class Object, Database class Object, JWT Manager Object and other application configuration.

  • Let's create app.py and put below code.


from flask import Flask

from flask_restful import Api

from flask_sqlalchemy import SQLAlchemy

from flask_jwt_extended import JWTManager

# Making Flask Application
app = Flask(__name__)

# Object of Api class
api = Api(app)

# Application Configuration
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:root@localhost/jwt_auth'

app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

app.config['SECRET_KEY'] = 'ThisIsHardestThing'

app.config['JWT_SECRET_KEY'] = 'Dude!WhyShouldYouEncryptIt'

app.config['JWT_BLACKLIST_ENABLED'] = True

app.config['JWT_BLACKLIST_TOKEN_CHECKS'] = ['access', 'refresh']

# SqlAlchemy object
db = SQLAlchemy(app)

# JwtManager object
jwt = JWTManager(app)

Enter fullscreen mode Exit fullscreen mode
  • we will create models for our web application. so create models.py inside the directory we just created. models.py contains two models. One is UserModel and other one is RevokedTokenModel

from app import db

from passlib.hash import pbkdf2_sha256 as sha256


class UserModel(db.Model):
    """
    User Model Class
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)

    username = db.Column(db.String(120), unique=True, nullable=False)

    password = db.Column(db.String(120), nullable=False)

    """
    Save user details in Database
    """
    def save_to_db(self):

        db.session.add(self)

        db.session.commit()

    """
    Find user by username
    """
    @classmethod
    def find_by_username(cls, username):

        return cls.query.filter_by(username=username).first()

    """
    return all the user data in json form available in DB
    """
    @classmethod
    def return_all(cls):

        def to_json(x):

            return {

                'username': x.username,

                'password': x.password

            }

        return {'users': [to_json(user) for user in UserModel.query.all()]}

    """
    Delete user data
    """
    @classmethod
    def delete_all(cls):

        try:

            num_rows_deleted = db.session.query(cls).delete()

            db.session.commit()

            return {'message': f'{num_rows_deleted} row(s) deleted'}

        except:

            return {'message': 'Something went wrong'}

    """
    generate hash from password by encryption using sha256
    """
    @staticmethod
    def generate_hash(password):

        return sha256.hash(password)

    """
    Verify hash and password
    """
    @staticmethod
    def verify_hash(password, hash_):

        return sha256.verify(password, hash_)

Enter fullscreen mode Exit fullscreen mode
  • User Model contains two fields: Username and Password. To save the user details in Database model contains save_to_db method. find_by_username method will find user by it's username. return_all and delete_all method is used to return all users and delete all users respectively. generate_hash method will generate hash from user password and verify_hash method will verify hash and password at login time.
# User Model
# ...

class RevokedTokenModel(db.Model):
    """
    Revoked Token Model Class
    """

    __tablename__ = 'revoked_tokens'

    id = db.Column(db.Integer, primary_key=True)

    jti = db.Column(db.String(120))

    """
    Save Token in DB
    """
    def add(self):

        db.session.add(self)

        db.session.commit()

    """
    Checking that token is blacklisted
    """
    @classmethod
    def is_jti_blacklisted(cls, jti):

        query = cls.query.filter_by(jti=jti).first()

        return bool(query)
Enter fullscreen mode Exit fullscreen mode
  • RevokedToken Model contains jti field. add method is used to save token in Database and is_jti_blacklisted is used to check that JTI is blacklisted or not.

  • Now we will create resources for our APIs. So, let's create resource.py

  • First we will create UserRegistration and UserLogin Resources.


from flask_restful import Resource, reqparse

from models import UserModel, RevokedTokenModel

from flask_jwt_extended import (
    create_access_token,
    create_refresh_token,
    jwt_required,
    jwt_refresh_token_required,
    get_jwt_identity,
    get_raw_jwt
)

import pdb

# provide simple and uniform access to any variable
parser = reqparse.RequestParser()

parser.add_argument('username', help='username cannot be blank', required=True)

parser.add_argument('password', help='password cannot be blank', required=True)


class UserRegistration(Resource):
    """
    User Registration Api
    """

    def post(self):

        data = parser.parse_args()

        username = data['username']

        # Checking that user is already exist or not
        if UserModel.find_by_username(username):

            return {'message': f'User {username} already exists'}

        # create new user
        new_user = UserModel(

            username=username,

            password=UserModel.generate_hash(data['password'])

        )

        try:

            # Saving user in DB and Generating Access and Refresh token
            new_user.save_to_db()

            access_token = create_access_token(identity=username)

            refresh_token = create_refresh_token(identity=username)

            return {

                'message': f'User {username} was created',

                'access_token': access_token,

                'refresh_token': refresh_token

            }

        except:

            return {'message': 'Something went wrong'}, 500


class UserLogin(Resource):
    """
    User Login Api
    """

    def post(self):

        data = parser.parse_args()

        username = data['username']

        # Searching user by username
        current_user = UserModel.find_by_username(username)

        # user does not exists
        if not current_user:

            return {'message': f'User {username} doesn\'t exist'}

        # user exists, comparing password and hash
        if UserModel.verify_hash(data['password'], current_user.password):

            # generating access token and refresh token
            access_token = create_access_token(identity=username)

            refresh_token = create_refresh_token(identity=username)

            return {

                'message': f'Logged in as {username}',

                'access_token': access_token,

                'refresh_token': refresh_token

            }

        else:

            return {'message': "Wrong credentials"}
Enter fullscreen mode Exit fullscreen mode
  • Now Let's create UserLogoutAccess and UserLogoutRefresh inside the resource.py

class UserLogoutAccess(Resource):
    """
    User Logout Api 
    """

    @jwt_required
    def post(self):

        jti = get_raw_jwt()['jti']

        try:
            # Revoking access token
            revoked_token = RevokedTokenModel(jti=jti)

            revoked_token.add()

            return {'message': 'Access token has been revoked'}

        except:

            return {'message': 'Something went wrong'}, 500


class UserLogoutRefresh(Resource):
    """
    User Logout Refresh Api 
    """
    @jwt_refresh_token_required
    def post(self):

        jti = get_raw_jwt()['jti']

        try:

            revoked_token = RevokedTokenModel(jti=jti)

            revoked_token.add()

            pdb.set_trace()

            return {'message': 'Refresh token has been revoked'}

        except:

            return {'message': 'Something went wrong'}, 500
Enter fullscreen mode Exit fullscreen mode
  • We need to create TokenRefresh resource so, if token expired then user can get new access token from refresh token. Create TokenRefresh resource under the resource.py

class TokenRefresh(Resource):
    """
    Token Refresh Api
    """

    @jwt_refresh_token_required
    def post(self):

        # Generating new access token
        current_user = get_jwt_identity()

        access_token = create_access_token(identity=current_user)

        return {'access_token': access_token}

Enter fullscreen mode Exit fullscreen mode
  • At last we will create getAllUsers and deleteAllUsers under the resources. also we will create starting point for authenticated resources secretResources. under secretResources you can create authenticated end points which will only accessible when user is authenticated.

class AllUsers(Resource):

    def get(self):
        """
        return all user api
        """
        return UserModel.return_all()

    def delete(self):
        """
        delete all user api
        """
        return UserModel.delete_all()


class SecretResource(Resource):

    """
    Secrest Resource Api
    You can create crud operation in this way
    """
    @jwt_required
    def get(self):
        return {'answer': 'You are accessing super secret blueprint'}
Enter fullscreen mode Exit fullscreen mode
  • At the finishing up the project, create our APIs and database tables under the app.py
# above code
#...

# Generating tables before first request is fetched
@app.before_first_request
def create_tables():

    db.create_all()

# Checking that token is in blacklist or not
@jwt.token_in_blacklist_loader
def check_if_token_in_blacklist(decrypted_token):

    jti = decrypted_token['jti']

    return models.RevokedTokenModel.is_jti_blacklisted(jti)

# Importing models and resources
import models, resources

# Api Endpoints

api.add_resource(resources.UserRegistration, '/registration')

api.add_resource(resources.UserLogin, '/login')

api.add_resource(resources.UserLogoutAccess, '/logout/access')

api.add_resource(resources.UserLogoutRefresh, '/logout/refresh')

api.add_resource(resources.TokenRefresh, '/token/refresh')

api.add_resource(resources.AllUsers, '/users')

api.add_resource(resources.SecretResource, '/secret')

Enter fullscreen mode Exit fullscreen mode
  • Here, our building section is completed. Let's run our project

  • To run the project in windows machine go to the project path in Command Prompt and use below commands.

set FLASK_APP=app.py

set FLASK_DEBUG=1

flask run --port=8080 
Enter fullscreen mode Exit fullscreen mode
  • To run the project in Unix Machinego to the project path in Terminal and use below commands.
export FLASK_APP=app.py

export FLASK_DEBUG=1

flask run --port=8080 
Enter fullscreen mode Exit fullscreen mode
  • Here, --port is optional flag.

  • Now, consume all the apis one by one. I have added some of api access in Postman.

UserRegistration Api

Alt Text

UserLogin Api

Alt Text

GetAllUsers

Alt Text

Thank you for Reading. Give like and follow me for more amazing tutorials.

Top comments (3)

Collapse
 
alimurtadho profile image
ali murtadho

name 'jwt_refresh_token_required' is not defined

Collapse
 
imdhruv99 profile image
Dhruv Prajapati

You need to install Flask-JWT-Extended first.

Collapse
 
alimurtadho profile image
ali murtadho

ok thank you