🔒️ roles : organizations support

This commit is contained in:
swve 2022-07-08 21:47:29 +02:00
parent 90234bc5d7
commit 73e25c4b37
5 changed files with 16 additions and 30 deletions

View file

@ -20,7 +20,7 @@ async def api_get_org(org_id: str, current_user: User = Depends(get_current_user
"""
Get single Org by ID
"""
return await get_organization(org_id)
return await get_organization(org_id, current_user)
@router.get("/page/{page}/limit/{limit}")
@ -28,7 +28,7 @@ async def api_get_org_by(page: int, limit: int, current_user: User = Depends(get
"""
Get orgs by page and limit
"""
return await get_orgs(page, limit)
return await get_orgs(page, limit, current_user)
@router.put("/{org_id}")

View file

@ -34,13 +34,7 @@ async def get_house(house_id: str, current_user: User):
house = houses.find_one({"house_id": house_id})
# verify house rights
hasOwnershipRights = await verify_house_rights(house_id, current_user)
hasRoleRights = await verify_user_rights_with_roles("read", current_user.username, house_id)
if not hasRoleRights or not hasOwnershipRights:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Roles/Ownership : Insufficient rights to perform this action")
await verify_house_rights(house_id, current_user,"read")
if not house:
raise HTTPException(
@ -87,13 +81,7 @@ async def update_house(house_object: House, house_id: str, current_user: User):
await check_database()
# verify house rights
hasOwnershipRights = await verify_house_rights(house_id, current_user)
hasRoleRights = await verify_user_rights_with_roles("update", current_user.username, house_id)
if not hasRoleRights or not hasOwnershipRights:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Roles/Ownership : Insufficient rights to perform this action")
await verify_house_rights(house_id, current_user,"update")
houses = learnhouseDB["houses"]
@ -119,13 +107,7 @@ async def delete_house(house_id: str, current_user: User):
await check_database()
# verify house rights
hasOwnershipRights = await verify_house_rights(house_id, current_user)
hasRoleRights = await verify_user_rights_with_roles("delete", current_user.username, house_id)
if not hasRoleRights or not hasOwnershipRights:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Roles/Ownership : Insufficient rights to perform this action")
await verify_house_rights(house_id, current_user,"delete")
houses = learnhouseDB["houses"]
@ -156,7 +138,7 @@ async def get_houses(page: int = 1, limit: int = 10):
#### Security ####################################################
async def verify_house_rights(house_id: str, current_user: User):
async def verify_house_rights(house_id: str, current_user: User, action: str):
await check_database()
houses = learnhouseDB["houses"]
@ -166,12 +148,12 @@ async def verify_house_rights(house_id: str, current_user: User):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="House does not exist")
isAdmin = current_user.username in house["admins"]
hasRoleRights = await verify_user_rights_with_roles(action, current_user.username, house_id)
isOwner = current_user.username in house["owners"]
if not isAdmin and not isOwner:
if not hasRoleRights and not isOwner:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="You do not have rights to this house")
status_code=status.HTTP_403_FORBIDDEN, detail="Roles/Ownership : Insufficient rights to perform this action")
return True

View file

@ -116,6 +116,7 @@ async def delete_org(org_id: str, current_user: User):
async def get_orgs(page: int = 1, limit: int = 10):
## TODO : auth
await check_database()
orgs = learnhouseDB["orgs"]
@ -127,7 +128,7 @@ async def get_orgs(page: int = 1, limit: int = 10):
#### Security ####################################################
async def verify_org_rights(org_id: str, current_user: User):
async def verify_org_rights(org_id: str, current_user: User, action:str,):
await check_database()
orgs = learnhouseDB["organizations"]
@ -137,10 +138,10 @@ async def verify_org_rights(org_id: str, current_user: User):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Organization does not exist")
isAdmin = current_user.username in org["admins"]
isOwner = current_user.username in org["owners"]
hasRoleRights = await verify_user_rights_with_roles(action,current_user.username,org_id)
if not isAdmin and not isOwner:
if not hasRoleRights and not isOwner:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="You do not have rights to this organization")

View file

@ -24,6 +24,7 @@ class Elements(BaseModel):
users: List[str]
houses: List[str]
collections: List[str]
organizations: List[str]
class Role(BaseModel):

View file

@ -70,6 +70,8 @@ async def check_element_type(element_id):
return "users"
elif element_id.startswith("house_"):
return "houses"
elif element_id.startswith("org_"):
return "organizations"
elif element_id.startswith("collection_"):
return "collections"
else: