feat: refactor RBAC authorization functions to include usergroups

This commit is contained in:
swve 2024-03-26 19:56:14 +00:00
parent e1b3b62e40
commit 0df250c729
14 changed files with 392 additions and 37 deletions

View file

@ -1,10 +1,13 @@
from datetime import datetime
import logging
from uuid import uuid4
from fastapi import HTTPException, Request
from sqlmodel import Session, select
from src.db.usergroup_ressources import UserGroupRessource
from src.db.usergroup_user import UserGroupUser
from src.db.organizations import Organization
from src.db.usergroups import UserGroup, UserGroupCreate, UserGroupRead, UserGroupUpdate
from src.db.users import AnonymousUser, PublicUser
from src.db.users import AnonymousUser, PublicUser, User
async def create_usergroup(
@ -112,3 +115,151 @@ async def delete_usergroup_by_id(
db_session.commit()
return "UserGroup deleted successfully"
async def add_users_to_usergroup(
request: Request,
db_session: Session,
current_user: PublicUser | AnonymousUser,
usergroup_id: int,
user_ids: str,
) -> str:
statement = select(UserGroup).where(UserGroup.id == usergroup_id)
usergroup = db_session.exec(statement).first()
if not usergroup:
raise HTTPException(
status_code=404,
detail="UserGroup not found",
)
user_ids_array = user_ids.split(",")
for user_id in user_ids_array:
statement = select(User).where(User.id == user_id)
user = db_session.exec(statement).first()
if user:
# Add user to UserGroup
if user.id is not None:
usergroup_obj = UserGroupUser(
usergroup_id=usergroup_id,
user_id=user.id,
org_id=usergroup.org_id,
creation_date=str(datetime.now()),
update_date=str(datetime.now()),
)
db_session.add(usergroup_obj)
db_session.commit()
db_session.refresh(usergroup_obj)
else:
logging.error(f"User with id {user_id} not found")
return "Users added to UserGroup successfully"
async def remove_users_from_usergroup(
request: Request,
db_session: Session,
current_user: PublicUser | AnonymousUser,
usergroup_id: int,
user_ids: str,
) -> str:
statement = select(UserGroup).where(UserGroup.id == usergroup_id)
usergroup = db_session.exec(statement).first()
if not usergroup:
raise HTTPException(
status_code=404,
detail="UserGroup not found",
)
user_ids_array = user_ids.split(",")
for user_id in user_ids_array:
statement = select(UserGroupUser).where(UserGroupUser.user_id == user_id)
usergroup_user = db_session.exec(statement).first()
if usergroup_user:
db_session.delete(usergroup_user)
db_session.commit()
else:
logging.error(f"User with id {user_id} not found in UserGroup")
return "Users removed from UserGroup successfully"
async def add_ressources_to_usergroup(
request: Request,
db_session: Session,
current_user: PublicUser | AnonymousUser,
usergroup_id: int,
ressources_uuids: str,
) -> str:
statement = select(UserGroup).where(UserGroup.id == usergroup_id)
usergroup = db_session.exec(statement).first()
if not usergroup:
raise HTTPException(
status_code=404,
detail="UserGroup not found",
)
ressources_uuids_array = ressources_uuids.split(",")
for ressource_uuid in ressources_uuids_array:
# TODO : Find a way to check if ressource exists
usergroup_obj = UserGroupRessource(
usergroup_id=usergroup_id,
ressource_uuid=ressource_uuid,
org_id=usergroup.org_id,
creation_date=str(datetime.now()),
update_date=str(datetime.now()),
)
db_session.add(usergroup_obj)
db_session.commit()
db_session.refresh(usergroup_obj)
return "Ressources added to UserGroup successfully"
async def remove_ressources_from_usergroup(
request: Request,
db_session: Session,
current_user: PublicUser | AnonymousUser,
usergroup_id: int,
ressources_uuids: str,
) -> str:
statement = select(UserGroup).where(UserGroup.id == usergroup_id)
usergroup = db_session.exec(statement).first()
if not usergroup:
raise HTTPException(
status_code=404,
detail="UserGroup not found",
)
ressources_uuids_array = ressources_uuids.split(",")
for ressource_uuid in ressources_uuids_array:
statement = select(UserGroupRessource).where(
UserGroupRessource.ressource_uuid == ressource_uuid
)
usergroup_ressource = db_session.exec(statement).first()
if usergroup_ressource:
db_session.delete(usergroup_ressource)
db_session.commit()
else:
logging.error(
f"Ressource with uuid {ressource_uuid} not found in UserGroup"
)
return "Ressources removed from UserGroup successfully"

View file

@ -10,7 +10,7 @@ from src.services.orgs.invites import get_invite_code
from src.services.users.avatars import upload_avatar
from src.db.roles import Role, RoleRead
from src.security.rbac.rbac import (
authorization_verify_based_on_roles_and_authorship,
authorization_verify_based_on_roles_and_authorship_and_usergroups,
authorization_verify_if_user_is_anon,
)
from src.db.organizations import Organization, OrganizationRead
@ -124,11 +124,15 @@ async def create_user_with_invite(
):
# Check if invite code exists
isInviteCodeCorrect = await get_invite_code(
inviteCOde = await get_invite_code(
request, org_id, invite_code, current_user, db_session
)
if not isInviteCodeCorrect:
# Check if invite code contains UserGroup
#TODO
if not inviteCOde:
raise HTTPException(
status_code=400,
detail="Invite code is incorrect",
@ -463,7 +467,7 @@ async def authorize_user_action(
)
# RBAC check
authorized = await authorization_verify_based_on_roles_and_authorship(
authorized = await authorization_verify_based_on_roles_and_authorship_and_usergroups(
request, current_user.id, action, ressource_uuid, db_session
)
@ -535,7 +539,7 @@ async def rbac_check(
if current_user.id == 0: # if user is anonymous
return True
else:
await authorization_verify_based_on_roles_and_authorship(
await authorization_verify_based_on_roles_and_authorship_and_usergroups(
request, current_user.id, "create", "user_x", db_session
)
@ -546,7 +550,7 @@ async def rbac_check(
if current_user.user_uuid == user_uuid:
return True
await authorization_verify_based_on_roles_and_authorship(
await authorization_verify_based_on_roles_and_authorship_and_usergroups(
request, current_user.id, action, user_uuid, db_session
)