Coverage for app/core/security.py: 60%

34 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-02 23:27 +0000

1from fastapi import Request, Depends 

2from jose import jwt, JWTError 

3from starlette.status import HTTP_401_UNAUTHORIZED 

4from sqlalchemy.orm import Session 

5from app.core.exceptions import ( 

6 ForbiddenActionError, 

7 MissingCredentialsError, 

8 UnauthorizedError, 

9) 

10from app.db.supabaseDB import get_db 

11from app.db.models import UserCampaignRole 

12import os 

13 

14SUPABASE_JWT_SECRET = os.getenv("SUPABASE_JWT_SECRET") 

15SUPABASE_URL = os.getenv("SUPABASE_URL") 

16 

17if not SUPABASE_JWT_SECRET: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true

18 raise MissingCredentialsError() 

19 

20 

21def get_current_user_id(request: Request): 

22 token = request.headers.get("Authorization") 

23 

24 if not token or not token.startswith("Bearer "): 

25 raise UnauthorizedError("No JWT provided") 

26 

27 try: 

28 payload = jwt.decode( 

29 token[7:], 

30 SUPABASE_JWT_SECRET, 

31 algorithms=["HS256"], 

32 audience="authenticated", 

33 issuer=f"{SUPABASE_URL}/auth/v1", 

34 ) 

35 return payload["sub"] # Supabase puts UUID in `sub` 

36 

37 except JWTError as e: 

38 raise UnauthorizedError(f"Invalid token: {e}") 

39 

40 

41def get_user_role_for_campaign( 

42 campaign_id: int, user_id: str, db: Session = Depends(get_db) 

43): 

44 role = ( 

45 db.query(UserCampaignRole) 

46 .filter_by(user_id=user_id, campaign_id=campaign_id) 

47 .first() 

48 ) 

49 

50 if not role: 

51 raise ForbiddenActionError("You do not belong to this campaign.") 

52 

53 return role.role 

54 

55 

56def require_role(campaign_id_param: str, allowed_roles: list[str]): 

57 def dependency( 

58 request: Request, 

59 db: Session = Depends(get_db), 

60 user_id: str = Depends(get_current_user_id), 

61 ): 

62 campaign_id = request.path_params.get(campaign_id_param) 

63 role = get_user_role_for_campaign(int(campaign_id), user_id, db) 

64 

65 if role not in allowed_roles: 

66 raise ForbiddenActionError(f"Role: {role} is not allowed.") 

67 

68 return {"user_id": user_id, "role": role} 

69 

70 return dependency