feat: roles init

This commit is contained in:
swve 2023-11-13 21:23:44 +01:00
parent a50fc67104
commit aa0eda5682
5 changed files with 113 additions and 109 deletions

View file

@ -1,127 +1,99 @@
from typing import Literal
from uuid import uuid4
from sqlmodel import Session, select
from src.db.roles import Role, RoleCreate, RoleUpdate
from src.security.rbac.rbac import authorization_verify_if_user_is_anon
from src.services.roles.schemas.roles import Role, RoleInDB
from src.services.users.schemas.users import PublicUser
from fastapi import HTTPException, status, Request
from datetime import datetime
async def create_role(request: Request, role_object: Role, current_user: PublicUser):
roles = request.app.db["roles"]
async def create_role(
request: Request,
db_session: Session,
role_object: RoleCreate,
current_user: PublicUser,
):
role = Role.from_orm(role_object)
await verify_user_permissions_on_roles(request, current_user, "create", None)
# Complete the role object
role.role_uuid = f"role_{uuid4()}"
role.creation_date = str(datetime.now())
role.update_date = str(datetime.now())
# create the role object in the database and return the object
role_id = "role_" + str(uuid4())
role = RoleInDB(
role_id=role_id,
created_at=str(datetime.now()),
updated_at=str(datetime.now()),
**role_object.dict()
)
await roles.insert_one(role.dict())
db_session.add(role)
db_session.commit()
db_session.refresh(role)
return role
async def read_role(request: Request, role_id: str, current_user: PublicUser):
roles = request.app.db["roles"]
async def read_role(
request: Request, db_session: Session, role_id: str, current_user: PublicUser
):
statement = select(Role).where(Role.id == role_id)
result = db_session.exec(statement)
await verify_user_permissions_on_roles(request, current_user, "read", role_id)
role = result.first()
role = RoleInDB(**await roles.find_one({"role_id": role_id}))
if not role:
raise HTTPException(
status_code=404,
detail="Role not found",
)
return role
async def update_role(
request: Request, role_id: str, role_object: Role, current_user: PublicUser
):
roles = request.app.db["roles"]
await verify_user_permissions_on_roles(request, current_user, "update", role_id)
role_object.updated_at = datetime.now()
# Update the role object in the database and return the object
updated_role = RoleInDB(
**await roles.find_one_and_update(
{"role_id": role_id}, {"$set": role_object.dict()}, return_document=True
)
)
return updated_role
async def delete_role(request: Request, role_id: str, current_user: PublicUser):
roles = request.app.db["roles"]
await verify_user_permissions_on_roles(request, current_user, "delete", role_id)
# Delete the role object in the database and return the object
deleted_role = RoleInDB(**await roles.find_one_and_delete({"role_id": role_id}))
return deleted_role
#### Security ####################################################
async def verify_user_permissions_on_roles(
request: Request,
db_session: Session,
role_object: RoleUpdate,
current_user: PublicUser,
action: Literal["create", "read", "update", "delete"],
role_id: str | None,
):
request.app.db["users"]
roles = request.app.db["roles"]
statement = select(Role).where(Role.id == role_object.role_id)
result = db_session.exec(statement)
# If current user is not authenticated
role = result.first()
if not current_user:
if not role:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Roles : Not authenticated"
status_code=404,
detail="Role not found",
)
await authorization_verify_if_user_is_anon(current_user.user_id)
# Complete the role object
role.update_date = str(datetime.now())
if action == "create":
if "owner" in [org.org_role for org in current_user.orgs]:
return True
# Remove the role_id from the role_object
del role_object.role_id
if role_id is not None:
role = RoleInDB(**await roles.find_one({"role_id": role_id}))
# Update only the fields that were passed in
for var, value in vars(role_object).items():
if value is not None:
setattr(role, var, value)
if action == "read":
if "owner" in [org.org_role for org in current_user.orgs]:
return True
db_session.add(role)
db_session.commit()
db_session.refresh(role)
for org in current_user.orgs:
if org.org_id == role.org_id:
return True
if action == "update":
for org in current_user.orgs:
# If the user is an owner of the organization
if org.org_id == role.org_id:
if org.org_role == "owner" or org.org_role == "editor":
return True
# Can't update a global role
if role.org_id == "*":
return False
if action == "delete":
for org in current_user.orgs:
# If the user is an owner of the organization
if org.org_id == role.org_id:
if org.org_role == "owner":
return True
# Can't delete a global role
if role.org_id == "*":
return False
return role
#### Security ####################################################
async def delete_role(
request: Request, db_session: Session, role_id: str, current_user: PublicUser
):
statement = select(Role).where(Role.id == role_id)
result = db_session.exec(statement)
role = result.first()
if not role:
raise HTTPException(
status_code=404,
detail="Role not found",
)
db_session.delete(role)
db_session.commit()
return "Role deleted"

View file

@ -164,6 +164,8 @@ async def update_user(
for key, value in user_data.items():
setattr(user, key, value)
user.update_date = str(datetime.now())
# Update user in database
db_session.add(user)
db_session.commit()
@ -197,6 +199,8 @@ async def update_user_password(
# Update user
user.password = await security_hash_password(form.new_password)
user.update_date = str(datetime.now())
# Update user in database
db_session.add(user)