learnhouse/src/services/install/install.py
2023-08-14 14:26:44 +02:00

419 lines
12 KiB
Python

from datetime import datetime
from uuid import uuid4
from fastapi import HTTPException, Request, status
from pydantic import BaseModel
import requests
from config.config import get_learnhouse_config
from src.security.security import security_hash_password
from src.services.courses.activities.activities import Activity, create_activity
from src.services.courses.chapters import create_coursechapter, CourseChapter
from src.services.courses.courses import CourseInDB
from src.services.orgs.schemas.orgs import Organization, OrganizationInDB
from faker import Faker
from src.services.roles.schemas.roles import Elements, Permission, RoleInDB
from src.services.users.schemas.users import (
PublicUser,
User,
UserInDB,
UserOrganization,
UserRolesInOrganization,
UserWithPassword,
)
class InstallInstance(BaseModel):
install_id: str
created_date: str
updated_date: str
step: int
data: dict
async def isInstallModeEnabled():
config = get_learnhouse_config()
if config.general_config.install_mode:
return True
else:
raise HTTPException(
status_code=403,
detail="Install mode is not enabled",
)
async def create_install_instance(request: Request, data: dict):
installs = request.app.db["installs"]
# get install_id
install_id = str(f"install_{uuid4()}")
created_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
updated_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
step = 1
# create install
install = InstallInstance(
install_id=install_id,
created_date=created_date,
updated_date=updated_date,
step=step,
data=data,
)
# insert install
installs.insert_one(install.dict())
return install
async def get_latest_install_instance(request: Request):
installs = request.app.db["installs"]
# get latest created install instance using find_one
install = await installs.find_one(
sort=[("created_date", -1)], limit=1, projection={"_id": 0}
)
if install is None:
raise HTTPException(
status_code=404,
detail="No install instance found",
)
else:
install = InstallInstance(**install)
return install
async def update_install_instance(request: Request, data: dict, step: int):
installs = request.app.db["installs"]
# get latest created install
install = await installs.find_one(
sort=[("created_date", -1)], limit=1, projection={"_id": 0}
)
if install is None:
return None
else:
# update install
install["data"] = data
install["step"] = step
install["updated_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# update install
await installs.update_one(
{"install_id": install["install_id"]}, {"$set": install}
)
install = InstallInstance(**install)
return install
############################################################################################################
# Steps
############################################################################################################
# Install Default roles
async def install_default_elements(request: Request, data: dict):
roles = request.app.db["roles"]
# check if default roles ADMIN_ROLE and USER_ROLE already exist
admin_role = await roles.find_one({"role_id": "role_admin"})
user_role = await roles.find_one({"role_id": "role_member"})
if admin_role is not None or user_role is not None:
raise HTTPException(
status_code=400,
detail="Default roles already exist",
)
# get default roles
ADMIN_ROLE = RoleInDB(
name="Admin Role",
description="This role grants all permissions to the user",
elements=Elements(
courses=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
users=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
houses=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
collections=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
organizations=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
coursechapters=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
activities=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
),
org_id="*",
role_id="role_admin",
created_at=str(datetime.now()),
updated_at=str(datetime.now()),
)
USER_ROLE = RoleInDB(
name="Member Role",
description="This role grants read-only permissions to the user",
elements=Elements(
courses=Permission(
action_create=False,
action_read=True,
action_update=False,
action_delete=False,
),
users=Permission(
action_create=False,
action_read=True,
action_update=False,
action_delete=False,
),
houses=Permission(
action_create=False,
action_read=True,
action_update=False,
action_delete=False,
),
collections=Permission(
action_create=False,
action_read=True,
action_update=False,
action_delete=False,
),
organizations=Permission(
action_create=False,
action_read=True,
action_update=False,
action_delete=False,
),
coursechapters=Permission(
action_create=False,
action_read=True,
action_update=False,
action_delete=False,
),
activities=Permission(
action_create=False,
action_read=True,
action_update=False,
action_delete=False,
),
),
org_id="*",
role_id="role_member",
created_at=str(datetime.now()),
updated_at=str(datetime.now()),
)
try:
# insert default roles
await roles.insert_many([USER_ROLE.dict(), ADMIN_ROLE.dict()])
return True
except Exception:
raise HTTPException(
status_code=400,
detail="Error while inserting default roles",
)
# Organization creation
async def install_create_organization(
request: Request,
org_object: Organization,
):
orgs = request.app.db["organizations"]
request.app.db["users"]
# find if org already exists using name
isOrgAvailable = await orgs.find_one({"slug": org_object.slug.lower()})
if isOrgAvailable:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Organization slug already exists",
)
# generate org_id with uuid4
org_id = str(f"org_{uuid4()}")
org = OrganizationInDB(org_id=org_id, **org_object.dict())
org_in_db = await orgs.insert_one(org.dict())
if not org_in_db:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Unavailable database",
)
return org.dict()
async def install_create_organization_user(
request: Request, user_object: UserWithPassword, org_slug: str
):
users = request.app.db["users"]
isUsernameAvailable = await users.find_one({"username": user_object.username})
isEmailAvailable = await users.find_one({"email": user_object.email})
if isUsernameAvailable:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Username already exists"
)
if isEmailAvailable:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Email already exists"
)
# Generate user_id with uuid4
user_id = str(f"user_{uuid4()}")
# Set the username & hash the password
user_object.username = user_object.username.lower()
user_object.password = await security_hash_password(user_object.password)
# Get org_id from org_slug
orgs = request.app.db["organizations"]
# Check if the org exists
isOrgExists = await orgs.find_one({"slug": org_slug})
# If the org does not exist, raise an error
if not isOrgExists:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="You are trying to create a user in an organization that does not exist",
)
org_id = isOrgExists["org_id"]
# Create initial orgs list with the org_id passed in
orgs = [UserOrganization(org_id=org_id, org_role="owner")]
# Give role
roles = [UserRolesInOrganization(role_id="role_admin", org_id=org_id)]
# Create the user
user = UserInDB(
user_id=user_id,
creation_date=str(datetime.now()),
update_date=str(datetime.now()),
orgs=orgs,
roles=roles,
**user_object.dict(),
)
# Insert the user into the database
await users.insert_one(user.dict())
return User(**user.dict())
async def create_sample_data(org_slug: str, username: str, request: Request):
Faker(["en_US"])
fake_multilang = Faker(
["en_US", "de_DE", "ja_JP", "es_ES", "it_IT", "pt_BR", "ar_PS"]
)
users = request.app.db["users"]
orgs = request.app.db["organizations"]
user = await users.find_one({"username": username})
org = await orgs.find_one({"slug": org_slug.lower()})
user_id = user["user_id"]
org_id = org["org_id"]
current_user = PublicUser(**user)
for i in range(0, 5):
# get image in BinaryIO format from unsplash and save it to disk
image = requests.get("https://source.unsplash.com/random/800x600")
with open("thumbnail.jpg", "wb") as f:
f.write(image.content)
course_id = f"course_{uuid4()}"
course = CourseInDB(
name=fake_multilang.unique.sentence(),
description=fake_multilang.unique.text(),
mini_description=fake_multilang.unique.text(),
thumbnail="thumbnail",
org_id=org_id,
learnings=[fake_multilang.unique.sentence() for i in range(0, 5)],
public=True,
chapters=[],
course_id=course_id,
creationDate=str(datetime.now()),
updateDate=str(datetime.now()),
authors=[user_id],
chapters_content=[],
)
courses = request.app.db["courses"]
course = CourseInDB(**course.dict())
await courses.insert_one(course.dict())
# create chapters
for i in range(0, 5):
coursechapter = CourseChapter(
name=fake_multilang.unique.sentence(),
description=fake_multilang.unique.text(),
activities=[],
)
coursechapter = await create_coursechapter(
request, coursechapter, course_id, current_user
)
if coursechapter:
# create activities
for i in range(0, 5):
activity = Activity(
name=fake_multilang.unique.sentence(),
type="dynamic",
content={},
)
activity = await create_activity(
request,
activity,
org_id,
coursechapter["coursechapter_id"],
current_user,
)