Module sanic_security.authentication
Functions
async def authenticate(request: sanic.request.types.Request) ‑> AuthenticationSession
-
Expand source code
async def authenticate(request: Request) -> AuthenticationSession: """ Validates client's authentication session and account. New/Refreshed session automatically returned if client's session expired during authentication. Args: request (Request): Sanic request parameter. Returns: authentication_session Raises: NotFoundError JWTDecodeError DeletedError DeactivatedError UnverifiedError DisabledError SecondFactorRequiredError ExpiredError CredentialsError """ authentication_jwt = AuthenticationSession.decode_raw(request) if not await AuthenticationSession.filter( bearer=authentication_jwt["bearer"], ip=get_ip(request), user_agent=request.headers.get("user-agent"), active=True, deleted=False, ).exists(): logger.warning( f"Unrecognized client {get_ip(request)} attempted to utilize authentication session {authentication_jwt["id"]}." ) raise CredentialsError("Client is unrecognized.") authentication_session = await AuthenticationSession.decode( request, authentication_jwt ) try: authentication_session.validate() if not authentication_session.anonymous: authentication_session.bearer.validate() except ExpiredError: authentication_session = await authentication_session.refresh(request) authentication_session.is_refresh = True request.ctx.session = authentication_session return authentication_session
Validates client's authentication session and account. New/Refreshed session automatically returned if client's session expired during authentication.
Args
request
:Request
- Sanic request parameter.
Returns
authentication_session
Raises
NotFoundError JWTDecodeError DeletedError DeactivatedError UnverifiedError DisabledError SecondFactorRequiredError ExpiredError CredentialsError
async def fulfill_second_factor(request: sanic.request.types.Request) ‑> AuthenticationSession
-
Expand source code
async def fulfill_second_factor(request: Request) -> AuthenticationSession: """ Fulfills client authentication session's second factor requirement via two-step session code. Args: request (Request): Sanic request parameter. Request body should contain form-data with the following argument(s): code. Raises: NotFoundError JWTDecodeError DeletedError ExpiredError DeactivatedError ChallengeError MaxedOutChallengeError Returns: authentication_session """ authentication_session = await AuthenticationSession.decode(request) if not authentication_session.requires_second_factor: raise DeactivatedError("Session second factor requirement already met.", 403) two_step_session = await TwoStepSession.decode(request, tag="2fa") two_step_session.validate() await two_step_session.check_code(request.form.get("code")) authentication_session.requires_second_factor = False await authentication_session.save(update_fields=["requires_second_factor"]) logger.info( f"Client {get_ip(request)} has fulfilled authentication session {authentication_session.id} second factor." ) return authentication_session
Fulfills client authentication session's second factor requirement via two-step session code.
Args
request
:Request
- Sanic request parameter. Request body should contain form-data with the following argument(s): code.
Raises
NotFoundError JWTDecodeError DeletedError ExpiredError DeactivatedError ChallengeError MaxedOutChallengeError
Returns
authentication_session
def initialize_security(app: sanic.app.Sanic, create_root: bool = True) ‑> None
-
Expand source code
def initialize_security(app: Sanic, create_root: bool = True) -> None: """ Audits configuration, creates root administrator account, and attaches session middleware. Args: app (Sanic): Sanic application instance. create_root (bool): Determines root account creation on initialization. """ @app.listener("before_server_start") async def audit_configuration(app, loop): if config.SECRET == DEFAULT_CONFIG["SECRET"]: warnings.warn("Secret should be changed from default.", AuditWarning, 2) if not config.SESSION_HTTPONLY: warnings.warn("HttpOnly should be enabled.", AuditWarning, 2) if not config.SESSION_SECURE: warnings.warn("Secure should be enabled.", AuditWarning, 2) if not config.SESSION_SAMESITE or config.SESSION_SAMESITE.lower() == "none": warnings.warn("SameSite should not be none.", AuditWarning, 2) if not config.SESSION_DOMAIN: warnings.warn("Domain should not be none.", AuditWarning, 2) if ( create_root and config.INITIAL_ADMIN_EMAIL == DEFAULT_CONFIG["INITIAL_ADMIN_EMAIL"] ): warnings.warn( "Initial admin email should be changed from default.", AuditWarning, 2 ) if ( create_root and config.INITIAL_ADMIN_PASSWORD == DEFAULT_CONFIG["INITIAL_ADMIN_PASSWORD"] ): warnings.warn( "Initial admin password should be changed from default.", AuditWarning, 2, ) @app.listener("before_server_start") async def create_root_account(app, loop): if not create_root: return try: role = await Role.filter(name="Root").get() except DoesNotExist: role = await Role.create( description="Has administrator abilities, assign sparingly.", permissions=["*:*"], name="Root", ) try: account = await Account.filter(email=config.INITIAL_ADMIN_EMAIL).get() await account.fetch_related("roles") if role not in account.roles: await account.roles.add(role) logger.warning("Initial admin account role has been reinstated.") except DoesNotExist: account = await Account.create( username="Root", email=config.INITIAL_ADMIN_EMAIL, password=password_hasher.hash(config.INITIAL_ADMIN_PASSWORD), verified=True, ) await account.roles.add(role) logger.info("Initial admin account created.") @app.on_response async def session_middleware(request, response): if hasattr(request.ctx, "session"): if getattr(request.ctx.session, "is_refresh", False): request.ctx.session.encode(response) elif not request.ctx.session.active: response.delete_cookie( f"{config.SESSION_PREFIX}_{request.ctx.session.__class__.__name__[:7].lower()}" )
Audits configuration, creates root administrator account, and attaches session middleware.
Args
app
:Sanic
- Sanic application instance.
create_root
:bool
- Determines root account creation on initialization.
async def login(request: sanic.request.types.Request,
*,
require_second_factor: bool = False,
email: str = None,
password: str = None) ‑> AuthenticationSession-
Expand source code
async def login( request: Request, *, require_second_factor: bool = False, email: str = None, password: str = None, ) -> AuthenticationSession: """ Login with email or username (if enabled) and password. Args: request (Request): Sanic request parameter, login credentials are retrieved via the authorization header. require_second_factor (bool): Determines authentication session second factor requirement on login. email (str): Email (or username) of account being logged into, overrides account retrieved via authorization header. password (str): Overrides user's password attempt retrieved via the authorization header. Returns: authentication_session Raises: CredentialsError NotFoundError DeletedError UnverifiedError DisabledError """ if not email: account, password = await Account.get_via_header(request) elif not password: raise CredentialsError("Password parameter is empty.") else: account = await Account.get_via_credential(email) try: password_hasher.verify(account.password, password) if password_hasher.check_needs_rehash(account.password): account.password = password_hasher.hash(password) await account.save(update_fields=["password"]) account.validate() authentication_session = await AuthenticationSession.new( request, account, requires_second_factor=require_second_factor ) logger.info( f"Client {get_ip(request)} has logged in with authentication session {authentication_session.id}." ) return authentication_session except (VerificationError, InvalidHashError): logger.warning( f"Client {get_ip(request)} has failed to log into account {account.id}." ) raise CredentialsError("Incorrect password.", 401)
Login with email or username (if enabled) and password.
Args
request
:Request
- Sanic request parameter, login credentials are retrieved via the authorization header.
require_second_factor
:bool
- Determines authentication session second factor requirement on login.
email
:str
- Email (or username) of account being logged into, overrides account retrieved via authorization header.
password
:str
- Overrides user's password attempt retrieved via the authorization header.
Returns
authentication_session
Raises
CredentialsError NotFoundError DeletedError UnverifiedError DisabledError
async def logout(request: sanic.request.types.Request) ‑> AuthenticationSession
-
Expand source code
async def logout(request: Request) -> AuthenticationSession: """ Deactivates client's authentication session. Args: request (Request): Sanic request parameter. Raises: NotFoundError JWTDecodeError DeactivatedError Returns: authentication_session """ authentication_session = await AuthenticationSession.decode(request) if not authentication_session.active: raise DeactivatedError("Already logged out.", 403) authentication_session.active = False await authentication_session.save(update_fields=["active"]) logger.info( f"Client {get_ip(request)} has logged out with authentication session {authentication_session.id}." ) return authentication_session
Deactivates client's authentication session.
Args
request
:Request
- Sanic request parameter.
Raises
NotFoundError JWTDecodeError DeactivatedError
Returns
authentication_session
async def register(request: sanic.request.types.Request,
verified: bool = False,
disabled: bool = False) ‑> Account-
Expand source code
async def register( request: Request, verified: bool = False, disabled: bool = False ) -> Account: """ Registers a new account that can be logged into. Args: request (Request): Sanic request parameter. Request body should contain form-data with the following argument(s): email, username, password, phone (including country code). verified (bool): Sets the verification requirement for the account being registered. disabled (bool): Renders the account being registered unusable until manual activation. Returns: account Raises: CredentialsError """ try: account = await Account.create( email=request.form.get("email").lower(), username=request.form.get("username"), password=password_hasher.hash( validate_password(request.form.get("password")) ), phone=request.form.get("phone"), verified=verified, disabled=disabled, ) logger.info(f"Client {get_ip(request)} has registered account {account.id}.") return account except ValidationError as e: raise CredentialsError( "Username must be 3-32 characters long." if "username" in e.args[0] else "Invalid email or phone number." ) except IntegrityError as e: raise CredentialsError( f"An account with this {"username" if "username" in str(e.args[0]) else "email or phone number"} " "may already exist.", 409, )
Registers a new account that can be logged into.
Args
request
:Request
- Sanic request parameter. Request body should contain form-data with the following argument(s): email, username, password, phone (including country code).
verified
:bool
- Sets the verification requirement for the account being registered.
disabled
:bool
- Renders the account being registered unusable until manual activation.
Returns
account
Raises
CredentialsError
def requires_authentication(arg=None)
-
Expand source code
def requires_authentication(arg=None): """ Validates client's authentication session and account. New/Refreshed session automatically returned if client's session expired during authentication. Example: This method is not called directly and instead used as a decorator: @app.post('api/authenticate') @requires_authentication async def on_authenticate(request): return text('User is authenticated!') Raises: NotFoundError JWTDecodeError DeletedError DeactivatedError UnverifiedError DisabledError SecondFactorRequiredError ExpiredError """ def decorator(func): @functools.wraps(func) async def wrapper(request, *args, **kwargs): await authenticate(request) return await func(request, *args, **kwargs) return wrapper return decorator(arg) if callable(arg) else decorator
Validates client's authentication session and account. New/Refreshed session automatically returned if client's session expired during authentication.
Example
This method is not called directly and instead used as a decorator:
@app.post('api/authenticate') @requires_authentication async def on_authenticate(request): return text('User is authenticated!')
Raises
NotFoundError JWTDecodeError DeletedError DeactivatedError UnverifiedError DisabledError SecondFactorRequiredError ExpiredError
def validate_password(password: str) ‑> str
-
Expand source code
def validate_password(password: str) -> str: """ Validates password formatting requirements. Args: password (str): Password being validated. Returns: password Raises: CredentialsError """ if not re.search(r"^(?=.*[A-Z])(?=.*\d)(?=.*[@#$%^&+=!]).*$", password): raise CredentialsError( "Password must contain one capital letter, one number, and one special character", 400, ) return password
Validates password formatting requirements.
Args
password
:str
- Password being validated.
Returns
password
Raises
CredentialsError