feat: additional verification for anon users

This commit is contained in:
swve 2023-07-20 01:42:20 +02:00
parent 3c2f6b3a98
commit 42c99f3939
9 changed files with 46 additions and 4 deletions

View file

@ -79,7 +79,6 @@ async def authorization_verify_based_on_roles(
element_id: str, element_id: str,
): ):
element_type = await check_element_type(element_id) element_type = await check_element_type(element_id)
print(element_type)
element = request.app.db[element_type] element = request.app.db[element_type]
roles = request.app.db["roles"] roles = request.app.db["roles"]
@ -125,3 +124,11 @@ async def authorization_verify_based_on_roles_and_authorship(
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,
detail="User rights (roles & authorship) : You don't have the right to perform this action", detail="User rights (roles & authorship) : You don't have the right to perform this action",
) )
async def authorization_verify_if_user_is_anon(user_id: str):
if user_id == "anonymous":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You should be logged in to perform this action",
)

View file

@ -3,6 +3,7 @@ from pydantic import BaseModel
from src.security.rbac.rbac import ( from src.security.rbac.rbac import (
authorization_verify_based_on_roles, authorization_verify_based_on_roles,
authorization_verify_if_element_is_public, authorization_verify_if_element_is_public,
authorization_verify_if_user_is_anon,
) )
from src.services.users.schemas.users import AnonymousUser, PublicUser from src.services.users.schemas.users import AnonymousUser, PublicUser
from fastapi import HTTPException, status, Request from fastapi import HTTPException, status, Request
@ -214,6 +215,8 @@ async def verify_rights(
users = request.app.db["users"] users = request.app.db["users"]
user = await users.find_one({"user_id": current_user.user_id}) user = await users.find_one({"user_id": current_user.user_id})
await authorization_verify_if_user_is_anon(current_user.user_id)
await authorization_verify_based_on_roles( await authorization_verify_based_on_roles(
request, request,
current_user.user_id, current_user.user_id,
@ -225,6 +228,8 @@ async def verify_rights(
users = request.app.db["users"] users = request.app.db["users"]
user = await users.find_one({"user_id": current_user.user_id}) user = await users.find_one({"user_id": current_user.user_id})
await authorization_verify_if_user_is_anon(current_user.user_id)
await authorization_verify_based_on_roles( await authorization_verify_based_on_roles(
request, request,
current_user.user_id, current_user.user_id,

View file

@ -7,6 +7,7 @@ from src.security.rbac.rbac import (
authorization_verify_based_on_roles, authorization_verify_based_on_roles,
authorization_verify_based_on_roles_and_authorship, authorization_verify_based_on_roles_and_authorship,
authorization_verify_if_element_is_public, authorization_verify_if_element_is_public,
authorization_verify_if_user_is_anon,
) )
from src.services.courses.courses import Course from src.services.courses.courses import Course
from src.services.courses.activities.activities import ActivityInDB from src.services.courses.activities.activities import ActivityInDB
@ -323,6 +324,8 @@ async def verify_rights(
users = request.app.db["users"] users = request.app.db["users"]
user = await users.find_one({"user_id": current_user.user_id}) user = await users.find_one({"user_id": current_user.user_id})
await authorization_verify_if_user_is_anon(current_user.user_id)
await authorization_verify_based_on_roles_and_authorship( await authorization_verify_based_on_roles_and_authorship(
request, request,
current_user.user_id, current_user.user_id,
@ -334,6 +337,8 @@ async def verify_rights(
users = request.app.db["users"] users = request.app.db["users"]
user = await users.find_one({"user_id": current_user.user_id}) user = await users.find_one({"user_id": current_user.user_id})
await authorization_verify_if_user_is_anon(current_user.user_id)
await authorization_verify_based_on_roles_and_authorship( await authorization_verify_based_on_roles_and_authorship(
request, request,
current_user.user_id, current_user.user_id,

View file

@ -1,7 +1,7 @@
from typing import List, Literal from typing import List, Literal
from uuid import uuid4 from uuid import uuid4
from pydantic import BaseModel from pydantic import BaseModel
from src.security.rbac.rbac import authorization_verify_based_on_roles_and_authorship from src.security.rbac.rbac import authorization_verify_based_on_roles_and_authorship, authorization_verify_if_user_is_anon
from src.services.users.users import PublicUser from src.services.users.users import PublicUser
from fastapi import HTTPException, status, Request from fastapi import HTTPException, status, Request
@ -233,6 +233,8 @@ async def verify_collection_rights(
if current_user.user_id == "anonymous" and action == "read": if current_user.user_id == "anonymous" and action == "read":
return True return True
await authorization_verify_if_user_is_anon(current_user.user_id)
await authorization_verify_based_on_roles_and_authorship( await authorization_verify_based_on_roles_and_authorship(
request, current_user.user_id, action, user["roles"], collection_id request, current_user.user_id, action, user["roles"], collection_id
) )

View file

@ -6,6 +6,7 @@ from src.security.rbac.rbac import (
authorization_verify_based_on_roles, authorization_verify_based_on_roles,
authorization_verify_based_on_roles_and_authorship, authorization_verify_based_on_roles_and_authorship,
authorization_verify_if_element_is_public, authorization_verify_if_element_is_public,
authorization_verify_if_user_is_anon,
) )
from src.services.courses.activities.activities import ActivityInDB from src.services.courses.activities.activities import ActivityInDB
from src.services.courses.thumbnails import upload_thumbnail from src.services.courses.thumbnails import upload_thumbnail
@ -398,6 +399,8 @@ async def verify_rights(
users = request.app.db["users"] users = request.app.db["users"]
user = await users.find_one({"user_id": current_user.user_id}) user = await users.find_one({"user_id": current_user.user_id})
await authorization_verify_if_user_is_anon(current_user.user_id)
await authorization_verify_based_on_roles_and_authorship( await authorization_verify_based_on_roles_and_authorship(
request, request,
current_user.user_id, current_user.user_id,

View file

@ -1,7 +1,10 @@
import json import json
from typing import Literal from typing import Literal
from uuid import uuid4 from uuid import uuid4
from src.security.rbac.rbac import authorization_verify_based_on_roles from src.security.rbac.rbac import (
authorization_verify_based_on_roles,
authorization_verify_if_user_is_anon,
)
from src.services.orgs.logos import upload_org_logo from src.services.orgs.logos import upload_org_logo
from src.services.orgs.schemas.orgs import ( from src.services.orgs.schemas.orgs import (
Organization, Organization,
@ -212,6 +215,8 @@ async def verify_org_rights(
status_code=status.HTTP_409_CONFLICT, detail="Organization does not exist" status_code=status.HTTP_409_CONFLICT, detail="Organization does not exist"
) )
await authorization_verify_if_user_is_anon(current_user.user_id)
await authorization_verify_based_on_roles( await authorization_verify_based_on_roles(
request, current_user.user_id, action, user["roles"], org_id request, current_user.user_id, action, user["roles"], org_id
) )

View file

@ -1,5 +1,6 @@
from typing import Literal from typing import Literal
from uuid import uuid4 from uuid import uuid4
from src.security.rbac.rbac import authorization_verify_if_user_is_anon
from src.services.roles.schemas.roles import Role, RoleInDB from src.services.roles.schemas.roles import Role, RoleInDB
from src.services.users.schemas.users import PublicUser from src.services.users.schemas.users import PublicUser
from fastapi import HTTPException, status, Request from fastapi import HTTPException, status, Request
@ -85,6 +86,8 @@ async def verify_user_permissions_on_roles(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Roles : Not authenticated" status_code=status.HTTP_401_UNAUTHORIZED, detail="Roles : Not authenticated"
) )
await authorization_verify_if_user_is_anon(current_user.user_id)
if action == "create": if action == "create":
if "owner" in [org.org_role for org in current_user.orgs]: if "owner" in [org.org_role for org in current_user.orgs]:
return True return True

View file

@ -57,6 +57,9 @@ class PublicUser(User):
class AnonymousUser(BaseModel): class AnonymousUser(BaseModel):
user_id: str = "anonymous" user_id: str = "anonymous"
username: str = "anonymous" username: str = "anonymous"
roles: list[UserRolesInOrganization] = [
UserRolesInOrganization(org_id="anonymous", role_id="role_anonymous")
]

View file

@ -2,7 +2,7 @@ from datetime import datetime
from typing import Literal from typing import Literal
from uuid import uuid4 from uuid import uuid4
from fastapi import HTTPException, Request, status from fastapi import HTTPException, Request, status
from src.security.rbac.rbac import authorization_verify_based_on_roles from src.security.rbac.rbac import authorization_verify_based_on_roles, authorization_verify_if_user_is_anon
from src.security.security import security_hash_password, security_verify_password from src.security.security import security_hash_password, security_verify_password
from src.services.users.schemas.users import ( from src.services.users.schemas.users import (
PasswordChangeForm, PasswordChangeForm,
@ -266,6 +266,9 @@ async def verify_user_rights_on_user(
return True return True
if action == "read": if action == "read":
await authorization_verify_if_user_is_anon(current_user.user_id)
if current_user.user_id == user_id: if current_user.user_id == user_id:
return True return True
@ -276,6 +279,9 @@ async def verify_user_rights_on_user(
return False return False
if action == "update": if action == "update":
await authorization_verify_if_user_is_anon(current_user.user_id)
if current_user.user_id == user_id: if current_user.user_id == user_id:
return True return True
@ -291,6 +297,9 @@ async def verify_user_rights_on_user(
return False return False
if action == "delete": if action == "delete":
await authorization_verify_if_user_is_anon(current_user.user_id)
if current_user.user_id == user_id: if current_user.user_id == user_id:
return True return True