Now that we have all the security flow, let's make the application actually secure, using JWT tokens and secure password hashing.
This code is something you can actually use in your application, save the password hashes in your database, etc.
We are going to start from where we left in the previous chapter and increment it.
About JWT¶
JWT means "JSON Web Tokens".
It's a standard to codify a JSON object in a long dense string without spaces. It looks like this:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
It is not encrypted, so, anyone could recover the information from the contents.
But it's signed. So, when you receive a token that you emitted, you can verify that you actually emitted it.
That way, you can create a token with an expiration of, let's say, 1 week. And then when the user comes back the next day with the token, you know that user is still logged in to your system.
After a week, the token will be expired and the user will not be authorized and will have to sign in again to get a new token. And if the user (or a third party) tried to modify the token to change the expiration, you would be able to discover it, because the signatures would not match.
If you want to play with JWT tokens and see how they work, check https://jwt.io.
Install python-jose
¶
We need to install python-jose
to generate and verify the JWT tokens in Python:
$ pip install "python-jose[cryptography]"---> 100%
Python-jose requires a cryptographic backend as an extra.
Here we are using the recommended one: pyca/cryptography.
Tip
This tutorial previously used PyJWT.
But it was updated to use Python-jose instead as it provides all the features from PyJWT plus some extras that you might need later when building integrations with other tools.
Password hashing¶
"Hashing" means converting some content (a password in this case) into a sequence of bytes (just a string) that looks like gibberish.
Whenever you pass exactly the same content (exactly the same password) you get exactly the same gibberish.
But you cannot convert from the gibberish back to the password.
Why use password hashing¶
If your database is stolen, the thief won't have your users' plaintext passwords, only the hashes.
So, the thief won't be able to try to use that password in another system (as many users use the same password everywhere, this would be dangerous).
Install passlib
¶
PassLib is a great Python package to handle password hashes.
It supports many secure hashing algorithms and utilities to work with them.
The recommended algorithm is "Bcrypt".
So, install PassLib with Bcrypt:
$ pip install "passlib[bcrypt]"---> 100%
Tip
With passlib
, you could even configure it to be able to read passwords created by Django, a Flask security plug-in or many others.
So, you would be able to, for example, share the same data from a Django application in a database with a FastAPI application. Or gradually migrate a Django application using the same database.
And your users would be able to login from your Django app or from your FastAPI app, at the same time.
Hash and verify the passwords¶
Import the tools we need from passlib
.
Create a PassLib "context". This is what will be used to hash and verify passwords.
Tip
The PassLib context also has functionality to use different hashing algorithms, including deprecated old ones only to allow verifying them, etc.
For example, you could use it to read and verify passwords generated by another system (like Django) but hash any new passwords with a different algorithm like Bcrypt.
And be compatible with all of them at the same time.
Create a utility function to hash a password coming from the user.
And another utility to verify if a received password matches the hash stored.
And another one to authenticate and return a user.
from datetime import datetime, timedeltafrom typing import Unionfrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom jose import JWTError, jwtfrom passlib.context import CryptContextfrom pydantic import BaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass TokenData(BaseModel): username: Union[str, None] = Noneclass User(BaseModel): username: str email: Union[str, None] = None full_name: Union[str, None] = None disabled: Union[bool, None] = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")app = FastAPI()def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password): return pwd_context.hash(password)def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return userdef create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtasync def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return userasync def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user@app.get("/users/me/items/")async def read_own_items(current_user: User = Depends(get_current_active_user)): return [{"item_id": "Foo", "owner": current_user.username}]
from datetime import datetime, timedeltafrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom jose import JWTError, jwtfrom passlib.context import CryptContextfrom pydantic import BaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass TokenData(BaseModel): username: str | None = Noneclass User(BaseModel): username: str email: str | None = None full_name: str | None = None disabled: bool | None = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")app = FastAPI()def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password): return pwd_context.hash(password)def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return userdef create_access_token(data: dict, expires_delta: timedelta | None = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtasync def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return userasync def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user@app.get("/users/me/items/")async def read_own_items(current_user: User = Depends(get_current_active_user)): return [{"item_id": "Foo", "owner": current_user.username}]
Note
If you check the new (fake) database fake_users_db
, you will see how the hashed password looks like now: "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW"
.
Handle JWT tokens¶
Import the modules installed.
Create a random secret key that will be used to sign the JWT tokens.
To generate a secure random secret key use the command:
$ openssl rand -hex 3209d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
And copy the output to the variable SECRET_KEY
(don't use the one in the example).
Create a variable ALGORITHM
with the algorithm used to sign the JWT token and set it to "HS256"
.
Create a variable for the expiration of the token.
Define a Pydantic Model that will be used in the token endpoint for the response.
Create a utility function to generate a new access token.
from datetime import datetime, timedeltafrom typing import Unionfrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom jose import JWTError, jwtfrom passlib.context import CryptContextfrom pydantic import BaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass TokenData(BaseModel): username: Union[str, None] = Noneclass User(BaseModel): username: str email: Union[str, None] = None full_name: Union[str, None] = None disabled: Union[bool, None] = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")app = FastAPI()def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password): return pwd_context.hash(password)def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return userdef create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtasync def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return userasync def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user@app.get("/users/me/items/")async def read_own_items(current_user: User = Depends(get_current_active_user)): return [{"item_id": "Foo", "owner": current_user.username}]
from datetime import datetime, timedeltafrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom jose import JWTError, jwtfrom passlib.context import CryptContextfrom pydantic import BaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass TokenData(BaseModel): username: str | None = Noneclass User(BaseModel): username: str email: str | None = None full_name: str | None = None disabled: bool | None = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")app = FastAPI()def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password): return pwd_context.hash(password)def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return userdef create_access_token(data: dict, expires_delta: timedelta | None = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtasync def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return userasync def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user@app.get("/users/me/items/")async def read_own_items(current_user: User = Depends(get_current_active_user)): return [{"item_id": "Foo", "owner": current_user.username}]
Update the dependencies¶
Update get_current_user
to receive the same token as before, but this time, using JWT tokens.
Decode the received token, verify it, and return the current user.
If the token is invalid, return an HTTP error right away.
from datetime import datetime, timedeltafrom typing import Unionfrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom jose import JWTError, jwtfrom passlib.context import CryptContextfrom pydantic import BaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass TokenData(BaseModel): username: Union[str, None] = Noneclass User(BaseModel): username: str email: Union[str, None] = None full_name: Union[str, None] = None disabled: Union[bool, None] = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")app = FastAPI()def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password): return pwd_context.hash(password)def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return userdef create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtasync def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return userasync def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user@app.get("/users/me/items/")async def read_own_items(current_user: User = Depends(get_current_active_user)): return [{"item_id": "Foo", "owner": current_user.username}]
from datetime import datetime, timedeltafrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom jose import JWTError, jwtfrom passlib.context import CryptContextfrom pydantic import BaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass TokenData(BaseModel): username: str | None = Noneclass User(BaseModel): username: str email: str | None = None full_name: str | None = None disabled: bool | None = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")app = FastAPI()def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password): return pwd_context.hash(password)def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return userdef create_access_token(data: dict, expires_delta: timedelta | None = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtasync def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return userasync def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user@app.get("/users/me/items/")async def read_own_items(current_user: User = Depends(get_current_active_user)): return [{"item_id": "Foo", "owner": current_user.username}]
Update the /token
path operation¶
Create a timedelta
with the expiration time of the token.
Create a real JWT access token and return it.
from datetime import datetime, timedeltafrom typing import Unionfrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom jose import JWTError, jwtfrom passlib.context import CryptContextfrom pydantic import BaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass TokenData(BaseModel): username: Union[str, None] = Noneclass User(BaseModel): username: str email: Union[str, None] = None full_name: Union[str, None] = None disabled: Union[bool, None] = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")app = FastAPI()def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password): return pwd_context.hash(password)def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return userdef create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtasync def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return userasync def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user@app.get("/users/me/items/")async def read_own_items(current_user: User = Depends(get_current_active_user)): return [{"item_id": "Foo", "owner": current_user.username}]
from datetime import datetime, timedeltafrom fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom jose import JWTError, jwtfrom passlib.context import CryptContextfrom pydantic import BaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, }}class Token(BaseModel): access_token: str token_type: strclass TokenData(BaseModel): username: str | None = Noneclass User(BaseModel): username: str email: str | None = None full_name: str | None = None disabled: bool | None = Noneclass UserInDB(User): hashed_password: strpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")app = FastAPI()def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password): return pwd_context.hash(password)def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return userdef create_access_token(data: dict, expires_delta: timedelta | None = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtasync def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return userasync def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user@app.post("/token", response_model=Token)async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}@app.get("/users/me/", response_model=User)async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user@app.get("/users/me/items/")async def read_own_items(current_user: User = Depends(get_current_active_user)): return [{"item_id": "Foo", "owner": current_user.username}]
Technical details about the JWT "subject" sub
¶
The JWT specification says that there's a key sub
, with the subject of the token.
It's optional to use it, but that's where you would put the user's identification, so we are using it here.
JWT might be used for other things apart from identifying a user and allowing them to perform operations directly on your API.
For example, you could identify a "car" or a "blog post".
Then you could add permissions about that entity, like "drive" (for the car) or "edit" (for the blog).
And then, you could give that JWT token to a user (or bot), and they could use it to perform those actions (drive the car, or edit the blog post) without even needing to have an account, just with the JWT token your API generated for that.
Using these ideas, JWT can be used for way more sophisticated scenarios.
In those cases, several of those entities could have the same ID, let's say foo
(a user foo
, a car foo
, and a blog post foo
).
So, to avoid ID collisions, when creating the JWT token for the user, you could prefix the value of the sub
key, e.g. with username:
. So, in this example, the value of sub
could have been: username:johndoe
.
The important thing to have in mind is that the sub
key should have a unique identifier across the entire application, and it should be a string.
Check it¶
Run the server and go to the docs: http://127.0.0.1:8000/docs.
You'll see the user interface like:
Authorize the application the same way as before.
Using the credentials:
Username: johndoe
Password: secret
Check
Notice that nowhere in the code is the plaintext password "secret
", we only have the hashed version.
Call the endpoint /users/me/
, you will get the response as:
{ "username": "johndoe", "email": "johndoe@example.com", "full_name": "John Doe", "disabled": false}
If you open the developer tools, you could see how the data sent only includes the token, the password is only sent in the first request to authenticate the user and get that access token, but not afterwards:
Note
Notice the header Authorization
, with a value that starts with Bearer
.
Advanced usage with scopes
¶
OAuth2 has the notion of "scopes".
You can use them to add a specific set of permissions to a JWT token.
Then you can give this token to a user directly or a third party, to interact with your API with a set of restrictions.
You can learn how to use them and how they are integrated into FastAPI later in the Advanced User Guide.
Recap¶
With what you have seen up to now, you can set up a secure FastAPI application using standards like OAuth2 and JWT.
In almost any framework handling the security becomes a rather complex subject quite quickly.
Many packages that simplify it a lot have to make many compromises with the data model, database, and available features. And some of these packages that simplify things too much actually have security flaws underneath.
FastAPI doesn't make any compromise with any database, data model or tool.
It gives you all the flexibility to choose the ones that fit your project the best.
And you can use directly many well maintained and widely used packages like passlib
and python-jose
, because FastAPI doesn't require any complex mechanisms to integrate external packages.
But it provides you the tools to simplify the process as much as possible without compromising flexibility, robustness, or security.
And you can use and implement secure, standard protocols, like OAuth2 in a relatively simple way.
You can learn more in the Advanced User Guide about how to use OAuth2 "scopes", for a more fine-grained permission system, following these same standards. OAuth2 with scopes is the mechanism used by many big authentication providers, like Facebook, Google, GitHub, Microsoft, Twitter, etc. to authorize third party applications to interact with their APIs on behalf of their users.
FAQs
Can we use JWT with OAuth2? ›
JWTs can be used as OAuth 2.0 Bearer Tokens to encode all relevant parts of an access token into the access token itself instead of having to store them in a database.
Is bearer token same as JWT token? ›JWT is a particular type of token. JWT can be used as an OAuth Bearer token. A useful resource for reference can be found at https://auth0.com/docs/tokens.
How do I send JWT as bearer token? ›- Construct the JWT header.
- Base64url encode the JWT Header.
- Construct a JSON claim set.
- Base64url encode the claim set.
- Concatenate the header and claim set.
- Create a signature of the payload.
- Concatenate the payload and signature. Exchange the JWT for a bearer token.
- Send the JWT.
In essence, a JSON Web Token (JWT) is a bearer token. It's a particular implementation which has been specified and standardised. JWT in particular uses cryptography to encode a timestamp and some other parameters. This way, you can check if it's valid by just decrypting it, without hitting a DB.
What is the difference between OAuth 2.0 and JWT? ›JWT token vs oauth token: JWT defines a token format while OAuth deals in defining authorization protocols. JWT is simple and easy to learn from the initial stage while OAuth is complex. OAuth uses both client-side and server-side storage while JWT must use only client-side storage. JWT has limited scope and use cases.
Why JWT is not good for sessions? ›Although JWT does eliminate the database lookup, it introduces security issues and other complexities while doing so. Security is binary—either it's secure or it's not. Thus making it dangerous to use JWT for user sessions.
Does OAuth2 use bearer token? ›Bearer Tokens are the predominant type of access token used with OAuth 2.0. A Bearer Token is an opaque string, not intended to have any meaning to clients using it. Some servers will issue tokens that are a short string of hexadecimal characters, while others may use structured tokens such as JSON Web Tokens.
How does JWT bearer token work? ›In short, JWTs are used as a secure way to authenticate users and share information. Typically, a private key, or secret, is used by the issuer to sign the JWT. The receiver of the JWT will verify the signature to ensure that the token hasn't been altered after it was signed by the issuer.
Why does JWT start with bearer? ›The word Bearer wants to provide the authorization scheme. since there are Different Authorization Schemes like: Basic use for http-basic-Authentication.
What is the difference between JWT and bearer header? ›Short answer. A JWT is a convenient way to encode and verify claims. A Bearer Token is just a string, potentially arbitrary, that is used for authorization.
How do I create a bearer token using username and password? ›
Invoking the Token API to generate tokens
Access the Token API by using a REST client such as cURL, with the following parameters. payload - "grant_type=password&username=<username>&password=<password>&scope=<scope>" . Replace the <username> and <password> values as appropriate. Tip: <scope> is optional.
Bearer tokens enable requests to authenticate using an access key, such as a JSON Web Token (JWT). The token is a text string, included in the request header. In the request Authorization tab, select Bearer Token from the Type dropdown list. In the Token field, enter your API key value.
Which JWT algorithm is most secure? ›The option with the best security and performance is EdDSA, though ES256 (The Elliptic Curve Digital Signature Algorithm (ECDSA) using P-256 and SHA-256) is also a good choice. The most widely used option, supported by most technology stacks, is RS256 (RSASSA-PKCS1-v1_5 using SHA-256).
Is JWT better than OAuth? ›OAuth2 vs JWT, Both systems have their particular use cases and advantages. While JWT is excellent for API authentication and server-to-server authorization, OAuth 2.0 takes the lead in session management.
How do I make my JWT token more secure? ›There are two critical steps in using JWT securely in a web application: 1) send them over an encrypted channel, and 2) verify the signature immediately upon receiving it. The asymmetric nature of public key cryptography makes JWT signature verification possible.
Is JWT good for authentication or authorization? ›To authenticate a user, a client application must send a JSON Web Token (JWT) in the authorization header of the HTTP request to your backend API. API Gateway validates the token on behalf of your API, so you don't have to add any code in your API to process the authentication.
What are the disadvantages of using JWT? ›No way to log out or invalidate sessions for users. Moreover, there is no way for a user to disable their sessions across multiple devices. Since the tokens are generated and verified on the fly, we can't have access to the different logged-in clients which can pose problems when you need to identify the devices.
Is JWT good for API authentication? ›Any API that requires authentication can easily switch over to JWT's authorization. With JWT authorization, you get a user-based authentication. Once the user is authenticated, the user gets a secure token that they can use on all systems.
What is better than JWT? ›JSON web token (JWT) is the most popular token-based authentication. However, many security threats have been exposed in recent years, causing people to migrate to other types of tokens. Platform Agnostic Security Token or PASETO is one such token which is being accepted as the best secured alternative for JWT.
What is the lifespan of JWT token? ›The JWT access token is only valid for a finite period of time. Using an expired JWT will cause operations to fail. As you saw above, we are told how long a token is valid through expires_in . This value is normally 1200 seconds or 20 minutes.
What should you not store in a JWT? ›
To reiterate, whatever you do, don't store a JWT in local storage (or session storage). If any of the third-party scripts you include in your page is compromised, it can access all your users' tokens. To keep them secure, you should always store JWTs inside an httpOnly cookie.
What is the difference between OAuth and OAuth2? ›OAuth 1.0 needs to generate a signature on every API call to the server resource and that should be matched with the signature generated at the receiving endpoint in order to have access for the client. OAuth 2.0 do not need to generate signatures. It uses TLS/SSL (HTTPS) for communication.
Is bearer token authentication or Authorization? ›Bearer authentication (also called token authentication) is an HTTP authentication scheme that involves security tokens called bearer tokens. The name “Bearer authentication” can be understood as “give access to the bearer of this token.”
Should you encrypt bearer token? ›It does not usually make sense to encrypt access tokens, since doing so would not prevent an attacker from sending one to an API. The confidentiality of access tokens is instead ensured by returning them to clients in an opaque unreadable format, as described in the Phantom Token Pattern.
How do I pass a bearer token in REST API? ›- Step 1: Create authorization request link.
- Step 2: Request user for authorization.
- Step 3: Exchange authorization code with access tokenpost.
- Step 4: Use access token for REST API requests.
- Step 5: Get new access token using refresh token.
Using a bearer token does not require a bearer to prove possession of cryptographic key material (proof-of-possession). Access tokens are used in token-based authentication to allow an application to access an API.
What is the difference between access token and JWT token? ›The OAuth access token is different from the JWT in the sense that it's an opaque token. The access token's purpose is so that the client application can query Google to ask for more information about the signed in user. email: The end user's email ID. email_verified: Whether or not the user has verified their email.
Can I pass bearer token in URL? ›Don't pass bearer tokens in page URLs: Bearer tokens SHOULD NOT be passed in page URLs (for example as query string parameters). Instead, bearer tokens SHOULD be passed in HTTP message headers or message bodies for which confidentiality measures are taken.
What are the 3 parts of JWT token? ›Figure 1 shows that a JWT consists of three parts: a header, payload, and signature.
Where should bearer tokens be stored? ›The usual practice is to store access tokens in the browser's session storage or local storage. This is because we need to persist access tokens across page reloads, to prevent the need to re-authenticate on every reload. This provides a better user experience.
Is JWT encryption or hashing? ›
The issuer generates a hash of the JWT header and payload using SHA256, and encrypts it using the RSA encryption algorithm, and their private key.
Is JWT front end or backend? ›The restrictions could be anything ranging from the availability of a user to the role of the user. This is referred to as authorization. In this post, I am going to show you how to implement authorization with a frontend (React) and a backend (Node JS) using JSON Web Token (JWT).
Should JWT be in frontend or backend? ›You should implement it on both backend / frontend. The Front end should have a UI to get the login / password entered by the user.
How do you automate a bearer token? ›- Authorization Tab. In the Authorization Tab, set the Type to Bearer Token and for the value of the Token put your variable, I used {{BEARERTOKEN}}
- Pre-request Script Tab. In the Pre-request Script Tab, this is where the magic happens. ...
- Requests Setup.
Note: Bearer tokens in authorization headers are not sent by default. If you require a bearer token token to be sent, request it when registering with Google. The string "AbCdEf123456" in the example above is the bearer authorization token. This is a cryptographic token produced by Google.
How do I post a request with bearer token? ›To post JSON with a Bearer Token Authorization header, you need to make an HTTP POST request, provide your Bearer Token with an "Authorization: Bearer {token}" HTTP header, and give the JSON data in the body of the POST message.
Which format does OAuth 2.0 require tokens to use? ›Bearer Tokens are the predominant type of access token used with OAuth 2.0. A Bearer Token is an opaque string, not intended to have any meaning to clients using it. Some servers will issue tokens that are a short string of hexadecimal characters, while others may use structured tokens such as JSON Web Tokens.
Can you use OAuth and SAML together? ›Can you use both SAML and OAuth? Yes, you can. The Client can get a SAML assertion from the IdP and request the Authorization Server to grant access to the Resource Server. The Authorization Server can then verify the identity of the user and pass back an OAuth token in the HTTP header to access the protected resource.
Can we use JWT token for authentication? ›To authenticate a user, a client application must send a JSON Web Token (JWT) in the authorization header of the HTTP request to your backend API. API Gateway validates the token on behalf of your API, so you don't have to add any code in your API to process the authentication.
Can OAuth 2.0 be used for authentication? ›OAuth 2.0 is an authorization protocol and NOT an authentication protocol. As such, it is designed primarily as a means of granting access to a set of resources, for example, remote APIs or user data. OAuth 2.0 uses Access Tokens.