feat: add linking UGs to Users

This commit is contained in:
swve 2024-03-30 17:32:29 +00:00
parent d1d817678b
commit e173a32e3c
9 changed files with 235 additions and 20 deletions

View file

@ -6,14 +6,13 @@ from fastapi import HTTPException, Request
from sqlmodel import Session, select
from src.security.rbac.rbac import (
authorization_verify_based_on_roles_and_authorship_and_usergroups,
authorization_verify_if_element_is_public,
authorization_verify_if_user_is_anon,
)
from src.db.usergroup_resources import UserGroupResource
from src.db.usergroup_user import UserGroupUser
from src.db.organizations import Organization
from src.db.usergroups import UserGroup, UserGroupCreate, UserGroupRead, UserGroupUpdate
from src.db.users import AnonymousUser, PublicUser, User
from src.db.users import AnonymousUser, PublicUser, User, UserRead
async def create_usergroup(
@ -89,6 +88,48 @@ async def read_usergroup_by_id(
return usergroup
async def get_users_linked_to_usergroup(
request: Request,
db_session: Session,
current_user: PublicUser | AnonymousUser,
usergroup_id: int,
) -> list[UserRead]:
statement = select(UserGroup).where(UserGroup.id == usergroup_id)
usergroup = db_session.exec(statement).first()
if not usergroup:
raise HTTPException(
status_code=404,
detail="UserGroup not found",
)
# RBAC check
await rbac_check(
request,
usergroup_uuid=usergroup.usergroup_uuid,
current_user=current_user,
action="read",
db_session=db_session,
)
statement = select(UserGroupUser).where(UserGroupUser.usergroup_id == usergroup_id)
usergroup_users = db_session.exec(statement).all()
user_ids = [usergroup_user.user_id for usergroup_user in usergroup_users]
# get users
users = []
for user_id in user_ids:
statement = select(User).where(User.id == user_id)
user = db_session.exec(statement).first()
users.append(user)
users = [UserRead.from_orm(user) for user in users]
return users
async def read_usergroups_by_org_id(
request: Request,
db_session: Session,
@ -99,8 +140,6 @@ async def read_usergroups_by_org_id(
statement = select(UserGroup).where(UserGroup.org_id == org_id)
usergroups = db_session.exec(statement).all()
# RBAC check
await rbac_check(
request,
@ -236,6 +275,8 @@ async def add_users_to_usergroup(
status_code=404,
detail="UserGroup not found",
)
# RBAC check
await rbac_check(
@ -252,6 +293,17 @@ async def add_users_to_usergroup(
statement = select(User).where(User.id == user_id)
user = db_session.exec(statement).first()
# Check if User is already Linked to UserGroup
statement = select(UserGroupUser).where(
UserGroupUser.usergroup_id == usergroup_id,
UserGroupUser.user_id == user_id,
)
usergroup_user = db_session.exec(statement).first()
if usergroup_user:
logging.error(f"User with id {user_id} already exists in UserGroup")
continue
if user:
# Add user to UserGroup
if user.id is not None:
@ -301,7 +353,7 @@ async def remove_users_from_usergroup(
user_ids_array = user_ids.split(",")
for user_id in user_ids_array:
statement = select(UserGroupUser).where(UserGroupUser.user_id == user_id)
statement = select(UserGroupUser).where(UserGroupUser.user_id == user_id, UserGroupUser.usergroup_id == usergroup_id)
usergroup_user = db_session.exec(statement).first()
if usergroup_user: