feat: use requests with auth header for home

This commit is contained in:
swve 2023-05-22 16:51:51 +00:00
parent 3457b57c58
commit 9f28dfe7e8
7 changed files with 83 additions and 40 deletions

View file

@ -10,26 +10,26 @@ from fastapi_jwt_auth import AuthJWT
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
#### JWT Auth ####################################################
class Settings(BaseModel):
authjwt_secret_key: str = "secret"
authjwt_token_location = {"cookies"}
authjwt_token_location = {"cookies", "headers"}
authjwt_cookie_csrf_protect = False
authjwt_access_token_expires = False # (pre-alpha only) # TODO: set to 1 hour
authjwt_cookie_samesite = "none"
authjwt_access_token_expires = False # (pre-alpha only) # TODO: set to 1 hour
authjwt_cookie_samesite = "lax"
authjwt_cookie_secure = True
@AuthJWT.load_config # type: ignore
authjwt_cookie_domain = ".localhost"
@AuthJWT.load_config # type: ignore
def get_config():
return Settings()
#### JWT Auth ####################################################
#### Classes ####################################################
@ -41,10 +41,11 @@ class Token(BaseModel):
class TokenData(BaseModel):
username: str | None = None
#### Classes ####################################################
async def authenticate_user(request: Request,email: str, password: str):
async def authenticate_user(request: Request, email: str, password: str):
user = await security_get_user(request, email)
if not user:
return False
@ -64,29 +65,28 @@ def create_access_token(data: dict, expires_delta: timedelta | None = None):
return encoded_jwt
async def get_current_user(request: Request, Authorize: AuthJWT = Depends()):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
Authorize.jwt_optional()
username = Authorize.get_jwt_subject() or None
token_data = TokenData(username=username) # type: ignore
except JWTError:
raise credentials_exception
if username:
user = await security_get_user(request, email=token_data.username) # type: ignore # treated as an email
if username:
user = await security_get_user(request, email=token_data.username) # type: ignore # treated as an email
if user is None:
raise credentials_exception
return PublicUser(**user.dict())
else:
return AnonymousUser()
async def non_public_endpoint(current_user: PublicUser ):
async def non_public_endpoint(current_user: PublicUser):
if isinstance(current_user, AnonymousUser):
raise HTTPException(status_code=401, detail="Not authenticated")
raise HTTPException(status_code=401, detail="Not authenticated")