Python JWT Authentication with RSA + User Management CRUD

Python JWT Authentication with RSA + User Management CRUD

Build a secure REST API authentication system with Python and FastAPI using JWT and RSA encryption. This step-by-step tutorial covers user registration, login with encrypted passwords, JWT-protected routes, and full user management (CRUD) backed by MySQL. Includes setup, security best practices, and Postman testing. Perfect for building professional, production-ready APIs.

📌 Tech Stack

  • Python 3.10+

  • FastAPI

  • MySQL

  • JWT (python-jose)

  • RSA Encryption (Public/Private Key)

  • bcrypt

  • Postman

🧠 Authentication Flow

  1. Client encrypts password using RSA Public Key.

  2. Server decrypts password using RSA Private Key.

  3. Password verified using bcrypt.

  4. JWT token issued.

  5. Protected routes secured with JWT dependency.

📦 Requirements

  • Python 3.10+

  • MySQL 8+

  • OpenSSL

  • Postman

1️⃣ Create Python Project

mkdir python-rsa-auth cd python-rsa-auth python3 -m venv venv source venv/bin/activate

Open the project folder in your preferred editor (e.g., VS Code).

2️⃣ Install Dependencies

pip install fastapi uvicorn python-jose bcrypt mysql-connector-python python-dotenv cryptography

3️⃣ Create Project Folder Structure

mkdir -p app/api/auth app/api/users app/utils app/db app/middleware storage/rsa
FolderPurpose
app/apiAPI endpoints
app/api/authAuth related APIs
app/api/usersUser Management CRUD APIs
app/middlewareJWT auth middleware
app/utilsJWT & RSA helper utilities
app/dbDatabase connection
storage/rsaRSA key storage

4️⃣ Create Required Files

Create these files (you can use your editor or terminal):

touch app/main.py \ app/api/auth/register.py \ app/api/auth/login.py \ app/api/auth/profile.py \ app/api/auth/logout.py \ app/api/users/user_crud.py \ app/middleware/auth.py \ app/utils/jwt.py \ app/utils/rsa.py \ app/db/mysql.py \ .env \ .gitignore

5️⃣ Configure .gitignore

Add:

venv/ .env storage/rsa/private.pem

⚠️ Never commit your private RSA key!

6️⃣ Set Environment Variables (.env)

DB_HOST=localhost DB_USER=root DB_PASSWORD= DB_NAME=python_rsa_auth JWT_SECRET=super_secret_key JWT_EXPIRES=3600

7️⃣ MySQL Database Setup

Using your MySQL client (like DBeaver, MySQL Workbench, or CLI), run:

CREATE DATABASE python_rsa_auth; USE python_rsa_auth; CREATE TABLE users ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100), email VARCHAR(100) UNIQUE, password VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );

8️⃣ Generate RSA Keys (Using OpenSSL)

openssl genrsa -out storage/rsa/private.pem 2048 openssl rsa -in storage/rsa/private.pem -pubout -out storage/rsa/public.pem
  • public.pem → Used on frontend for password encryption.

  • private.pem → Used on backend for password decryption.

9️⃣ MySQL Connection (app/db/mysql.py)

import mysql.connector import os from dotenv import load_dotenv load_dotenv() db = mysql.connector.connect( host=os.getenv("DB_HOST"), user=os.getenv("DB_USER"), password=os.getenv("DB_PASSWORD"), database=os.getenv("DB_NAME") )

🔟 JWT Helper (app/utils/jwt.py)

from jose import jwt import os, time def sign_token(user): payload = { "id": user["id"], "email": user["email"], "exp": time.time() + int(os.getenv("JWT_EXPIRES")) } return jwt.encode(payload, os.getenv("JWT_SECRET"), algorithm="HS256") def verify_token(token): return jwt.decode(token, os.getenv("JWT_SECRET"), algorithms=["HS256"])

1️⃣1️⃣ RSA Helper (app/utils/rsa.py)

from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import padding import base64 with open("storage/rsa/private.pem", "rb") as f: private_key = serialization.load_pem_private_key( f.read(), password=None ) def decrypt_password(encrypted): decrypted = private_key.decrypt( base64.b64decode(encrypted), padding.PKCS1v15() ) return decrypted.decode()

1️⃣2️⃣ Register API (app/api/auth/register.py)

from fastapi import APIRouter import bcrypt from app.db.mysql import db router = APIRouter() @router.post("/register") def register(data: dict): cursor = db.cursor(dictionary=True) hashed = bcrypt.hashpw( data["password"].encode(), bcrypt.gensalt() ) cursor.execute( "INSERT INTO users (name, email, password) VALUES (%s, %s, %s)", (data["name"], data["email"], hashed) ) db.commit() return {"message": "User registered successfully"}

1️⃣3️⃣ Login API (app/api/auth/login.py)

from fastapi import APIRouter, HTTPException import bcrypt from app.utils.rsa import decrypt_password from app.utils.jwt import sign_token from app.db.mysql import db router = APIRouter() @router.post("/login") def login(data: dict): cursor = db.cursor(dictionary=True) cursor.execute("SELECT * FROM users WHERE email=%s", (data["email"],)) user = cursor.fetchone() if not user: raise HTTPException(status_code=401, detail="Invalid credentials") decrypted = decrypt_password(data["password"]) if not bcrypt.checkpw(decrypted.encode(), user["password"].encode()): raise HTTPException(status_code=401, detail="Invalid credentials") return { "user": { "id": user["id"], "name": user["name"], "email": user["email"] }, "access_token": sign_token(user), "token_type": "Bearer" }

