refactor: move security services to security dir

This commit is contained in:
swve 2023-03-25 00:24:20 +01:00
parent a0e749399f
commit 22464cead4
23 changed files with 21 additions and 21 deletions

0
src/security/__init__.py Normal file
View file

89
src/security/auth.py Normal file
View file

@ -0,0 +1,89 @@
from pydantic import BaseModel
from fastapi import Depends, FastAPI, APIRouter, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from src.services.users.users import *
from fastapi import Cookie, FastAPI
from src.security.security import *
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
#### JWT Auth ####################################################
class Settings(BaseModel):
authjwt_secret_key: str = "secret"
authjwt_token_location = {"cookies"}
authjwt_cookie_csrf_protect = False
authjwt_access_token_expires = False # (pre-alpha only) # TODO: set to 1 hour
authjwt_cookie_samesite = "none"
authjwt_cookie_secure = True
@AuthJWT.load_config # type: ignore
def get_config():
return Settings()
#### JWT Auth ####################################################
#### Classes ####################################################
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
#### Classes ####################################################
async def authenticate_user(request: Request,email: str, password: str):
user = await security_get_user(request, email)
if not user:
return False
if not await security_verify_password(password, user.password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(request: Request, Authorize: AuthJWT = Depends()):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
Authorize.jwt_required()
username = Authorize.get_jwt_subject()
token_data = TokenData(username=username) # type: ignore
except JWTError:
raise credentials_exception
user = await security_get_user(request, email=token_data.username) # type: ignore # treated as an email
if user is None:
raise credentials_exception
return PublicUser(**user.dict())

125
src/security/security.py Normal file
View file

@ -0,0 +1,125 @@
from pprint import pprint
from fastapi import HTTPException, status, Request
from passlib.context import CryptContext
from passlib.hash import pbkdf2_sha256
from src.services.roles.schemas.roles import RoleInDB
from src.services.users.schemas.users import User, UserInDB
### 🔒 JWT ##############################################################
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ACCESS_TOKEN_EXPIRE_MINUTES = 30
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
### 🔒 JWT ##############################################################
### 🔒 Passwords Hashing ##############################################################
async def security_hash_password(password: str):
return pbkdf2_sha256.hash(password)
async def security_verify_password(plain_password: str, hashed_password: str):
return pbkdf2_sha256.verify(plain_password, hashed_password)
### 🔒 Passwords Hashing ##############################################################
### 🔒 Roles checking ##############################################################
async def verify_user_rights_with_roles(request: Request, action: str, user_id: str, element_id: str, element_org_id: str):
"""
Check if the user has the right to perform the action on the element
"""
roles = request.app.db["roles"]
users = request.app.db["users"]
# Check if the user is an admin
user: UserInDB = UserInDB(**await users.find_one({"user_id": user_id}))
# Organization roles verification
for org in user.orgs:
# TODO: Check if the org_id (user) is the same as the org_id (element)
if org.org_id == element_org_id:
return True
# Check if user is owner or reader of the organization
if org.org_role == ("owner" or "editor"):
return True
# If the user is not an owner or a editor, check if he has a role
# Get user roles
user_roles = user.roles
# TODO: Check if the org_id of the role is the same as the org_id of the element using find
if action != "create":
await check_user_role_org_with_element_org(request, element_id, user_roles)
# Check if user has the right role
element_type = await check_element_type(element_id)
for role_id in user_roles:
role = RoleInDB(**await roles.find_one({"role_id": role_id}))
if role.elements[element_type][f"action_{action}"]:
return True
# If no role is found, raise an error
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="You don't have the right to perform this action")
async def check_element_type(element_id):
"""
Check if the element is a course, a user, a house or a collection, by checking its prefix
"""
if element_id.startswith("course_"):
return "courses"
elif element_id.startswith("user_"):
return "users"
elif element_id.startswith("house_"):
return "houses"
elif element_id.startswith("org_"):
return "organizations"
elif element_id.startswith("coursechapter_"):
return "coursechapters"
elif element_id.startswith("collection_"):
return "collections"
elif element_id.startswith("activity_"):
return "activities"
else:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Issue verifying element nature")
async def check_user_role_org_with_element_org(request: Request, element_id: str, roles_list: list[str]):
element_type = await check_element_type(element_id)
element = request.app.db[element_type]
roles = request.app.db["roles"]
# get singular element type
singular_form_element = element_type[:-1]
element_type_id = singular_form_element + "_id"
element_org = await element.find_one({element_type_id: element_id})
for role_id in roles_list:
role = RoleInDB(**await roles.find_one({"role_id": role_id}))
if role.org_id == element_org["org_id"]:
return True
if role.org_id == "*":
return True
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="You don't have the right to perform this action")
### 🔒 Roles checking ##############################################################