init roles

This commit is contained in:
swve 2022-07-02 18:33:55 +02:00
parent 13c6193bea
commit 62fe8dbe89
4 changed files with 225 additions and 6 deletions

View file

@ -1,5 +1,5 @@
from fastapi import APIRouter from fastapi import APIRouter
from src.routers import users, auth, houses, orgs from src.routers import users, auth, houses, orgs, roles
from starlette.responses import FileResponse from starlette.responses import FileResponse
@ -11,3 +11,4 @@ global_router.include_router(users.router, prefix="/users", tags=["users"])
global_router.include_router(auth.router, prefix="/auth", tags=["auth"]) global_router.include_router(auth.router, prefix="/auth", tags=["auth"])
global_router.include_router(houses.router, prefix="/houses", tags=["houses"]) global_router.include_router(houses.router, prefix="/houses", tags=["houses"])
global_router.include_router(orgs.router, prefix="/orgs", tags=["orgs"]) global_router.include_router(orgs.router, prefix="/orgs", tags=["orgs"])
global_router.include_router(roles.router, prefix="/roles", tags=["roles"])

49
src/routers/roles.py Normal file
View file

@ -0,0 +1,49 @@
from fastapi import APIRouter, Depends
from src.services.auth import get_current_user
from src.services.roles import Role, create_role, delete_role, get_role, update_role
from src.services.users import User
router = APIRouter()
@router.post("/")
async def api_create_role(role_object: Role, current_user: User = Depends(get_current_user)):
"""
Create new role
"""
return await create_role(role_object, current_user)
@router.get("/{role_id}")
async def api_get_role(role_id: str):
"""
Get single role by role_id
"""
return await get_role(role_id)
@router.get("/page/{page}/limit/{limit}")
async def api_get_role_by(page: int, limit: int):
"""
Get roles by page and limit
"""
return await get_role(page, limit)
@router.put("/{role_id}")
async def api_update_role(role_object: Role, role_id: str, current_user: User = Depends(get_current_user)):
"""
Update role by role_id
"""
return await update_role(role_object, role_id, current_user)
@router.delete("/{role_id}")
async def api_delete_role(role_id: str, current_user: User = Depends(get_current_user)):
"""
Delete role by ID
"""
return await delete_role(role_id, current_user)

168
src/services/roles.py Normal file
View file

@ -0,0 +1,168 @@
import json
from typing import List
from uuid import uuid4
from pydantic import BaseModel
from src.services.users import User
from ..services.database import check_database, learnhouseDB, learnhouseDB
from ..services.security import *
from ..services.houses import House
from fastapi import HTTPException, status
from datetime import datetime
#### Classes ####################################################
class Permission(BaseModel):
create: bool
read: bool
update: bool
delete: bool
class Elements(BaseModel):
course: List[str]
users: List[str]
houses: List[str]
paths: List[str]
coursegroups: List[str]
class Role(BaseModel):
name: str
description: str
permissions: Permission
elements: Elements
class RoleInDB(Role):
role_id: str
creationDate: str
updateDate: str
#### Classes ####################################################
async def get_role(role_id: str):
await check_database()
roles = learnhouseDB["roles"]
role = roles.find_one({"role_id": role_id})
if not role:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Role does not exist")
role = Role(**role)
return role
async def create_role(role_object: Role, current_user: User):
await check_database()
roles = learnhouseDB["roles"]
# find if house already exists using name
isRoleAvailable = roles.find_one({"name": role_object.name})
if isRoleAvailable:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Role name already exists")
await verify_user_permissions("create", current_user)
# generate house_id with uuid4
role_id = str(f"role_{uuid4()}")
role = RoleInDB(role_id=role_id, creationDate=str(datetime.now()),
updateDate=str(datetime.now()), **role_object.dict())
role_in_db = roles.insert_one(role.dict())
if not role_in_db:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Unavailable database")
return role.dict()
async def update_role(role_object: House, role_id: str, current_user: User):
await check_database()
# verify house rights
await verify_user_permissions("update", current_user)
roles = learnhouseDB["roles"]
role = roles.find_one({"role_id": role_id})
if not role:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="House does not exist")
updated_role = RoleInDB(
role_id=role_id, updateDate=str(datetime.now()), creationDate=role["creationDate"], **role_object.dict())
roles.update_one({"role_id": role_id}, {"$set": updated_role.dict()})
return RoleInDB(**updated_role.dict())
async def delete_role(role_id: str, current_user: User):
await check_database()
# verify house rights
await verify_user_permissions("delete", current_user)
roles = learnhouseDB["roles"]
role = roles.find_one({"role_id": role_id})
if not role:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Role does not exist")
isDeleted = roles.delete_one({"role_id": role_id})
if isDeleted:
return {"detail": "Role deleted"}
else:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Unavailable database")
async def get_roles(page: int = 1, limit: int = 10):
await check_database()
roles = learnhouseDB["roles"]
# get all roles from database
all_roles = roles.find().sort("name", 1).skip(10 * (page - 1)).limit(limit)
return [json.loads(json.dumps(role, default=str)) for role in all_roles]
#### Security ####################################################
async def verify_user_permissions(action: str, current_user: User):
await check_database()
users = learnhouseDB["users"]
user = users.find_one({"username": current_user.username})
if not user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="User does not exist")
isOwner = "owner" in user["user_type"]
isEditor = "editor" in user["user_type"]
if action == "delete":
if isEditor:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="You do not have rights to this Role")
if not isOwner and not isEditor:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="You do not have rights to this Role")
return True
#### Security ####################################################

View file

@ -1,7 +1,7 @@
from pydantic import BaseModel from pydantic import BaseModel
from ..services.database import create_config_collection, check_database, create_database, learnhouseDB, learnhouseDB from ..services.database import check_database, learnhouseDB, learnhouseDB
from ..services.security import * from ..services.security import *
from fastapi import FastAPI, HTTPException, status, Request, Response, BackgroundTasks from fastapi import HTTPException, status
from datetime import datetime from datetime import datetime
#### Classes #################################################### #### Classes ####################################################
@ -15,7 +15,8 @@ class User(BaseModel):
avatar_url: str | None = None avatar_url: str | None = None
verified: bool verified: bool
created_date: str created_date: str
bio : str | None = None user_type: str
bio: str | None = None
class UserInDB(User): class UserInDB(User):
@ -61,7 +62,7 @@ async def update_user(user_object: UserInDB):
raise HTTPException( raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="User does not exist") status_code=status.HTTP_409_CONFLICT, detail="User does not exist")
user_object.password = security_hash_password(user_object.password) user_object.password = await security_hash_password(user_object.password)
updated_user = {"$set": user_object.dict()} updated_user = {"$set": user_object.dict()}
users.update_one({"username": user_object.username}, updated_user) users.update_one({"username": user_object.username}, updated_user)
@ -96,7 +97,7 @@ async def create_user(user_object: UserInDB):
# lowercase username # lowercase username
user_object.username = user_object.username.lower() user_object.username = user_object.username.lower()
user_object.created_date = str(datetime.now()) user_object.created_date = str(datetime.now())
user_object.password = await security_hash_password(user_object.password) user_object.password = await security_hash_password(user_object.password)