1️⃣4️⃣ JWT Middleware (app/middleware/auth.py)

from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer from app.utils.jwt import verify_token security = HTTPBearer() def auth_required(credentials=Depends(security)): try: return verify_token(credentials.credentials) except: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token" )

1️⃣5️⃣ Profile API (app/api/auth/profile.py)

from fastapi import APIRouter, Depends from app.middleware.auth import auth_required router = APIRouter() @router.get("/profile") def profile(user=Depends(auth_required)): return user

1️⃣6️⃣ Logout API (app/api/auth/logout.py)

from fastapi import APIRouter router = APIRouter() @router.post("/logout") def logout(): return {"message": "Logout handled on client side"}

1️⃣7️⃣ User Management CRUD (app/api/users/user_crud.py)

from fastapi import APIRouter, HTTPException, Depends, Request from app.middleware.auth import auth_required from app.db.mysql import db router = APIRouter() @router.get("/users") def list_users(user=Depends(auth_required)): cursor = db.cursor(dictionary=True) cursor.execute("SELECT id, name, email, created_at FROM users") users = cursor.fetchall() return {"users": users} @router.get("/users/{user_id}") def get_user(user_id: int, user=Depends(auth_required)): cursor = db.cursor(dictionary=True) cursor.execute("SELECT id, name, email, created_at FROM users WHERE id=%s", (user_id,)) user_data = cursor.fetchone() if not user_data: raise HTTPException(status_code=404, detail="User not found") return user_data @router.put("/users/{user_id}") async def update_user(user_id: int, request: Request, user=Depends(auth_required)): data = await request.json() name = data.get("name") email = data.get("email") if not name or not email: raise HTTPException(status_code=400, detail="Name and Email are required") cursor = db.cursor() cursor.execute("UPDATE users SET name=%s, email=%s WHERE id=%s", (name, email, user_id)) db.commit() return {"message": "User updated successfully"} @router.delete("/users/{user_id}") def delete_user(user_id: int, user=Depends(auth_required)): cursor = db.cursor() cursor.execute("DELETE FROM users WHERE id=%s", (user_id,)) db.commit() return {"message": "User deleted successfully"}

1️⃣8️⃣ Main App (app/main.py)

from fastapi import FastAPI from app.api.auth import register, login, profile, logout from app.api.users import user_crud app = FastAPI() app.include_router(register.router, prefix="/api/auth") app.include_router(login.router, prefix="/api/auth") app.include_router(profile.router, prefix="/api/auth") app.include_router(logout.router, prefix="/api/auth") app.include_router(user_crud.router, prefix="/api")

▶️ Run Application

uvicorn app.main:app --reload

Base URL:
http://localhost:8000/api

🚀 Postman Testing

1. Register

  • Method: POST

  • URL: http://localhost:8000/api/auth/register

  • Body (JSON):

{ "name": "StarCode Kh", "email": "starcodekh@example.com", "password": "12345678" }

2. Login

  • Method: POST

  • URL: http://localhost:8000/api/auth/login

  • Body (JSON):

{ "email": "starcodekh@example.com", "password": "ENCRYPTED_PASSWORD_BASE64" }

Copy the returned "access_token".

3. Access Protected APIs

Set header:

  • Authorization: Bearer YOUR_ACCESS_TOKEN

4. Get Profile

  • Method: GET

  • URL: http://localhost:8000/api/auth/profile

5. List Users

  • Method: GET

  • URL: http://localhost:8000/api/users

6. Get Single User

  • Method: GET

  • URL: http://localhost:8000/api/users/{user_id}

7. Update User

  • Method: PUT

  • URL: http://localhost:8000/api/users/{user_id}

  • Body (JSON):

{ "name": "New Name", "email": "newemail@example.com" }

8. Delete User

  • Method: DELETE

  • URL: http://localhost:8000/api/users/{user_id}

✅ Production Best Practices

  • Use HTTPS everywhere.

  • Never commit private.pem.

  • Rotate RSA keys periodically.

  • Use short-lived JWT tokens.

  • Rate-limit login attempts.

  • Secure .env files properly.

🎯 Final Result

You now have a secure, production-ready Python FastAPI REST API with:

  • 🔐 RSA-encrypted passwords

  • 🔑 JWT Authentication

  • 🛡 MySQL backend

  • 👤 User management CRUD operations

🔗 Source Code

Want the full source code?
👉 Download the complete Python JWT example from my GitHub repository

Souy Soeng

Souy Soeng

Hi there 👋, I’m Soeng Souy (StarCode Kh)
-------------------------------------------
🌱 I’m currently creating a sample Laravel and React Vue Livewire
👯 I’m looking to collaborate on open-source PHP & JavaScript projects
💬 Ask me about Laravel, MySQL, or Flutter
⚡ Fun fact: I love turning ☕️ into code!

Post a Comment

CAN FEEDBACK
close