feat: backend endpoints

This commit is contained in:
swve 2025-07-16 18:16:28 +02:00
parent 86f7a80eb7
commit b0d492a116
6 changed files with 566 additions and 3 deletions

View file

@ -0,0 +1,65 @@
from typing import Optional
from sqlalchemy import JSON, Column, ForeignKey
from sqlmodel import Field, SQLModel
from enum import Enum
from datetime import datetime
class CertificationBase(SQLModel):
course_id: int = Field(sa_column= Column("course_id", ForeignKey("course.id", ondelete="CASCADE")))
config: dict = Field(default={}, sa_column= Column("config", JSON))
class Certifications(CertificationBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
certification_uuid: str = Field(unique=True)
course_id: int = Field(sa_column= Column("course_id", ForeignKey("course.id", ondelete="CASCADE")))
config: dict = Field(default={}, sa_column= Column("config", JSON))
creation_date: str = ""
update_date: str = ""
class CertificationCreate(SQLModel):
course_id: int
config: dict = Field(default={})
class CertificationUpdate(SQLModel):
config: Optional[dict] = None
class CertificationRead(SQLModel):
id: int
certification_uuid: str
course_id: int
config: dict
creation_date: str
update_date: str
class CertificateUserBase(SQLModel):
user_id: int = Field(sa_column= Column("user_id", ForeignKey("user.id", ondelete="CASCADE")))
certification_id: int = Field(sa_column= Column("certification_id", ForeignKey("certifications.id", ondelete="CASCADE")))
user_certification_uuid: str
class CertificateUser(CertificateUserBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
user_id: int = Field(sa_column= Column("user_id", ForeignKey("user.id", ondelete="CASCADE")))
certification_id: int = Field(sa_column= Column("certification_id", ForeignKey("certifications.id", ondelete="CASCADE")))
user_certification_uuid: str
created_at: str = ""
updated_at: str = ""
class CertificateUserCreate(SQLModel):
user_id: int
certification_id: int
user_certification_uuid: str
class CertificateUserRead(SQLModel):
id: int
user_id: int
certification_id: int
user_certification_uuid: str
created_at: str
updated_at: str
class CertificateUserUpdate(SQLModel):
user_id: Optional[int] = None
certification_id: Optional[int] = None
user_certification_uuid: Optional[str] = None

View file

@ -4,7 +4,7 @@ from src.routers import health
from src.routers import usergroups from src.routers import usergroups
from src.routers import dev, trail, users, auth, orgs, roles, search from src.routers import dev, trail, users, auth, orgs, roles, search
from src.routers.ai import ai from src.routers.ai import ai
from src.routers.courses import chapters, collections, courses, assignments from src.routers.courses import chapters, collections, courses, assignments, certifications
from src.routers.courses.activities import activities, blocks from src.routers.courses.activities import activities, blocks
from src.routers.ee import cloud_internal, payments from src.routers.ee import cloud_internal, payments
from src.routers.install import install from src.routers.install import install
@ -33,6 +33,9 @@ v1_router.include_router(activities.router, prefix="/activities", tags=["activit
v1_router.include_router( v1_router.include_router(
collections.router, prefix="/collections", tags=["collections"] collections.router, prefix="/collections", tags=["collections"]
) )
v1_router.include_router(
certifications.router, prefix="/certifications", tags=["certifications"]
)
v1_router.include_router(trail.router, prefix="/trail", tags=["trail"]) v1_router.include_router(trail.router, prefix="/trail", tags=["trail"])
v1_router.include_router(ai.router, prefix="/ai", tags=["ai"]) v1_router.include_router(ai.router, prefix="/ai", tags=["ai"])
v1_router.include_router(payments.router, prefix="/payments", tags=["payments"]) v1_router.include_router(payments.router, prefix="/payments", tags=["payments"])

View file

@ -0,0 +1,96 @@
from typing import List
from fastapi import APIRouter, Depends, Request
from sqlmodel import Session
from src.core.events.database import get_db_session
from src.db.courses.certifications import (
CertificationCreate,
CertificationRead,
CertificationUpdate,
)
from src.db.users import PublicUser
from src.security.auth import get_current_user
from src.services.courses.certifications import (
create_certification,
get_certification,
get_certifications_by_course,
update_certification,
delete_certification,
)
router = APIRouter()
@router.post("/")
async def api_create_certification(
request: Request,
certification_object: CertificationCreate,
current_user: PublicUser = Depends(get_current_user),
db_session: Session = Depends(get_db_session),
) -> CertificationRead:
"""
Create new certification for a course
"""
return await create_certification(
request, certification_object, current_user, db_session
)
@router.get("/{certification_uuid}")
async def api_get_certification(
request: Request,
certification_uuid: str,
current_user: PublicUser = Depends(get_current_user),
db_session: Session = Depends(get_db_session),
) -> CertificationRead:
"""
Get single certification by certification_id
"""
return await get_certification(
request, certification_uuid, current_user, db_session
)
@router.get("/course/{course_uuid}")
async def api_get_certifications_by_course(
request: Request,
course_uuid: str,
current_user: PublicUser = Depends(get_current_user),
db_session: Session = Depends(get_db_session),
) -> List[CertificationRead]:
"""
Get all certifications for a specific course
"""
return await get_certifications_by_course(
request, course_uuid, current_user, db_session
)
@router.put("/{certification_uuid}")
async def api_update_certification(
request: Request,
certification_uuid: str,
certification_object: CertificationUpdate,
current_user: PublicUser = Depends(get_current_user),
db_session: Session = Depends(get_db_session),
) -> CertificationRead:
"""
Update certification by certification_id
"""
return await update_certification(
request, certification_uuid, certification_object, current_user, db_session
)
@router.delete("/{certification_uuid}")
async def api_delete_certification(
request: Request,
certification_uuid: str,
current_user: PublicUser = Depends(get_current_user),
db_session: Session = Depends(get_db_session),
):
"""
Delete certification by certification_id
"""
return await delete_certification(
request, certification_uuid, current_user, db_session
)

View file

@ -44,6 +44,7 @@ from src.services.courses.activities.uploads.tasks_ref_files import (
upload_reference_file, upload_reference_file,
) )
from src.services.trail.trail import check_trail_presence from src.services.trail.trail import check_trail_presence
from src.services.courses.certifications import check_course_completion_and_create_certificate
## > Assignments CRUD ## > Assignments CRUD
@ -1237,6 +1238,12 @@ async def create_assignment_submission(
db_session.commit() db_session.commit()
db_session.refresh(trailstep) db_session.refresh(trailstep)
# Check if all activities in the course are completed and create certificate if so
if course and course.id and user and user.id:
await check_course_completion_and_create_certificate(
request, user.id, course.id, db_session
)
# return assignment user submission read # return assignment user submission read
return AssignmentUserSubmissionRead.model_validate(assignment_user_submission) return AssignmentUserSubmissionRead.model_validate(assignment_user_submission)
@ -1658,6 +1665,12 @@ async def mark_activity_as_done_for_user(
db_session.commit() db_session.commit()
db_session.refresh(trailstep) db_session.refresh(trailstep)
# Check if all activities in the course are completed and create certificate if so
if course and course.id:
await check_course_completion_and_create_certificate(
request, int(user_id), course.id, db_session
)
# return OK # return OK
return {"message": "Activity marked as done for user"} return {"message": "Activity marked as done for user"}

View file

@ -0,0 +1,379 @@
from typing import List, Literal
from uuid import uuid4
from datetime import datetime
from sqlmodel import Session, select
from fastapi import HTTPException, Request
from src.db.courses.certifications import (
Certifications,
CertificationCreate,
CertificationRead,
CertificationUpdate,
CertificateUser,
CertificateUserCreate,
CertificateUserRead,
)
from src.db.courses.courses import Course
from src.db.courses.chapter_activities import ChapterActivity
from src.db.trail_steps import TrailStep
from src.db.users import PublicUser, AnonymousUser
from src.security.rbac.rbac import (
authorization_verify_based_on_roles_and_authorship,
authorization_verify_if_element_is_public,
authorization_verify_if_user_is_anon,
)
####################################################
# CRUD
####################################################
async def create_certification(
request: Request,
certification_object: CertificationCreate,
current_user: PublicUser | AnonymousUser,
db_session: Session,
) -> CertificationRead:
"""Create a new certification for a course"""
# Check if course exists
statement = select(Course).where(Course.id == certification_object.course_id)
course = db_session.exec(statement).first()
if not course:
raise HTTPException(
status_code=404,
detail="Course not found",
)
# RBAC check
await rbac_check(request, course.course_uuid, current_user, "create", db_session)
# Create certification
certification = Certifications(
course_id=certification_object.course_id,
config=certification_object.config or {},
certification_uuid=str(f"certification_{uuid4()}"),
creation_date=str(datetime.now()),
update_date=str(datetime.now()),
)
# Insert certification in DB
db_session.add(certification)
db_session.commit()
db_session.refresh(certification)
return CertificationRead(**certification.model_dump())
async def get_certification(
request: Request,
certification_uuid: str,
current_user: PublicUser | AnonymousUser,
db_session: Session,
) -> CertificationRead:
"""Get a single certification by certification_id"""
statement = select(Certifications).where(Certifications.certification_uuid == certification_uuid)
certification = db_session.exec(statement).first()
if not certification:
raise HTTPException(
status_code=404,
detail="Certification not found",
)
# Get course for RBAC check
statement = select(Course).where(Course.id == certification.course_id)
course = db_session.exec(statement).first()
if not course:
raise HTTPException(
status_code=404,
detail="Course not found",
)
# RBAC check
await rbac_check(request, course.course_uuid, current_user, "read", db_session)
return CertificationRead(**certification.model_dump())
async def get_certifications_by_course(
request: Request,
course_uuid: str,
current_user: PublicUser | AnonymousUser,
db_session: Session,
) -> List[CertificationRead]:
"""Get all certifications for a course"""
# Get course for RBAC check
statement = select(Course).where(Course.course_uuid == course_uuid)
course = db_session.exec(statement).first()
if not course:
raise HTTPException(
status_code=404,
detail="Course not found",
)
# RBAC check
await rbac_check(request, course_uuid, current_user, "read", db_session)
# Get certifications for this course
statement = select(Certifications).where(Certifications.course_id == course.id)
certifications = db_session.exec(statement).all()
return [CertificationRead(**certification.model_dump()) for certification in certifications]
async def update_certification(
request: Request,
certification_uuid: str,
certification_object: CertificationUpdate,
current_user: PublicUser | AnonymousUser,
db_session: Session,
) -> CertificationRead:
"""Update a certification"""
statement = select(Certifications).where(Certifications.certification_uuid == certification_uuid)
certification = db_session.exec(statement).first()
if not certification:
raise HTTPException(
status_code=404,
detail="Certification not found",
)
# Get course for RBAC check
statement = select(Course).where(Course.id == certification.course_id)
course = db_session.exec(statement).first()
if not course:
raise HTTPException(
status_code=404,
detail="Course not found",
)
# RBAC check
await rbac_check(request, course.course_uuid, current_user, "update", db_session)
# Update only the fields that were passed in
for var, value in vars(certification_object).items():
if value is not None:
setattr(certification, var, value)
# Update the update_date
certification.update_date = str(datetime.now())
db_session.add(certification)
db_session.commit()
db_session.refresh(certification)
return CertificationRead(**certification.model_dump())
async def delete_certification(
request: Request,
certification_uuid: str,
current_user: PublicUser | AnonymousUser,
db_session: Session,
) -> dict:
"""Delete a certification"""
statement = select(Certifications).where(Certifications.certification_uuid == certification_uuid)
certification = db_session.exec(statement).first()
if not certification:
raise HTTPException(
status_code=404,
detail="Certification not found",
)
# Get course for RBAC check
statement = select(Course).where(Course.id == certification.course_id)
course = db_session.exec(statement).first()
if not course:
raise HTTPException(
status_code=404,
detail="Course not found",
)
# RBAC check
await rbac_check(request, course.course_uuid, current_user, "delete", db_session)
db_session.delete(certification)
db_session.commit()
return {"detail": "Certification deleted successfully"}
####################################################
# Certificate User Functions
####################################################
async def create_certificate_user(
request: Request,
user_id: int,
certification_id: int,
db_session: Session,
) -> CertificateUserRead:
"""Create a certificate user link"""
# Check if certification exists
statement = select(Certifications).where(Certifications.id == certification_id)
certification = db_session.exec(statement).first()
if not certification:
raise HTTPException(
status_code=404,
detail="Certification not found",
)
# Check if certificate user already exists
statement = select(CertificateUser).where(
CertificateUser.user_id == user_id,
CertificateUser.certification_id == certification_id
)
existing_certificate_user = db_session.exec(statement).first()
if existing_certificate_user:
raise HTTPException(
status_code=400,
detail="User already has a certificate for this course",
)
# Generate readable certificate user UUID
current_year = datetime.now().year
current_month = datetime.now().month
current_day = datetime.now().day
# Get user to extract user_uuid
from src.db.users import User
statement = select(User).where(User.id == user_id)
user = db_session.exec(statement).first()
if not user:
raise HTTPException(
status_code=404,
detail="User not found",
)
# Extract last 4 characters from user_uuid for uniqueness (since all start with "user_")
user_uuid_short = user.user_uuid[-4:] if user.user_uuid else "USER"
# Generate random 2-letter prefix
import random
import string
random_prefix = ''.join(random.choices(string.ascii_uppercase, k=2))
# Get the count of existing certificate users for this user today
today_user_prefix = f"{random_prefix}-{current_year}{current_month:02d}{current_day:02d}-{user_uuid_short}-"
statement = select(CertificateUser).where(
CertificateUser.user_certification_uuid.startswith(today_user_prefix)
)
existing_certificates = db_session.exec(statement).all()
# Generate next sequential number for this user today
next_number = len(existing_certificates) + 1
certificate_number = f"{next_number:03d}" # Format as 3-digit number with leading zeros
user_certification_uuid = f"{today_user_prefix}{certificate_number}"
# Create certificate user
certificate_user = CertificateUser(
user_id=user_id,
certification_id=certification_id,
user_certification_uuid=user_certification_uuid,
created_at=str(datetime.now()),
updated_at=str(datetime.now()),
)
db_session.add(certificate_user)
db_session.commit()
db_session.refresh(certificate_user)
return CertificateUserRead(**certificate_user.model_dump())
async def check_course_completion_and_create_certificate(
request: Request,
user_id: int,
course_id: int,
db_session: Session,
) -> bool:
"""Check if all activities in a course are completed and create certificate if so"""
# Get all activities in the course
statement = select(ChapterActivity).where(ChapterActivity.course_id == course_id)
course_activities = db_session.exec(statement).all()
if not course_activities:
return False # No activities in course
# Get all completed activities for this user in this course
statement = select(TrailStep).where(
TrailStep.user_id == user_id,
TrailStep.course_id == course_id,
TrailStep.complete == True
)
completed_activities = db_session.exec(statement).all()
# Check if all activities are completed
if len(completed_activities) >= len(course_activities):
# All activities completed, check if certification exists for this course
statement = select(Certifications).where(Certifications.course_id == course_id)
certification = db_session.exec(statement).first()
if certification and certification.id:
# Create certificate user link
try:
await create_certificate_user(request, user_id, certification.id, db_session)
return True
except HTTPException as e:
if e.status_code == 400 and "already has a certificate" in e.detail:
# Certificate already exists, which is fine
return True
else:
raise e
return False
####################################################
# RBAC Utils
####################################################
async def rbac_check(
request: Request,
course_uuid: str,
current_user: PublicUser | AnonymousUser,
action: Literal["create", "read", "update", "delete"],
db_session: Session,
):
if action == "read":
if current_user.id == 0: # Anonymous user
res = await authorization_verify_if_element_is_public(
request, course_uuid, action, db_session
)
return res
else:
res = (
await authorization_verify_based_on_roles_and_authorship(
request, current_user.id, action, course_uuid, db_session
)
)
return res
else:
await authorization_verify_if_user_is_anon(current_user.id)
await authorization_verify_based_on_roles_and_authorship(
request,
current_user.id,
action,
course_uuid,
db_session,
)

View file

@ -9,6 +9,7 @@ from src.db.trail_runs import TrailRun, TrailRunRead
from src.db.trail_steps import TrailStep from src.db.trail_steps import TrailStep
from src.db.trails import Trail, TrailCreate, TrailRead from src.db.trails import Trail, TrailCreate, TrailRead
from src.db.users import AnonymousUser, PublicUser from src.db.users import AnonymousUser, PublicUser
from src.services.courses.certifications import check_course_completion_and_create_certificate
async def create_user_trail( async def create_user_trail(
@ -68,7 +69,7 @@ async def get_user_trails(
for trail_run in trail_runs: for trail_run in trail_runs:
statement = select(Course).where(Course.id == trail_run.course_id) statement = select(Course).where(Course.id == trail_run.course_id)
course = db_session.exec(statement).first() course = db_session.exec(statement).first()
trail_run.course = course trail_run.course = course.model_dump() if course else {}
# Add number of activities (steps) in a course # Add number of activities (steps) in a course
statement = select(ChapterActivity).where( statement = select(ChapterActivity).where(
@ -153,7 +154,7 @@ async def get_user_trail_with_orgid(
for trail_run in trail_runs: for trail_run in trail_runs:
statement = select(Course).where(Course.id == trail_run.course_id) statement = select(Course).where(Course.id == trail_run.course_id)
course = db_session.exec(statement).first() course = db_session.exec(statement).first()
trail_run.course = course trail_run.course = course.model_dump() if course else {}
# Add number of activities (steps) in a course # Add number of activities (steps) in a course
statement = select(ChapterActivity).where( statement = select(ChapterActivity).where(
@ -255,6 +256,12 @@ async def add_activity_to_trail(
db_session.commit() db_session.commit()
db_session.refresh(trailstep) db_session.refresh(trailstep)
# Check if all activities in the course are completed and create certificate if so
if course and course.id:
await check_course_completion_and_create_certificate(
request, user.id, course.id, db_session
)
statement = select(TrailRun).where(TrailRun.trail_id == trail.id , TrailRun.user_id == user.id) statement = select(TrailRun).where(TrailRun.trail_id == trail.id , TrailRun.user_id == user.id)
trail_runs = db_session.exec(statement).all() trail_runs = db_session.exec(statement).all()