feat: use new session and auth provider for the frontend

This commit is contained in:
swve 2023-12-26 22:32:08 +01:00
parent d939dc16eb
commit 6aa849b305
27 changed files with 283 additions and 235 deletions

View file

@ -1,6 +1,10 @@
from typing import Optional
from pydantic import BaseModel
from sqlmodel import Field, SQLModel
from src.db.roles import RoleRead
from src.db.organizations import OrganizationRead
class UserBase(SQLModel):
username: str
@ -33,14 +37,27 @@ class UserRead(UserBase):
id: int
user_uuid: str
class PublicUser(UserRead):
pass
class UserRoleWithOrg(BaseModel):
role: RoleRead
org: OrganizationRead
class UserSession(BaseModel):
user: UserRead
roles: list[UserRoleWithOrg]
class AnonymousUser(SQLModel):
id: int = 0
user_uuid: str = "user_anonymous"
username: str = "anonymous"
class User(UserBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
password: str = ""

View file

@ -9,6 +9,7 @@ from src.db.users import (
User,
UserCreate,
UserRead,
UserSession,
UserUpdate,
UserUpdatePassword,
)
@ -17,6 +18,7 @@ from src.services.users.users import (
create_user,
create_user_without_org,
delete_user_by_id,
get_user_session,
read_user_by_id,
read_user_by_uuid,
update_user,
@ -35,6 +37,18 @@ async def api_get_current_user(current_user: User = Depends(get_current_user)):
return current_user.dict()
@router.get("/session")
async def api_get_current_user_session(
request: Request,
db_session: Session = Depends(get_db_session),
current_user: PublicUser = Depends(get_current_user),
) -> UserSession:
"""
Get current user
"""
return await get_user_session(request, db_session, current_user)
@router.get("/authorize/ressource/{ressource_uuid}/action/{action}")
async def api_get_authorization_status(
request: Request,

View file

@ -23,7 +23,7 @@ async def authorization_verify_if_element_is_public(
if element_nature == "courses":
print("looking for course")
statement = select(Course).where(
Course.public == True, Course.course_uuid == element_uuid
Course.public is True, Course.course_uuid == element_uuid
)
course = db_session.exec(statement).first()
if course:
@ -33,7 +33,7 @@ async def authorization_verify_if_element_is_public(
if element_nature == "collections":
statement = select(Collection).where(
Collection.public == True, Collection.collection_uuid == element_uuid
Collection.public is True, Collection.collection_uuid == element_uuid
)
collection = db_session.exec(statement).first()

View file

@ -327,7 +327,7 @@ async def get_courses_orgslug(
statement_public = (
select(Course)
.join(Organization)
.where(Organization.slug == org_slug, Course.public == True)
.where(Organization.slug == org_slug, Course.public is True)
)
statement_all = (
select(Course).join(Organization).where(Organization.slug == org_slug)

View file

@ -3,17 +3,20 @@ from typing import Literal
from uuid import uuid4
from fastapi import HTTPException, Request, status
from sqlmodel import Session, select
from src.db.roles import Role, RoleRead
from src.security.rbac.rbac import (
authorization_verify_based_on_roles_and_authorship,
authorization_verify_if_user_is_anon,
)
from src.db.organizations import Organization
from src.db.organizations import Organization, OrganizationRead
from src.db.users import (
AnonymousUser,
PublicUser,
User,
UserCreate,
UserRead,
UserRoleWithOrg,
UserSession,
UserUpdate,
UserUpdatePassword,
)
@ -279,6 +282,57 @@ async def read_user_by_uuid(
return user
async def get_user_session(
request: Request,
db_session: Session,
current_user: PublicUser | AnonymousUser,
) -> UserSession:
# Get user
statement = select(User).where(User.user_uuid == current_user.user_uuid)
user = db_session.exec(statement).first()
if not user:
raise HTTPException(
status_code=400,
detail="User does not exist",
)
user = UserRead.from_orm(user)
# Get roles and orgs
statement = (
select(UserOrganization)
.where(UserOrganization.user_id == user.id)
.join(Organization)
)
user_organizations = db_session.exec(statement).all()
roles = []
for user_organization in user_organizations:
role_statement = select(Role).where(Role.id == user_organization.role_id)
role = db_session.exec(role_statement).first()
org_statement = select(Organization).where(
Organization.id == user_organization.org_id
)
org = db_session.exec(org_statement).first()
roles.append(
UserRoleWithOrg(
role=RoleRead.from_orm(role),
org=OrganizationRead.from_orm(org),
)
)
user_session = UserSession(
user=user,
roles=roles,
)
return user_session
async def authorize_user_action(
request: Request,
db_session: Session,