fix: adapt mock code to new data stuctures

This commit is contained in:
swve 2023-04-08 00:04:09 +02:00
parent 768c270959
commit f4c239d848
3 changed files with 202 additions and 175 deletions

View file

@ -1,186 +1,212 @@
# import requests
# from datetime import datetime
# from fileinput import filename
# from pprint import pprint
# from uuid import uuid4
# from fastapi import File, UploadFile, Request
# from src.services.courses.chapters import CourseChapter, create_coursechapter
# from src.services.courses.activities.activities import Activity, create_activity
# from src.services.courses.thumbnails import upload_thumbnail
# from src.services.users.users import PublicUser, User, UserInDB, UserWithPassword
import requests
from datetime import datetime
from fileinput import filename
from pprint import pprint
from uuid import uuid4
from fastapi import File, UploadFile, Request
from src.security.security import security_hash_password
from src.services.courses.chapters import CourseChapter, create_coursechapter
from src.services.courses.activities.activities import Activity, create_activity
from src.services.courses.thumbnails import upload_thumbnail
from src.services.users.users import PublicUser, User, UserInDB, UserWithPassword
# from src.services.orgs import OrganizationInDB, Organization, create_org
# from src.services.roles.schemas.roles import Permission, Elements, RolePolicy, create_role
# from src.services.users.users import create_user
# from src.services.courses.courses import Course, CourseInDB, create_course
# from src.services.roles.roles import Role
# from faker import Faker
from src.services.orgs import OrganizationInDB, Organization, create_org
from src.services.roles.schemas.roles import Permission, Elements, RoleInDB
from src.services.users.users import create_user
from src.services.courses.courses import Course, CourseInDB, create_course
from src.services.roles.roles import Role, create_role
from faker import Faker
# async def create_initial_data(request: Request):
# fake = Faker(['en_US'])
# fake_multilang = Faker(
# ['en_US', 'de_DE', 'ja_JP', 'es_ES', 'it_IT', 'pt_BR', 'ar_PS'])
# # Create users
# ########################################
# database_users = request.app.db["users"]
# await database_users.delete_many({})
# users = []
# admin_user = UserWithPassword(
# username=f"admin",
# email=f"admin@admin.admin",
# password="admin",
# user_type="isOwner",
# )
# admin_user = await create_user(request, admin_user)
# for i in range(0, 20):
# user = UserWithPassword(
# username=fake.simple_profile()['username'],
# email=fake.email(),
# password=fake.password(),
# user_type="isOwner",
# full_name=fake.name(),
# )
# users.append(user)
# for user in users:
# await create_user(request, user)
# # find admin user
# users = request.app.db["users"]
# admin_user = await users.find_one({"username": "admin"})
# if admin_user:
# admin_user = UserInDB(**admin_user)
# current_user = PublicUser(**admin_user.dict())
# else:
# raise Exception("Admin user not found")
# # Create organizations
# ########################################
# database_orgs = request.app.db["organizations"]
# await database_orgs.delete_many({})
# organizations = []
# for i in range(0, 5):
# company = fake.company()
# # remove whitespace and special characters and make lowercase
# slug = ''.join(e for e in company if e.isalnum()).lower()
# org = Organization(
# name=company,
# description=fake.unique.text(),
# email=fake.unique.email(),
# slug=slug,
# )
# organizations.append(org)
# await create_org(request, org, current_user)
# # Create roles
# ########################################
# database_roles = request.app.db["roles"]
# await database_roles.delete_many({})
async def create_initial_data(request: Request):
fake = Faker(['en_US'])
fake_multilang = Faker(
['en_US', 'de_DE', 'ja_JP', 'es_ES', 'it_IT', 'pt_BR', 'ar_PS'])
# Create users
########################################
database_users = request.app.db["users"]
await database_users.delete_many({})
users = []
admin_user = UserInDB(
user_id="user_admin",
creation_date=str(datetime.now()),
update_date=str(datetime.now()),
roles=["role_admin"],
orgs=[],
username=f"admin",
email=f"admin@admin.admin",
password=str(await security_hash_password("admin")),
)
db_admin_user = await database_users.insert_one(admin_user.dict())
# find admin user
users = request.app.db["users"]
admin_user = await users.find_one({"username": "admin"})
if admin_user:
admin_user = UserInDB(**admin_user)
current_user = PublicUser(**admin_user.dict())
else:
raise Exception("Admin user not found")
# Create roles
########################################
database_roles = request.app.db["roles"]
await database_roles.delete_many({})
# roles = []
# admin_role = Role(
# name="admin",
# description="admin",
# policies=[RolePolicy(permissions=Permission(
# action_create=True,
# action_read=True,
# action_update=True,
# action_delete=True,
# ),
# elements=Elements(
# courses=["*"],
# users=["*"],
# houses=["*"],
# collections=["*"],
# organizations=["*"],
# coursechapters=["*"],
# activities=["*"],
# ))],
# linked_users=[admin_user.user_id],
# )
# roles.append(admin_role)
roles = []
admin_role = RoleInDB(
name="Admin",
description="Admin",
elements=Elements(
courses=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
users=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
houses=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
collections=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
organizations=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
coursechapters=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
activities=Permission(
action_create=True,
action_read=True,
action_update=True,
action_delete=True,
),
),
org_id="org_test",
role_id="role_admin",
created_at=str(datetime.now()),
updated_at=str(datetime.now()),
)
# await create_role(request, admin_role, current_user)
roles.append(admin_role)
# # Generate Courses and CourseChapters
# ########################################
for role in roles:
database_roles.insert_one(role.dict())
# database_courses = request.app.db["courses"]
# database_chapters = request.app.db["coursechapters"]
# await database_courses.delete_many({})
# await database_chapters.delete_many({})
# Create organizations
########################################
# courses = []
# orgs = request.app.db["organizations"]
database_orgs = request.app.db["organizations"]
await database_orgs.delete_many({})
# if await orgs.count_documents({}) > 0:
# for org in await orgs.find().to_list(length=100):
# for i in range(0, 5):
organizations = []
for i in range(0, 5):
company = fake.company()
# remove whitespace and special characters and make lowercase
slug = ''.join(e for e in company if e.isalnum()).lower()
org = Organization(
name=company,
description=fake.unique.text(),
email=fake.unique.email(),
slug=slug,
default=False
)
organizations.append(org)
await create_org(request, org, current_user)
# # get image in BinaryIO format from unsplash and save it to disk
# image = requests.get(
# "https://source.unsplash.com/random/800x600")
# with open("thumbnail.jpg", "wb") as f:
# f.write(image.content)
# course_id = f"course_{uuid4()}"
# course = CourseInDB(
# name=fake_multilang.unique.sentence(),
# description=fake_multilang.unique.text(),
# mini_description=fake_multilang.unique.text(),
# thumbnail="thumbnail",
# org_id=org['org_id'],
# learnings=[fake_multilang.unique.sentence()
# for i in range(0, 5)],
# public=True,
# chapters=[],
# course_id=course_id,
# creationDate=str(datetime.now()),
# updateDate=str(datetime.now()),
# authors=[current_user.user_id],
# )
# Generate Courses and CourseChapters
########################################
# courses = request.app.db["courses"]
# name_in_disk = f"test_mock{course_id}.jpeg"
database_courses = request.app.db["courses"]
await database_courses.delete_many({})
# image = requests.get(
# "https://source.unsplash.com/random/800x600")
# with open(f"content/uploads/img/{name_in_disk}", "wb") as f:
# f.write(image.content)
courses = []
orgs = request.app.db["organizations"]
# course.thumbnail = name_in_disk
if await orgs.count_documents({}) > 0:
for org in await orgs.find().to_list(length=100):
for i in range(0, 5):
# course = CourseInDB(**course.dict())
# course_in_db = await courses.insert_one(course.dict())
# get image in BinaryIO format from unsplash and save it to disk
image = requests.get(
"https://source.unsplash.com/random/800x600")
with open("thumbnail.jpg", "wb") as f:
f.write(image.content)
# # create chapters
# for i in range(0, 5):
# coursechapter = CourseChapter(
# name=fake_multilang.unique.sentence(),
# description=fake_multilang.unique.text(),
# activities=[],
# )
# coursechapter = await create_coursechapter(request,coursechapter, course_id, current_user)
# pprint(coursechapter)
# if coursechapter:
# # create activities
# for i in range(0, 5):
# activity = Activity(
# name=fake_multilang.unique.sentence(),
# type="dynamic",
# content={},
# )
# activity = await create_activity(request,activity, coursechapter['coursechapter_id'], current_user)
course_id = f"course_{uuid4()}"
course = CourseInDB(
name=fake_multilang.unique.sentence(),
description=fake_multilang.unique.text(),
mini_description=fake_multilang.unique.text(),
thumbnail="thumbnail",
org_id=org['org_id'],
learnings=[fake_multilang.unique.sentence()
for i in range(0, 5)],
public=True,
chapters=[],
course_id=course_id,
creationDate=str(datetime.now()),
updateDate=str(datetime.now()),
authors=[current_user.user_id],
chapters_content=[],
)
courses = request.app.db["courses"]
name_in_disk = f"test_mock{course_id}.jpeg"
image = requests.get(
"https://source.unsplash.com/random/800x600")
with open(f"content/uploads/img/{name_in_disk}", "wb") as f:
f.write(image.content)
course.thumbnail = name_in_disk
course = CourseInDB(**course.dict())
course_in_db = await courses.insert_one(course.dict())
# create chapters
for i in range(0, 5):
coursechapter = CourseChapter(
name=fake_multilang.unique.sentence(),
description=fake_multilang.unique.text(),
activities=[],
)
coursechapter = await create_coursechapter(request,coursechapter, course_id, current_user)
pprint(coursechapter)
if coursechapter:
# create activities
for i in range(0, 5):
activity = Activity(
name=fake_multilang.unique.sentence(),
type="dynamic",
content={},
)
activity = await create_activity(request,activity, "org_test", coursechapter['coursechapter_id'], current_user)

View file

@ -185,9 +185,8 @@ async def add_course_to_trail(request: Request, user: PublicUser, orgslug: str,
if not trail:
trail = TrailInDB(trail_id=f"trail_{uuid4()}", user_id=user.user_id, org_id=org["org_id"], courses=[])
trail = await trails.insert_one(trail.dict())
trail = await trails.find_one({"trail_id": trail.inserted_id})
trail_to_insert = TrailInDB(trail_id=f"trail_{uuid4()}", user_id=user.user_id, org_id=org["org_id"], courses=[])
trail_to_insert = await trails.insert_one(trail_to_insert.dict())
# check if course is already present in the trail
for element in trail["courses"]: