feat: init auth across the app

This commit is contained in:
swve 2022-09-24 13:15:40 +02:00
parent 9479a4b127
commit 3b85a73ec1
12 changed files with 204 additions and 29 deletions

View file

@ -1,3 +1,4 @@
from urllib.request import Request
from fastapi import Depends, APIRouter, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from src.services.auth import *
@ -5,9 +6,10 @@ from src.services.users import *
from datetime import timedelta
from fastapi.responses import JSONResponse
router = APIRouter()
# DEPRECATED
@router.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
"""
@ -29,3 +31,44 @@ async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(
response.set_cookie(key="user_token", value=access_token, httponly=True, expires="3600",secure=True)
return response
@router.post('/refresh')
def refresh(Authorize: AuthJWT = Depends()):
"""
The jwt_refresh_token_required() function insures a valid refresh
token is present in the request before running any code below that function.
we can use the get_jwt_subject() function to get the subject of the refresh
token, and use the create_access_token() function again to make a new access token
"""
Authorize.jwt_refresh_token_required()
current_user = Authorize.get_jwt_subject()
new_access_token = Authorize.create_access_token(subject=current_user)
return {"access_token": new_access_token}
@router.post('/login')
async def login(Authorize: AuthJWT = Depends(), form_data: OAuth2PasswordRequestForm = Depends()):
user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect Email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = Authorize.create_access_token(subject=form_data.username)
refresh_token = Authorize.create_refresh_token(subject=form_data.username)
Authorize.set_refresh_cookies(refresh_token)
return {"access_token": access_token , "refresh_token": refresh_token}
@router.delete('/logout')
def logout(Authorize: AuthJWT = Depends()):
"""
Because the JWT are stored in an httponly cookie now, we cannot
log the user out by simply deleting the cookies in the frontend.
We need the backend to send us a response to delete the cookies.
"""
Authorize.jwt_required()
Authorize.unset_jwt_cookies()
return {"msg":"Successfully logout"}

View file

@ -9,7 +9,7 @@ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
router = APIRouter()
# DEPRECATED
@router.get("/me")
async def api_get_current_user(current_user: User = Depends(get_current_user)):
"""
@ -17,6 +17,13 @@ async def api_get_current_user(current_user: User = Depends(get_current_user)):
"""
return current_user.dict()
@router.get("/profile")
async def api_get_current_user(current_user: User = Depends(get_current_user_jwt)):
"""
Get current user
"""
return current_user.dict()
@router.get("/username/{username}")
async def api_get_user_by_username(username: str):

View file

@ -5,9 +5,28 @@ from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from src.services.users import *
from fastapi import Cookie, FastAPI
from src.services.security import *
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
#### JWT Auth ####################################################
class Settings(BaseModel):
authjwt_secret_key: str = "secret"
authjwt_token_location = {"cookies", "headers"}
authjwt_cookie_csrf_protect = False
@AuthJWT.load_config
def get_config():
return Settings()
#### JWT Auth ####################################################
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
#### Classes ####################################################
@ -42,7 +61,7 @@ def create_access_token(data: dict, expires_delta: timedelta | None = None):
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# DEPRECATED
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@ -57,7 +76,28 @@ async def get_current_user(token: str = Depends(oauth2_scheme)):
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = await security_get_user(username=token_data.username)
user = await security_get_user(email=token_data.username)
if user is None:
raise credentials_exception
return PublicUser(**user.dict())
async def get_current_user_jwt(Authorize: AuthJWT = Depends()):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
Authorize.jwt_required()
username = Authorize.get_jwt_subject()
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = await security_get_user(email=token_data.username) # treated as an email
if user is None:
raise credentials_exception
return PublicUser(**user.dict())