feat: add endpoint to retrieve user certificates for a specific course

This commit is contained in:
swve 2025-07-16 20:07:52 +02:00
parent 306230174e
commit 7539b27d0f
2 changed files with 74 additions and 7 deletions

View file

@ -6,6 +6,7 @@ from src.db.courses.certifications import (
CertificationCreate,
CertificationRead,
CertificationUpdate,
CertificateUserRead,
)
from src.db.users import PublicUser
from src.security.auth import get_current_user
@ -15,6 +16,7 @@ from src.services.courses.certifications import (
get_certifications_by_course,
update_certification,
delete_certification,
get_user_certificates_for_course,
)
router = APIRouter()
@ -93,4 +95,19 @@ async def api_delete_certification(
"""
return await delete_certification(
request, certification_uuid, current_user, db_session
)
@router.get("/user/course/{course_uuid}")
async def api_get_user_certificates_for_course(
request: Request,
course_uuid: str,
current_user: PublicUser = Depends(get_current_user),
db_session: Session = Depends(get_db_session),
) -> List[dict]:
"""
Get all certificates for the current user in a specific course with certification details
"""
return await get_user_certificates_for_course(
request, course_uuid, current_user, db_session
)

View file

@ -298,6 +298,60 @@ async def create_certificate_user(
return CertificateUserRead(**certificate_user.model_dump())
async def get_user_certificates_for_course(
request: Request,
course_uuid: str,
current_user: PublicUser | AnonymousUser,
db_session: Session,
) -> List[dict]:
"""Get all certificates for a user in a specific course with certification details"""
# Check if course exists
statement = select(Course).where(Course.course_uuid == course_uuid)
course = db_session.exec(statement).first()
if not course:
raise HTTPException(
status_code=404,
detail="Course not found",
)
# RBAC check
await rbac_check(request, course_uuid, current_user, "read", db_session)
# Get all certifications for this course
statement = select(Certifications).where(Certifications.course_id == course.id)
certifications = db_session.exec(statement).all()
if not certifications:
return []
# Get all certificate users for this user and these certifications
certification_ids = [cert.id for cert in certifications if cert.id]
if not certification_ids:
return []
# Query certificate users for this user and these certifications
result = []
for cert_id in certification_ids:
statement = select(CertificateUser).where(
CertificateUser.user_id == current_user.id,
CertificateUser.certification_id == cert_id
)
cert_user = db_session.exec(statement).first()
if cert_user:
# Get the associated certification
statement = select(Certifications).where(Certifications.id == cert_id)
certification = db_session.exec(statement).first()
result.append({
"certificate_user": CertificateUserRead(**cert_user.model_dump()),
"certification": CertificationRead(**certification.model_dump()) if certification else None
})
return result
async def check_course_completion_and_create_certificate(
request: Request,
user_id: int,
@ -356,17 +410,13 @@ async def rbac_check(
):
if action == "read":
if current_user.id == 0: # Anonymous user
res = await authorization_verify_if_element_is_public(
await authorization_verify_if_element_is_public(
request, course_uuid, action, db_session
)
return res
else:
res = (
await authorization_verify_based_on_roles_and_authorship(
request, current_user.id, action, course_uuid, db_session
)
await authorization_verify_based_on_roles_and_authorship(
request, current_user.id, action, course_uuid, db_session
)
return res
else:
await authorization_verify_if_user_is_anon(current_user.id)