feat: Improve the contributors admin UX/UI

This commit is contained in:
swve 2025-06-08 11:52:36 +02:00
parent 77aec2cf92
commit c0c32f9564
8 changed files with 899 additions and 50 deletions

View file

@ -36,6 +36,8 @@ from src.services.courses.contributors import (
apply_course_contributor,
update_course_contributor,
get_course_contributors,
add_bulk_course_contributors,
remove_bulk_course_contributors,
)
from src.db.resource_authors import ResourceAuthorshipEnum, ResourceAuthorshipStatusEnum
@ -318,3 +320,45 @@ async def api_update_course_contributor(
current_user,
db_session
)
@router.post("/{course_uuid}/bulk-add-contributors")
async def api_add_bulk_course_contributors(
request: Request,
course_uuid: str,
usernames: List[str],
db_session: Session = Depends(get_db_session),
current_user: PublicUser = Depends(get_current_user),
):
"""
Add multiple contributors to a course by their usernames
Only administrators can perform this action
"""
return await add_bulk_course_contributors(
request,
course_uuid,
usernames,
current_user,
db_session
)
@router.put("/{course_uuid}/bulk-remove-contributors")
async def api_remove_bulk_course_contributors(
request: Request,
course_uuid: str,
usernames: List[str],
db_session: Session = Depends(get_db_session),
current_user: PublicUser = Depends(get_current_user),
):
"""
Remove multiple contributors from a course by their usernames
Only administrators can perform this action
"""
return await remove_bulk_course_contributors(
request,
course_uuid,
usernames,
current_user,
db_session
)

View file

@ -173,4 +173,200 @@ async def get_course_contributors(
"user": UserRead.model_validate(user).model_dump()
}
for contributor, user in results
]
]
async def add_bulk_course_contributors(
request: Request,
course_uuid: str,
usernames: List[str],
current_user: PublicUser | AnonymousUser,
db_session: Session,
):
"""
Add multiple contributors to a course by their usernames
Only administrators can perform this action
"""
# Verify user is not anonymous
await authorization_verify_if_user_is_anon(current_user.id)
# RBAC check - verify if user has admin rights
authorized = await authorization_verify_based_on_roles_and_authorship(
request, current_user.id, "update", course_uuid, db_session
)
if not authorized:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not authorized to add contributors",
)
# Check if course exists
statement = select(Course).where(Course.course_uuid == course_uuid)
course = db_session.exec(statement).first()
if not course:
raise HTTPException(
status_code=404,
detail="Course not found",
)
# Process results
results = {
"successful": [],
"failed": []
}
current_time = str(datetime.now())
for username in usernames:
try:
# Find user by username
user_statement = select(User).where(User.username == username)
user = db_session.exec(user_statement).first()
if not user or user.id is None:
results["failed"].append({
"username": username,
"reason": "User not found or invalid"
})
continue
# Check if user already has any authorship role for this course
existing_authorship = db_session.exec(
select(ResourceAuthor).where(
and_(
ResourceAuthor.resource_uuid == course_uuid,
ResourceAuthor.user_id == user.id
)
)
).first()
if existing_authorship:
results["failed"].append({
"username": username,
"reason": "User already has an authorship role for this course"
})
continue
# Create contributor
resource_author = ResourceAuthor(
resource_uuid=course_uuid,
user_id=user.id,
authorship=ResourceAuthorshipEnum.CONTRIBUTOR,
authorship_status=ResourceAuthorshipStatusEnum.PENDING,
creation_date=current_time,
update_date=current_time,
)
db_session.add(resource_author)
db_session.commit()
db_session.refresh(resource_author)
results["successful"].append({
"username": username,
"user_id": user.id
})
except Exception as e:
results["failed"].append({
"username": username,
"reason": str(e)
})
return results
async def remove_bulk_course_contributors(
request: Request,
course_uuid: str,
usernames: List[str],
current_user: PublicUser | AnonymousUser,
db_session: Session,
):
"""
Remove multiple contributors from a course by their usernames
Only administrators can perform this action
"""
# Verify user is not anonymous
await authorization_verify_if_user_is_anon(current_user.id)
# RBAC check - verify if user has admin rights
authorized = await authorization_verify_based_on_roles_and_authorship(
request, current_user.id, "update", course_uuid, db_session
)
if not authorized:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not authorized to remove contributors",
)
# Check if course exists
statement = select(Course).where(Course.course_uuid == course_uuid)
course = db_session.exec(statement).first()
if not course:
raise HTTPException(
status_code=404,
detail="Course not found",
)
# Process results
results = {
"successful": [],
"failed": []
}
for username in usernames:
try:
# Find user by username
user_statement = select(User).where(User.username == username)
user = db_session.exec(user_statement).first()
if not user or user.id is None:
results["failed"].append({
"username": username,
"reason": "User not found or invalid"
})
continue
# Check if user has any authorship role for this course
existing_authorship = db_session.exec(
select(ResourceAuthor).where(
and_(
ResourceAuthor.resource_uuid == course_uuid,
ResourceAuthor.user_id == user.id
)
)
).first()
if not existing_authorship:
results["failed"].append({
"username": username,
"reason": "User is not a contributor for this course"
})
continue
# Don't allow removing the creator
if existing_authorship.authorship == ResourceAuthorshipEnum.CREATOR:
results["failed"].append({
"username": username,
"reason": "Cannot remove the course creator"
})
continue
# Remove the contributor
db_session.delete(existing_authorship)
db_session.commit()
results["successful"].append({
"username": username,
"user_id": user.id
})
except Exception as e:
results["failed"].append({
"username": username,
"reason": str(e)
})
return results