feat: user init & refactors

This commit is contained in:
swve 2023-11-12 23:16:34 +01:00
parent b4dcc14749
commit a50fc67104
25 changed files with 356 additions and 358 deletions

View file

@ -1,215 +1,285 @@
from datetime import datetime
from typing import Literal
from uuid import uuid4
from fastapi import HTTPException, Request, status
from src.security.rbac.rbac import (
authorization_verify_based_on_roles,
authorization_verify_if_user_is_anon,
)
from src.security.security import security_hash_password, security_verify_password
from src.services.users.schemas.users import (
PasswordChangeForm,
PublicUser,
from sqlmodel import Session, select
from src.db.organizations import Organization
from src.db.users import (
User,
UserOrganization,
UserRolesInOrganization,
UserWithPassword,
UserInDB,
UserCreate,
UserRead,
UserUpdate,
UserUpdatePassword,
)
from src.db.user_organizations import UserOrganization
from src.security.security import security_hash_password, security_verify_password
from src.services.users.schemas.users import PublicUser
async def create_user(
request: Request,
db_session: Session,
current_user: PublicUser | None,
user_object: UserWithPassword,
org_slug: str,
user_object: UserCreate,
org_id: int,
):
users = request.app.db["users"]
user = User.from_orm(user_object)
isUsernameAvailable = await users.find_one({"username": user_object.username})
isEmailAvailable = await users.find_one({"email": user_object.email})
# Complete the user object
user.user_uuid = f"user_{uuid4()}"
user.password = await security_hash_password(user_object.password)
user.email_verified = False
user.creation_date = str(datetime.now())
user.update_date = str(datetime.now())
if isUsernameAvailable:
# Verifications
# Check if Organization exists
statement = select(Organization).where(Organization.id == org_id)
result = db_session.exec(statement)
if not result.first():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Username already exists"
status_code=400,
detail="Organization does not exist",
)
if isEmailAvailable:
# Username
statement = select(User).where(User.username == user.username)
result = db_session.exec(statement)
if result.first():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Email already exists"
status_code=400,
detail="Username already exists",
)
# Generate user_id with uuid4
user_id = str(f"user_{uuid4()}")
# Email
statement = select(User).where(User.email == user.email)
result = db_session.exec(statement)
# Check if the requesting user is authenticated
if current_user is not None:
# Verify rights
await verify_user_rights_on_user(request, current_user, "create", user_id)
# Set the username & hash the password
user_object.username = user_object.username.lower()
user_object.password = await security_hash_password(user_object.password)
# Get org_id from org_slug
orgs = request.app.db["organizations"]
# Check if the org exists
isOrgExists = await orgs.find_one({"slug": org_slug})
# If the org does not exist, raise an error
if not isOrgExists and (org_slug != "None"):
if result.first():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="You are trying to create a user in an organization that does not exist",
status_code=400,
detail="Email already exists",
)
org_id = isOrgExists["org_id"] if org_slug != "None" else ''
# Create initial orgs list with the org_id passed in
orgs = (
[UserOrganization(org_id=org_id, org_role="member")]
if org_slug != "None"
else []
)
# Exclude unset values
user_data = user.dict(exclude_unset=True)
for key, value in user_data.items():
setattr(user, key, value)
# Give role
roles = (
[UserRolesInOrganization(role_id="role_member", org_id=org_id)]
if org_slug != "None"
else []
)
# Add user to database
db_session.add(user)
db_session.commit()
db_session.refresh(user)
# Create the user
user = UserInDB(
user_id=user_id,
# Link user and organization
user_organization = UserOrganization(
user_id=user.id is not None,
org_id=int(org_id),
role_id=1,
creation_date=str(datetime.now()),
update_date=str(datetime.now()),
orgs=orgs,
roles=roles,
**user_object.dict(),
)
# Insert the user into the database
await users.insert_one(user.dict())
db_session.add(user_organization)
db_session.commit()
db_session.refresh(user_organization)
return User(**user.dict())
user = UserRead.from_orm(user)
return user
async def read_user(request: Request, current_user: PublicUser, user_id: str):
users = request.app.db["users"]
async def create_user_without_org(
request: Request,
db_session: Session,
current_user: PublicUser | None,
user_object: UserCreate,
):
user = User.from_orm(user_object)
# Check if the user exists
isUserExists = await users.find_one({"user_id": user_id})
# Complete the user object
user.user_uuid = f"user_{uuid4()}"
user.password = await security_hash_password(user_object.password)
user.email_verified = False
user.creation_date = str(datetime.now())
user.update_date = str(datetime.now())
# Verify rights
await verify_user_rights_on_user(request, current_user, "read", user_id)
# Verifications
# If the user does not exist, raise an error
if not isUserExists:
# Username
statement = select(User).where(User.username == user.username)
result = db_session.exec(statement)
if result.first():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="User does not exist"
status_code=400,
detail="Username already exists",
)
return User(**isUserExists)
# Email
statement = select(User).where(User.email == user.email)
result = db_session.exec(statement)
if result.first():
raise HTTPException(
status_code=400,
detail="Email already exists",
)
# Exclude unset values
user_data = user.dict(exclude_unset=True)
for key, value in user_data.items():
setattr(user, key, value)
# Add user to database
db_session.add(user)
db_session.commit()
db_session.refresh(user)
user = UserRead.from_orm(user)
return user
async def update_user(
request: Request, user_id: str, user_object: User, current_user: PublicUser
request: Request,
db_session: Session,
current_user: PublicUser | None,
user_object: UserUpdate,
):
users = request.app.db["users"]
# Get user
statement = select(User).where(User.username == user_object.username)
user = db_session.exec(statement).first()
# Verify rights
await verify_user_rights_on_user(request, current_user, "update", user_id)
isUserExists = await users.find_one({"user_id": user_id})
isUsernameAvailable = await users.find_one({"username": user_object.username})
isEmailAvailable = await users.find_one({"email": user_object.email})
if not isUserExists:
if not user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="User does not exist"
status_code=400,
detail="User does not exist",
)
# okay if username is not changed
if isUserExists["username"] == user_object.username:
user_object.username = user_object.username.lower()
# Update user
user_data = user_object.dict(exclude_unset=True)
for key, value in user_data.items():
setattr(user, key, value)
else:
if isUsernameAvailable:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Username already used"
)
# Update user in database
db_session.add(user)
db_session.commit()
db_session.refresh(user)
if isEmailAvailable:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Email already used"
)
user = UserRead.from_orm(user)
updated_user = {"$set": user_object.dict()}
users.update_one({"user_id": user_id}, updated_user)
return User(**user_object.dict())
return user
async def update_user_password(
request: Request,
current_user: PublicUser,
user_id: str,
password_change_form: PasswordChangeForm,
db_session: Session,
current_user: PublicUser | None,
form: UserUpdatePassword,
):
users = request.app.db["users"]
# Get user
statement = select(User).where(User.username == form.user_id)
user = db_session.exec(statement).first()
isUserExists = await users.find_one({"user_id": user_id})
# Verify rights
await verify_user_rights_on_user(request, current_user, "update", user_id)
if not isUserExists:
if not user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="User does not exist"
status_code=400,
detail="User does not exist",
)
if not await security_verify_password(
password_change_form.old_password, isUserExists["password"]
):
if not await security_verify_password(form.old_password, user.password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Wrong password"
)
new_password = await security_hash_password(password_change_form.new_password)
# Update user
user.password = await security_hash_password(form.new_password)
updated_user = {"$set": {"password": new_password}}
await users.update_one({"user_id": user_id}, updated_user)
# Update user in database
db_session.add(user)
db_session.commit()
db_session.refresh(user)
return {"detail": "Password updated"}
user = UserRead.from_orm(user)
return user
async def delete_user(request: Request, current_user: PublicUser, user_id: str):
users = request.app.db["users"]
async def read_user_by_id(
request: Request,
db_session: Session,
current_user: PublicUser | None,
user_id: int,
):
# Get user
statement = select(User).where(User.id == user_id)
user = db_session.exec(statement).first()
isUserExists = await users.find_one({"user_id": user_id})
# Verify is user has permission to delete the user
await verify_user_rights_on_user(request, current_user, "delete", user_id)
if not isUserExists:
if not user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="User does not exist"
status_code=400,
detail="User does not exist",
)
await users.delete_one({"user_id": user_id})
user = UserRead.from_orm(user)
return {"detail": "User deleted"}
return user
async def read_user_by_uuid(
request: Request,
db_session: Session,
current_user: PublicUser | None,
uuid: str,
):
# Get user
statement = select(User).where(User.user_uuid == uuid)
user = db_session.exec(statement).first()
if not user:
raise HTTPException(
status_code=400,
detail="User does not exist",
)
user = UserRead.from_orm(user)
return user
async def delete_user_by_id(
request: Request,
db_session: Session,
current_user: PublicUser | None,
user_id: int,
):
# Get user
statement = select(User).where(User.id == user_id)
user = db_session.exec(statement).first()
if not user:
raise HTTPException(
status_code=400,
detail="User does not exist",
)
# Delete user
db_session.delete(user)
db_session.commit()
return "User deleted"
# Utils & Security functions
async def security_get_user(request: Request, email: str):
users = request.app.db["users"]
user = await users.find_one({"email": email})
async def security_get_user(request: Request, db_session: Session, email: str) -> User:
# Check if user exists
statement = select(User).where(User.email == email)
user = db_session.exec(statement).first()
if not user:
raise HTTPException(
@ -217,105 +287,6 @@ async def security_get_user(request: Request, email: str):
detail="User with Email does not exist",
)
return UserInDB(**user)
user = User(**user.dict())
async def get_userid_by_username(request: Request, username: str):
users = request.app.db["users"]
user = await users.find_one({"username": username})
if not user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="User does not exist"
)
return user["user_id"]
async def get_user_by_userid(request: Request, user_id: str):
users = request.app.db["users"]
user = await users.find_one({"user_id": user_id})
if not user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="User does not exist"
)
user = User(**user)
return user
async def get_profile_metadata(request: Request, user):
users = request.app.db["users"]
request.app.db["roles"]
user = await users.find_one({"user_id": user["user_id"]})
if not user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="User does not exist"
)
return {"user_object": PublicUser(**user), "roles": "random"}
# Verification of the user's permissions on the roles
async def verify_user_rights_on_user(
request: Request,
current_user: PublicUser,
action: Literal["create", "read", "update", "delete"],
user_id: str,
):
users = request.app.db["users"]
user = UserInDB(**await users.find_one({"user_id": user_id}))
if action == "create":
return True
if action == "read":
await authorization_verify_if_user_is_anon(current_user.user_id)
if current_user.user_id == user_id:
return True
for org in current_user.orgs:
if org.org_id in [org.org_id for org in user.orgs]:
return True
return False
if action == "update":
await authorization_verify_if_user_is_anon(current_user.user_id)
if current_user.user_id == user_id:
return True
for org in current_user.orgs:
if org.org_id in [org.org_id for org in user.orgs]:
if org.org_role == "owner":
return True
await authorization_verify_based_on_roles(
request, current_user.user_id, "update", user["roles"], user_id
)
return False
if action == "delete":
await authorization_verify_if_user_is_anon(current_user.user_id)
if current_user.user_id == user_id:
return True
for org in current_user.orgs:
if org.org_id in [org.org_id for org in user.orgs]:
if org.org_role == "owner":
return True
await authorization_verify_based_on_roles(
request, current_user.user_id, "update", user["roles"], user_id
)