🔒️ init auth via roles

This commit is contained in:
swve 2022-07-08 21:07:48 +02:00
parent 62fe8dbe89
commit 90234bc5d7
5 changed files with 98 additions and 16 deletions

View file

@ -17,11 +17,11 @@ async def api_create_house(house_object: House, current_user: User = Depends(get
@router.get("/{house_id}")
async def api_get_house(house_id: str):
async def api_get_house(house_id: str, current_user: User = Depends(get_current_user)):
"""
Get single House by house_id
"""
return await get_house(house_id)
return await get_house(house_id, current_user=current_user)
@router.get("/page/{page}/limit/{limit}")

View file

@ -16,7 +16,7 @@ async def api_create_org(org_object: Organization, current_user: User = Depends(
@router.get("/{org_id}")
async def api_get_org(org_id: str):
async def api_get_org(org_id: str, current_user: User = Depends(get_current_user)):
"""
Get single Org by ID
"""
@ -24,7 +24,7 @@ async def api_get_org(org_id: str):
@router.get("/page/{page}/limit/{limit}")
async def api_get_org_by(page: int, limit: int):
async def api_get_org_by(page: int, limit: int, current_user: User = Depends(get_current_user)):
"""
Get orgs by page and limit
"""

View file

@ -27,12 +27,21 @@ class HouseInDB(House):
#### Classes ####################################################
async def get_house(house_id: str):
async def get_house(house_id: str, current_user: User):
await check_database()
houses = learnhouseDB["houses"]
house = houses.find_one({"house_id": house_id})
# verify house rights
hasOwnershipRights = await verify_house_rights(house_id, current_user)
hasRoleRights = await verify_user_rights_with_roles("read", current_user.username, house_id)
if not hasRoleRights or not hasOwnershipRights:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Roles/Ownership : Insufficient rights to perform this action")
if not house:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="House does not exist")
@ -55,6 +64,12 @@ async def create_house(house_object: House, current_user: User):
# generate house_id with uuid4
house_id = str(f"house_{uuid4()}")
hasRoleRights = await verify_user_rights_with_roles("create", current_user.username, house_id)
if not hasRoleRights:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Roles : Insufficient rights to perform this action")
house = HouseInDB(house_id=house_id, owners=[
current_user.username], admins=[
current_user.username], **house_object.dict())
@ -72,7 +87,13 @@ async def update_house(house_object: House, house_id: str, current_user: User):
await check_database()
# verify house rights
await verify_house_rights(house_id, current_user)
hasOwnershipRights = await verify_house_rights(house_id, current_user)
hasRoleRights = await verify_user_rights_with_roles("update", current_user.username, house_id)
if not hasRoleRights or not hasOwnershipRights:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Roles/Ownership : Insufficient rights to perform this action")
houses = learnhouseDB["houses"]
@ -98,7 +119,13 @@ async def delete_house(house_id: str, current_user: User):
await check_database()
# verify house rights
await verify_house_rights(house_id, current_user)
hasOwnershipRights = await verify_house_rights(house_id, current_user)
hasRoleRights = await verify_user_rights_with_roles("delete", current_user.username, house_id)
if not hasRoleRights or not hasOwnershipRights:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Roles/Ownership : Insufficient rights to perform this action")
houses = learnhouseDB["houses"]
@ -120,7 +147,7 @@ async def delete_house(house_id: str, current_user: User):
async def get_houses(page: int = 1, limit: int = 10):
await check_database()
houses = learnhouseDB["houses"]
# TODO : Get only houses that user is admin/has roles of
# get all houses from database
all_houses = houses.find().sort("name", 1).skip(10 * (page - 1)).limit(limit)

View file

@ -3,28 +3,27 @@ from typing import List
from uuid import uuid4
from pydantic import BaseModel
from src.services.users import User
from ..services.database import check_database, learnhouseDB, learnhouseDB
from ..services.database import check_database, learnhouseDB, learnhouseDB
from ..services.security import *
from ..services.houses import House
from fastapi import HTTPException, status
from fastapi import HTTPException, status
from datetime import datetime
#### Classes ####################################################
class Permission(BaseModel):
create: bool
read: bool
update: bool
delete: bool
action_create: bool
action_read: bool
action_update: bool
action_delete: bool
class Elements(BaseModel):
course: List[str]
users: List[str]
houses: List[str]
paths: List[str]
coursegroups: List[str]
collections: List[str]
class Role(BaseModel):
@ -32,6 +31,7 @@ class Role(BaseModel):
description: str
permissions: Permission
elements: Elements
linked_users: List[str]
class RoleInDB(Role):

View file

@ -1,6 +1,10 @@
from fastapi import HTTPException, status
from passlib.context import CryptContext
from jose import JWTError, jwt
import logging
from passlib.hash import pbkdf2_sha256
from ..services.database import check_database
from ..services.database import check_database, learnhouseDB, learnhouseDB
### 🔒 JWT ##############################################################
@ -23,3 +27,54 @@ async def security_verify_password(plain_password: str, hashed_password: str):
return pbkdf2_sha256.verify(plain_password, hashed_password)
### 🔒 Passwords Hashing ##############################################################
### 🔒 Roles checking ##############################################################
async def verify_user_rights_with_roles(action: str, user_id: str, element_id: str):
"""
Check if the user has the right to perform the action on the element
"""
await check_database()
roles = learnhouseDB["roles"]
user_roles_cursor = roles.find({
"linked_users": str(user_id)
})
user_roles = []
# Info: permission actions are: read, create, delete, update
for role in user_roles_cursor:
user_roles.append(role)
for role in user_roles:
element = role["elements"][await check_element_type(element_id)]
permission_state = role["permissions"][f'action_{action}']
##
if ("*" in element or element_id in element) and (permission_state is True):
return True
else:
return False
async def check_element_type(element_id):
"""
Check if the element is a course, a user, a house or a collection, by checking its prefix
"""
if element_id.startswith("course_"):
return "courses"
elif element_id.startswith("user_"):
return "users"
elif element_id.startswith("house_"):
return "houses"
elif element_id.startswith("collection_"):
return "collections"
else:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Issue verifying element nature")
### 🔒 Roles checking ##############################################################