Module sanic_security.oauth
Functions
def initialize_oauth(app: sanic.app.Sanic) ‑> None
-
Expand source code
def initialize_oauth(app: Sanic) -> None: """ Attaches session middleware. Args: app (Sanic): Sanic application instance. """ @app.on_response async def session_middleware(request, response): if hasattr(request.ctx, "oauth"): if request.ctx.oauth.get("is_refresh"): del request.ctx.oauth["is_refresh"] oauth_encode(response, request.ctx.oauth) elif request.ctx.oauth.get("revoked"): response.delete_cookie(f"{config.SESSION_PREFIX}_oauth")
Attaches session middleware.
Args
app
:Sanic
- Sanic application instance.
async def oauth_callback(request: sanic.request.types.Request,
client: httpx_oauth.oauth2.BaseOAuth2,
redirect_uri: str = None,
code_verifier: str = None,
link: bool = True) ‑> tuple[dict, AuthenticationSession]-
Expand source code
async def oauth_callback( request: Request, client: BaseOAuth2, redirect_uri: str = config.OAUTH_REDIRECT, code_verifier: str = None, link: bool = True, ) -> tuple[dict, AuthenticationSession]: """ Requests an access token using the authorization code obtained after the user has authorized the application. An account is retrieved if it already exists, created if it doesn't, and the user is logged in. Args: request (Request): Sanic request parameter. client (BaseOAuth2): OAuth provider. redirect_uri (str): The URL where the user was redirected after authorization. code_verifier (str): Optional code verifier used in the [PKCE](https://datatracker.ietf.org/doc/html/rfc7636)) flow. link (bool): Determines if client is logged into account on OAuth flow completion. Raises: CredentialsError OAuthError Returns: token_info, authentication_session """ try: token_info = await client.get_access_token( request.args.get("code"), redirect_uri, code_verifier, ) if "expires_at" not in token_info: token_info["expires_at"] = time.time() + token_info["expires_in"] authentication_session = None if link: oauth_id, email = await client.get_id_email(token_info["access_token"]) try: account = await Account.get(oauth_id=oauth_id) except DoesNotExist: account = await Account.create( email=email, username=email.split("@")[0], password="", oauth_id=oauth_id, verified=True, ) authentication_session = await AuthenticationSession.new(request, account) logger.info( f"Client {get_ip(request)} has logged in via {client.__class__.__name__} with authentication session {authentication_session.id if link else None}." ) return token_info, authentication_session except IntegrityError: raise CredentialsError( "Account may not be linked to this OAuth provider if it already exists.", 409, ) except GetAccessTokenError as e: raise OAuthError(f"Failed to retrieve access token: {e.response.text}") except GetIdEmailError as e: raise OAuthError(f"Failed to retrieve id and email: {e.response.text}")
Requests an access token using the authorization code obtained after the user has authorized the application. An account is retrieved if it already exists, created if it doesn't, and the user is logged in.
Args
request
:Request
- Sanic request parameter.
client
:BaseOAuth2
- OAuth provider.
redirect_uri
:str
- The URL where the user was redirected after authorization.
code_verifier
:str
- Optional code verifier used in the PKCE) flow.
link
:bool
- Determines if client is logged into account on OAuth flow completion.
Raises
CredentialsError OAuthError
Returns
token_info, authentication_session
async def oauth_decode(request: sanic.request.types.Request,
client: httpx_oauth.oauth2.BaseOAuth2,
refresh=True) ‑> dict-
Expand source code
async def oauth_decode(request: Request, client: BaseOAuth2, refresh=True) -> dict: """ Decodes JWT from client cookie into token info. Args: request (Request): Sanic request parameter. client (BaseOAuth2): OAuth provider. refresh (bool): Determines that the decoded access token is refreshed during expiration. Raises: OAuthError Returns: token_info """ try: token_info = jwt.decode( request.cookies.get( f"{config.SESSION_PREFIX}_oauth", ), config.PUBLIC_SECRET or config.SECRET, config.SESSION_ENCODING_ALGORITHM, ) if refresh and time.time() > token_info["expires_at"]: token_info = await client.refresh_token(token_info.get("refresh_token")) token_info["is_refresh"] = True if "expires_at" not in token_info: token_info["expires_at"] = time.time() + token_info["expires_in"] request.ctx.oauth = token_info return token_info except RefreshTokenError as e: raise OAuthError(f"Failed to refresh access token: {e.response.text}") except DecodeError: raise OAuthError(f"Access token invalid, not provided, or expired.", 400)
Decodes JWT from client cookie into token info.
Args
request
:Request
- Sanic request parameter.
client
:BaseOAuth2
- OAuth provider.
refresh
:bool
- Determines that the decoded access token is refreshed during expiration.
Raises
OAuthError
Returns
token_info
def oauth_encode(response: sanic.response.types.HTTPResponse, token_info: dict) ‑> None
-
Expand source code
def oauth_encode(response: HTTPResponse, token_info: dict) -> None: """ Transforms token info into JWT and then is stored in a cookie. Args: response (HTTPResponse): Sanic response used to store JWT into a cookie on the client. token_info (dict): OAuth access token. """ response.cookies.add_cookie( f"{config.SESSION_PREFIX}_oauth", str( jwt.encode( token_info, config.SECRET, config.SESSION_ENCODING_ALGORITHM, ), ), httponly=config.SESSION_HTTPONLY, samesite=config.SESSION_SAMESITE, secure=config.SESSION_SECURE, domain=config.SESSION_DOMAIN, max_age=token_info["expires_in"] + config.AUTHENTICATION_REFRESH_EXPIRATION, )
Transforms token info into JWT and then is stored in a cookie.
Args
response
:HTTPResponse
- Sanic response used to store JWT into a cookie on the client.
token_info
:dict
- OAuth access token.
async def oauth_revoke(request: sanic.request.types.Request, client: httpx_oauth.oauth2.BaseOAuth2) ‑> dict
-
Expand source code
async def oauth_revoke(request: Request, client: BaseOAuth2) -> dict: """ Revokes the client's access token. Args: request (Request): Sanic request parameter. client (BaseOAuth2): OAuth provider. Raises: OAuthError """ if request.cookies.get(f"{config.SESSION_PREFIX}_oauth"): try: token_info = await oauth_decode(request, client, False) request.ctx.oauth["revoked"] = True with suppress(RevokeTokenNotSupportedError): await client.revoke_token( token_info.get("access_token"), "access_token" ) return token_info except RevokeTokenError as e: raise OAuthError(f"Failed to revoke access token {e.response.text}")
Revokes the client's access token.
Args
request
:Request
- Sanic request parameter.
client
:BaseOAuth2
- OAuth provider.
Raises
OAuthError
def requires_oauth(client: httpx_oauth.oauth2.BaseOAuth2)
-
Expand source code
def requires_oauth(client: BaseOAuth2): """ Decodes JWT token from client cookie into an access token. Args: client (BaseOAuth2): OAuth provider. Example: This method is not called directly and instead used as a decorator: @app.get('api/oauth') @requires_oauth async def on_oauth(request): return text('Access token retrieved!') Raises: OAuthError """ def decorator(func): @functools.wraps(func) async def wrapper(request, *args, **kwargs): await oauth_decode(request, client) return await func(request, *args, **kwargs) return wrapper return decorator
Decodes JWT token from client cookie into an access token.
Args
client
:BaseOAuth2
- OAuth provider.
Example
This method is not called directly and instead used as a decorator:
@app.get('api/oauth') @requires_oauth async def on_oauth(request): return text('Access token retrieved!')
Raises
OAuthError