feat: adapt UGs to roles

This commit is contained in:
swve 2024-03-28 11:20:42 +00:00
parent 0df250c729
commit a6152ef1f5
9 changed files with 265 additions and 70 deletions

View file

@ -1,6 +1,12 @@
import stat
from typing import Literal
from uuid import uuid4
from sqlmodel import Session, select
from regex import R
from sqlalchemy import exists, union
from sqlmodel import Session, select, and_, or_
from src.db.usergroups import UserGroup
from src.db.usergroup_resources import UserGroupResource
from src.db.usergroup_user import UserGroupUser
from src.db.organizations import Organization
from src.db.trails import TrailRead
from src.services.trail.trail import get_user_trail_with_orgid
@ -105,7 +111,6 @@ async def get_course_meta(
)
trail = TrailRead.from_orm(trail)
return FullCourseReadWithTrail(
**course.dict(),
chapters=chapters,
@ -141,7 +146,7 @@ async def create_course(
if thumbnail_file and thumbnail_file.filename:
name_in_disk = f"{course.course_uuid}_thumbnail_{uuid4()}.{thumbnail_file.filename.split('.')[-1]}"
await upload_thumbnail(
thumbnail_file, name_in_disk, org.org_uuid, course.course_uuid # type: ignore
thumbnail_file, name_in_disk, org.org_uuid, course.course_uuid # type: ignore
)
course.thumbnail_image = name_in_disk
@ -212,7 +217,7 @@ async def update_course_thumbnail(
if thumbnail_file and thumbnail_file.filename:
name_in_disk = f"{course_uuid}_thumbnail_{uuid4()}.{thumbnail_file.filename.split('.')[-1]}"
await upload_thumbnail(
thumbnail_file, name_in_disk, org.org_uuid, course.course_uuid # type: ignore
thumbnail_file, name_in_disk, org.org_uuid, course.course_uuid # type: ignore
)
# Update course
@ -326,26 +331,47 @@ async def get_courses_orgslug(
page: int = 1,
limit: int = 10,
):
# Query for public courses
statement_public = (
select(Course)
.join(Organization)
.where(Organization.slug == org_slug, Course.public == True)
)
statement_all = (
select(Course).join(Organization).where(Organization.slug == org_slug)
# Query for courses where the current user is an author
statement_author = (
select(Course)
.join(Organization)
.join(ResourceAuthor, ResourceAuthor.user_id == current_user.id)
.where(
Organization.slug == org_slug,
ResourceAuthor.resource_uuid == Course.course_uuid,
)
)
if current_user.id == 0:
statement = statement_public
else:
# RBAC check
await authorization_verify_if_user_is_anon(current_user.id)
# Query for courses where the current user is in a user group that has access to the course
statement_usergroup = (
select(Course)
.join(Organization)
.join(
UserGroupResource, UserGroupResource.resource_uuid == Course.course_uuid
)
.join(
UserGroupUser, UserGroupUser.usergroup_id == UserGroupResource.usergroup_id
)
.where(Organization.slug == org_slug, UserGroupUser.user_id == current_user.id)
)
statement = statement_all
# Combine the results
statement_complete = union(
statement_public, statement_author, statement_usergroup
).subquery()
courses = db_session.exec(statement)
courses = db_session.execute(select([statement_complete])).all()
courses = [CourseRead(**course.dict(), authors=[]) for course in courses]
# TODO: I have no idea why this is necessary, but it is
courses = [CourseRead(**dict(course._mapping), authors=[]) for course in courses]
# for every course, get the authors
for course in courses:
@ -366,6 +392,7 @@ async def get_courses_orgslug(
## 🔒 RBAC Utils ##
async def rbac_check(
request: Request,
course_uuid: str,
@ -380,8 +407,10 @@ async def rbac_check(
)
return res
else:
res = await authorization_verify_based_on_roles_and_authorship_and_usergroups(
request, current_user.id, action, course_uuid, db_session
res = (
await authorization_verify_based_on_roles_and_authorship_and_usergroups(
request, current_user.id, action, course_uuid, db_session
)
)
return res
else: