mirror of
https://github.com/rzmk/learnhouse.git
synced 2025-12-19 04:19:25 +00:00
feat: init roles + authorship detection
This commit is contained in:
parent
38288e8a57
commit
0595bfdb3f
16 changed files with 109 additions and 236 deletions
|
|
@ -1,17 +1,24 @@
|
|||
from math import e
|
||||
from typing import Literal
|
||||
from fastapi import HTTPException, status, Request
|
||||
from src.security.rbac.utils import check_element_type, get_id_identifier_of_element
|
||||
from src.services.roles.schemas.roles import RoleInDB
|
||||
from src.services.users.schemas.users import UserRolesInOrganization
|
||||
from sqlalchemy import func, null, or_
|
||||
from sqlmodel import Session, select
|
||||
from src.db.collections import Collection
|
||||
from src.db.courses import Course
|
||||
from src.db.resource_authors import ResourceAuthor, ResourceAuthorshipEnum
|
||||
from src.db.roles import Role
|
||||
from src.db.user_organizations import UserOrganization
|
||||
from src.security.rbac.utils import check_element_type
|
||||
|
||||
|
||||
async def authorization_verify_if_element_is_public(
|
||||
request,
|
||||
element_id: str,
|
||||
element_uuid: str,
|
||||
user_id: str,
|
||||
action: Literal["read"],
|
||||
db_session: Session,
|
||||
):
|
||||
element_nature = await check_element_type(element_id)
|
||||
element_nature = await check_element_type(element_uuid)
|
||||
|
||||
# Verifies if the element is public
|
||||
if (
|
||||
|
|
@ -20,10 +27,12 @@ async def authorization_verify_if_element_is_public(
|
|||
and user_id == "anonymous"
|
||||
):
|
||||
if element_nature == "courses":
|
||||
courses = request.app.db["courses"]
|
||||
course = await courses.find_one({"course_id": element_id})
|
||||
statement = select(Course).where(
|
||||
Course.public == True, Course.course_uuid == element_uuid
|
||||
)
|
||||
course = db_session.exec(statement).first()
|
||||
|
||||
if course["public"]:
|
||||
if course:
|
||||
return True
|
||||
else:
|
||||
raise HTTPException(
|
||||
|
|
@ -32,10 +41,12 @@ async def authorization_verify_if_element_is_public(
|
|||
)
|
||||
|
||||
if element_nature == "collections":
|
||||
collections = request.app.db["collections"]
|
||||
collection = await collections.find_one({"collection_id": element_id})
|
||||
statement = select(Collection).where(
|
||||
Collection.public == True, Collection.collection_uuid == element_uuid
|
||||
)
|
||||
collection = db_session.exec(statement).first()
|
||||
|
||||
if collection["public"]:
|
||||
if collection:
|
||||
return True
|
||||
else:
|
||||
raise HTTPException(
|
||||
|
|
@ -53,67 +64,65 @@ async def authorization_verify_if_user_is_author(
|
|||
request,
|
||||
user_id: str,
|
||||
action: Literal["read", "update", "delete", "create"],
|
||||
element_id: str,
|
||||
element_uuid: str,
|
||||
db_session: Session,
|
||||
):
|
||||
if action == "update" or "delete" or "read":
|
||||
element_nature = await check_element_type(element_id)
|
||||
elements = request.app.db[element_nature]
|
||||
element_identifier = await get_id_identifier_of_element(element_id)
|
||||
element = await elements.find_one({element_identifier: element_id})
|
||||
if user_id in element["authors"]:
|
||||
return True
|
||||
statement = select(ResourceAuthor).where(
|
||||
ResourceAuthor.resource_uuid == element_uuid
|
||||
)
|
||||
resource_author = db_session.exec(statement).first()
|
||||
|
||||
if resource_author:
|
||||
if resource_author.user_id == user_id:
|
||||
if (resource_author.authorship == ResourceAuthorshipEnum.CREATOR) or (
|
||||
resource_author.authorship == ResourceAuthorshipEnum.MAINTAINER
|
||||
):
|
||||
return True
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="User rights (authorship) : You don't have the right to perform this action",
|
||||
)
|
||||
else:
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Wrong action (create)",
|
||||
)
|
||||
|
||||
|
||||
async def authorization_verify_based_on_roles(
|
||||
request: Request,
|
||||
user_id: str,
|
||||
action: Literal["read", "update", "delete", "create"],
|
||||
roles_list: list[UserRolesInOrganization],
|
||||
element_id: str,
|
||||
element_uuid: str,
|
||||
db_session: Session,
|
||||
):
|
||||
element_type = await check_element_type(element_id)
|
||||
element = request.app.db[element_type]
|
||||
roles = request.app.db["roles"]
|
||||
element_type = await check_element_type(element_uuid)
|
||||
|
||||
# Get the element
|
||||
element_identifier = await get_id_identifier_of_element(element_id)
|
||||
element = await element.find_one({element_identifier: element_id})
|
||||
# Get user roles bound to an organization and standard roles
|
||||
statement = (
|
||||
select(Role)
|
||||
.join(UserOrganization)
|
||||
.where(UserOrganization.user_id == user_id)
|
||||
.where((UserOrganization.id == Role.org_id) | (UserOrganization.id == null))
|
||||
)
|
||||
|
||||
# Get the roles of the user
|
||||
roles_id_list = [role["role_id"] for role in roles_list]
|
||||
roles = await roles.find({"role_id": {"$in": roles_id_list}}).to_list(length=100)
|
||||
user_roles_in_organization_and_standard_roles = db_session.exec(statement).all()
|
||||
|
||||
async def checkRoles():
|
||||
# Check Roles
|
||||
for role in roles:
|
||||
role = RoleInDB(**role)
|
||||
if role.elements[element_type][f"action_{action}"] is True:
|
||||
# Find in roles list if there is a role that matches users action for this type of element
|
||||
for role in user_roles_in_organization_and_standard_roles:
|
||||
role = Role.from_orm(role)
|
||||
if role.rights:
|
||||
rights = role.rights
|
||||
if rights[element_type][f"action_{action}"] is True:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
async def checkOrgRoles():
|
||||
# Check Org Roles
|
||||
users = request.app.db["users"]
|
||||
user = await users.find_one({"user_id": user_id})
|
||||
if element is not None:
|
||||
for org in user["orgs"]:
|
||||
if org["org_id"] == element["org_id"]:
|
||||
if org["org_role"] == "owner" or org["org_role"] == "editor":
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
if await checkRoles() or await checkOrgRoles():
|
||||
return True
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="User rights (roless) : You don't have the right to perform this action",
|
||||
detail="User rights (roles) : You don't have the right to perform this action",
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -121,15 +130,15 @@ async def authorization_verify_based_on_roles_and_authorship(
|
|||
request: Request,
|
||||
user_id: str,
|
||||
action: Literal["read", "update", "delete", "create"],
|
||||
roles_list: list[UserRolesInOrganization],
|
||||
element_id: str,
|
||||
element_uuid: str,
|
||||
db_session: Session,
|
||||
):
|
||||
isAuthor = await authorization_verify_if_user_is_author(
|
||||
request, user_id, action, element_id
|
||||
request, user_id, action, element_uuid, db_session
|
||||
)
|
||||
|
||||
isRole = await authorization_verify_based_on_roles(
|
||||
request, user_id, action, roles_list, element_id
|
||||
request, user_id, action, element_uuid, db_session
|
||||
)
|
||||
|
||||
if isAuthor or isRole:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue