Module sanic_security.models
Classes
class Account (**kwargs: Any)-
Expand source code
class Account(BaseModel):
    """
    Contains all identifiable user information.

    Attributes:
        username (str): Public identifier.
        email (str): Private identifier and can be used for verification.
        phone (str): Mobile phone number with country code included and can be used for verification. Can be null or empty.
        password (str): Password of account for user protection, must be hashed via Argon2.
        oauth_id (str): Identifier associated with an OAuth authorization flow.
        disabled (bool): Renders the account unusable but available.
        verified (bool): Renders the account unusable until verified via two-step verification or other method.
        roles (ManyToManyRelation[Role]): Roles associated with this account.
    """

    # Usernames are only forced to be unique when they may be used to log in.
    username: str = fields.CharField(
        unique=config.ALLOW_LOGIN_WITH_USERNAME,
        max_length=32,
    )
    email: str = fields.CharField(
        unique=True,
        max_length=255,
        validators=[
            RegexValidator(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$", re.I)
        ],
    )
    phone: str = fields.CharField(
        unique=True,
        max_length=20,
        null=True,
        validators=[
            RegexValidator(r"^(\+\d{1,2}\s)?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}$", re.I)
        ],
    )
    password: str = fields.CharField(max_length=255)
    oauth_id: str = fields.CharField(unique=True, null=True, max_length=255)
    disabled: bool = fields.BooleanField(default=False)
    verified: bool = fields.BooleanField(default=False)
    roles: fields.ManyToManyRelation["Role"] = fields.ManyToManyField(
        "models.Role", through="account_role"
    )

    def validate(self) -> None:
        """
        Raises an error with respect to account state.

        The checks run in a fixed order: deleted, then unverified, then disabled.

        Raises:
            DeletedError
            UnverifiedError
            DisabledError
        """
        if self.deleted:
            raise DeletedError("Account has been deleted.")
        elif not self.verified:
            raise UnverifiedError
        elif self.disabled:
            raise DisabledError

    async def disable(self):
        """
        Renders account unusable.

        Raises:
            DisabledError
        """
        if self.disabled:
            raise DisabledError("Account is already disabled.")
        else:
            self.disabled = True
            await self.save(update_fields=["disabled"])

    @property
    def json(self) -> dict:
        """
        A JSON serializable dict describing the account.

        The password and oauth_id are not part of the returned dict.
        """
        return {
            "id": self.id,
            "date_created": str(self.date_created),
            "date_updated": str(self.date_updated),
            "email": self.email,
            "username": self.username,
            "phone": self.phone,
            "disabled": self.disabled,
            "verified": self.verified,
        }

    @staticmethod
    async def get_via_email(email: str):
        """
        Retrieve an account with an email.

        The email is lower-cased before the lookup and deleted accounts are ignored.

        Args:
            email (str): Email associated to account being retrieved.

        Returns:
            account

        Raises:
            NotFoundError
        """
        try:
            return await Account.filter(email=email.lower(), deleted=False).get()
        except (DoesNotExist, ValidationError):
            raise NotFoundError("Account with this email does not exist.")

    @staticmethod
    async def get_via_username(username: str):
        """
        Retrieve an account with a username.

        Args:
            username (str): Username associated to account being retrieved.

        Returns:
            account

        Raises:
            NotFoundError
        """
        try:
            return await Account.filter(username=username, deleted=False).get()
        except (DoesNotExist, ValidationError):
            raise NotFoundError("Account with this username does not exist.")

    @staticmethod
    async def get_via_credential(credential: str):
        """
        Retrieve an account with an email or username.

        The credential is tried as an email first; the username lookup is only
        attempted when config.ALLOW_LOGIN_WITH_USERNAME is enabled.

        Args:
            credential (str): Email or username associated to account being retrieved.

        Returns:
            account

        Raises:
            NotFoundError
        """
        try:
            account = await Account.get_via_email(credential)
        except NotFoundError as e:
            if config.ALLOW_LOGIN_WITH_USERNAME:
                account = await Account.get_via_username(credential)
            else:
                raise e
        return account

    @staticmethod
    async def get_via_header(request: Request):
        """
        Retrieve an account via the basic authorization header.

        Args:
            request (Request): Sanic request parameter.

        Returns:
            account, password

        Raises:
            NotFoundError
            CredentialsError
        """
        header = request.headers.get("Authorization")
        if not header:
            raise CredentialsError("Authorization header not provided.")
        try:
            authorization_type, credentials = header.split()
        except ValueError:
            # Header did not have exactly "<scheme> <credentials>".
            raise CredentialsError("Invalid authorization header.")
        # Authentication scheme names are case-insensitive.
        if authorization_type.lower() != "basic":
            raise CredentialsError("Invalid authorization type.")
        try:
            decoded_credentials = base64.b64decode(credentials).decode()
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
            raise CredentialsError("Invalid basic authorization credentials.")
        # Split on the first colon only: the password itself may contain colons.
        email_or_username, separator, password = decoded_credentials.partition(":")
        if not separator:
            raise CredentialsError("Invalid basic authorization credentials.")
        account = await Account.get_via_credential(email_or_username)
        return account, password

    @staticmethod
    async def get_via_phone(phone: str):
        """
        Retrieve an account via a phone number.

        Args:
            phone (str): Phone number associated to account being retrieved.

        Returns:
            account

        Raises:
            NotFoundError
        """
        try:
            return await Account.filter(phone=phone, deleted=False).get()
        except (DoesNotExist, ValidationError):
            raise NotFoundError("Account with this phone number does not exist.")
Attributes
username:str- Public identifier.
email:str- Private identifier and can be used for verification.
phone:str- Mobile phone number with country code included and can be used for verification. Can be null or empty.
password:str- Password of account for user protection, must be hashed via Argon2.
oauth_id:str- Identifier associated with an OAuth authorization flow.
disabled:bool- Renders the account unusable but available.
verified:bool- Renders the account unusable until verified via two-step verification or other method.
roles:ManyToManyRelation[Role]- Roles associated with this account.
Ancestors
- BaseModel
- tortoise.models.Model
Class variables
var disabled : boolvar email : strvar oauth_id : strvar password : strvar phone : strvar roles : tortoise.fields.relational.ManyToManyRelation[Role]var username : strvar verified : bool
Static methods
async def get_via_credential(credential: str)-
Expand source code
@staticmethod async def get_via_credential(credential: str): """ Retrieve an account with an email or username. Args: credential (str): Email or username associated to account being retrieved. Returns: account Raises: NotFoundError """ try: account = await Account.get_via_email(credential) except NotFoundError as e: if config.ALLOW_LOGIN_WITH_USERNAME: account = await Account.get_via_username(credential) else: raise e return accountRetrieve an account with an email or username.
Args
credential:str- Email or username associated to account being retrieved.
Returns
account
Raises
NotFoundError
async def get_via_email(email: str)-
Expand source code
@staticmethod async def get_via_email(email: str): """ Retrieve an account with an email. Args: email (str): Email associated to account being retrieved. Returns: account Raises: NotFoundError """ try: return await Account.filter(email=email.lower(), deleted=False).get() except (DoesNotExist, ValidationError): raise NotFoundError("Account with this email does not exist.")Retrieve an account with an email.
Args
email:str- Email associated to account being retrieved.
Returns
account
Raises
NotFoundError
async def get_via_header(request: sanic.request.types.Request)-
Expand source code
@staticmethod
async def get_via_header(request: Request):
    """
    Retrieve an account via the basic authorization header.

    Args:
        request (Request): Sanic request parameter.

    Returns:
        account, password

    Raises:
        NotFoundError
        CredentialsError
    """
    header = request.headers.get("Authorization")
    if not header:
        raise CredentialsError("Authorization header not provided.")
    try:
        authorization_type, credentials = header.split()
    except ValueError:
        # Header did not have exactly "<scheme> <credentials>".
        raise CredentialsError("Invalid authorization header.")
    # Authentication scheme names are case-insensitive.
    if authorization_type.lower() != "basic":
        raise CredentialsError("Invalid authorization type.")
    try:
        decoded_credentials = base64.b64decode(credentials).decode()
    except ValueError:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
        raise CredentialsError("Invalid basic authorization credentials.")
    # Split on the first colon only: the password itself may contain colons.
    email_or_username, separator, password = decoded_credentials.partition(":")
    if not separator:
        raise CredentialsError("Invalid basic authorization credentials.")
    account = await Account.get_via_credential(email_or_username)
    return account, password
Args
request:Request- Sanic request parameter.
Returns
account, password
Raises
NotFoundError CredentialsError
async def get_via_phone(phone: str)-
Expand source code
@staticmethod async def get_via_phone(phone: str): """ Retrieve an account via a phone number. Args: phone (str): Phone number associated to account being retrieved. Returns: account Raises: NotFoundError """ try: return await Account.filter(phone=phone, deleted=False).get() except (DoesNotExist, ValidationError): raise NotFoundError("Account with this phone number does not exist.")Retrieve an account via a phone number.
Args
phone:str- Phone number associated to account being retrieved.
Returns
account
Raises
NotFoundError
async def get_via_username(username: str)-
Expand source code
@staticmethod async def get_via_username(username: str): """ Retrieve an account with a username. Args: username (str): Username associated to account being retrieved. Returns: account Raises: NotFoundError """ try: return await Account.filter(username=username, deleted=False).get() except (DoesNotExist, ValidationError): raise NotFoundError("Account with this username does not exist.")Retrieve an account with a username.
Args
username:str- Username associated to account being retrieved.
Returns
account
Raises
NotFoundError
Methods
async def disable(self)-
Expand source code
async def disable(self): """ Renders account unusable. Raises: DisabledError """ if self.disabled: raise DisabledError("Account is already disabled.") else: self.disabled = True await self.save(update_fields=["disabled"])Renders account unusable.
Raises
DisabledError
def validate(self) ‑> None-
Expand source code
def validate(self) -> None: """ Raises an error with respect to account state. Raises: DeletedError UnverifiedError DisabledError """ if self.deleted: raise DeletedError("Account has been deleted.") elif not self.verified: raise UnverifiedError elif self.disabled: raise DisabledErrorRaises an error with respect to account state.
Raises
DeletedError UnverifiedError DisabledError
Inherited members
class AuthenticationSession (**kwargs)-
Expand source code
class AuthenticationSession(Session):
    """
    Used to authenticate and identify a client.

    Attributes:
        refresh_expiration_date (datetime): Date and time the session can no longer be refreshed.
        requires_second_factor (bool): Determines if session requires a second factor.
        user_agent (str): Identifies client application, operating system, vendor, and/or version.
        is_refresh (bool): Will only be true once when instantiated during the refresh of expired session.
    """

    refresh_expiration_date: datetime.datetime = fields.DatetimeField(null=True)
    requires_second_factor: bool = fields.BooleanField(default=False)
    user_agent: str = fields.CharField(max_length=255, null=True)
    # Plain class attribute, not a database field.
    is_refresh: bool = False

    def validate(self) -> None:
        """
        Raises an error with respect to session state.

        Runs the base session checks first, then the second factor check.

        Raises:
            DeletedError
            ExpiredError
            DeactivatedError
            SecondFactorRequiredError
        """
        super().validate()
        if not self.requires_second_factor:
            return
        raise SecondFactorRequiredError

    async def refresh(self, request: Request):
        """
        Refreshes session if within refresh date.

        The current session is deactivated and a new session is created for
        the same bearer.

        Args:
            request (Request): Sanic request parameter.

        Raises:
            ExpiredError

        Returns:
            session
        """
        # Guard clause: inactive sessions and sessions past the refresh date cannot be refreshed.
        if not self.active or is_expired(self.refresh_expiration_date):
            raise ExpiredError
        await self.deactivate()
        logging.info(
            f"Client {get_ip(request)} has refreshed authentication session {self.id}."
        )
        refreshed_session = await self.new(request, self.bearer)
        return refreshed_session

    @classmethod
    async def new(
        cls,
        request: Request,
        account: Account = None,
        **kwargs: Union[int, str, bool, float, list, dict],
    ):
        """
        Creates an authentication session with pre-set values.

        Args:
            request (Request): Sanic request parameter.
            account (Account): Account being associated to the session, may be omitted.
            **kwargs (Union[int, str, bool, float, list, dict]): Extra arguments applied during session creation.

        Returns:
            session
        """
        return await cls.create(
            **kwargs,
            bearer=account,
            ip=get_ip(request),
            user_agent=request.headers.get("user-agent"),
            expiration_date=get_expiration_date(
                config.AUTHENTICATION_SESSION_EXPIRATION
            ),
            refresh_expiration_date=get_expiration_date(
                config.AUTHENTICATION_REFRESH_EXPIRATION
            ),
        )

    class Meta:
        table = "authentication_session"
Attributes
refresh_expiration_date:datetime- Date and time the session can no longer be refreshed.
requires_second_factor:bool- Determines if session requires a second factor.
user_agent:str- Identifies client application, operating system, vendor, and/or version.
is_refresh:bool- Will only be true once when instantiated during the refresh of expired session.
Ancestors
Class variables
var Metavar is_refresh : boolvar refresh_expiration_date : datetime.datetimevar requires_second_factor : boolvar user_agent : str
Methods
async def refresh(self, request: sanic.request.types.Request)-
Expand source code
async def refresh(self, request: Request): """ Refreshes session if within refresh date. Args: request (Request): Sanic request parameter. Raises: ExpiredError Returns: session """ if self.active and not is_expired(self.refresh_expiration_date): await self.deactivate() logging.info( f"Client {get_ip(request)} has refreshed authentication session {self.id}." ) return await self.new(request, self.bearer) else: raise ExpiredErrorRefreshes session if within refresh date.
Args
request:Request- Sanic request parameter.
Raises
ExpiredError
Returns
session
def validate(self) ‑> None-
Expand source code
def validate(self) -> None: """ Raises an error with respect to session state. Raises: DeletedError ExpiredError DeactivatedError SecondFactorRequiredError """ super().validate() if self.requires_second_factor: raise SecondFactorRequiredErrorRaises an error with respect to session state.
Raises
DeletedError ExpiredError DeactivatedError SecondFactorRequiredError
Inherited members
class BaseModel (**kwargs: Any)-
Expand source code
class BaseModel(Model):
    """
    Base Sanic Security model that all other models derive from.

    Attributes:
        id (str): Primary key of model.
        date_created (datetime): Time this model was created in the database.
        date_updated (datetime): Time this model was updated in the database.
        deleted (bool): Renders the model filterable without removing from the database.
    """

    # Primary key defaults to a freshly generated UUID4 string (36 characters).
    id: str = fields.CharField(
        pk=True,
        max_length=36,
        default=lambda: str(uuid.uuid4()),
    )
    date_created: datetime.datetime = fields.DatetimeField(auto_now_add=True)
    date_updated: datetime.datetime = fields.DatetimeField(auto_now=True)
    deleted: bool = fields.BooleanField(default=False)

    def validate(self) -> None:
        """
        Raises an error with respect to model's state.

        Not implemented on the base model.

        Raises:
            SecurityError
        """
        raise NotImplementedError()

    @property
    def json(self) -> dict:
        """
        A JSON serializable dict to be used in a request or response.

        Not implemented on the base model.

        Example:
            Below is an example of this method returning a dict to be used for JSON serialization.

                def json(self):
                    return {
                        'id': self.id,
                        'date_created': str(self.date_created),
                        'date_updated': str(self.date_updated),
                        'email': self.email,
                        'username': self.username,
                        'disabled': self.disabled,
                        'verified': self.verified
                    }
        """
        raise NotImplementedError()

    class Meta:
        abstract = True
Attributes
id:str- Primary key of model.
date_created:datetime- Time this model was created in the database.
date_updated:datetime- Time this model was updated in the database.
deleted:bool- Renders the model filterable without removing from the database.
Ancestors
- tortoise.models.Model
Subclasses
Class variables
var Metavar date_created : datetime.datetimevar date_updated : datetime.datetimevar deleted : boolvar id : str
Instance variables
prop json : dict-
Expand source code
@property def json(self) -> dict: """ A JSON serializable dict to be used in a request or response. Example: Below is an example of this method returning a dict to be used for JSON serialization. def json(self): return { 'id': id, 'date_created': str(self.date_created), 'date_updated': str(self.date_updated), 'email': self.email, 'username': self.username, 'disabled': self.disabled, 'verified': self.verified } """ raise NotImplementedErrorA JSON serializable dict to be used in a request or response.
Example
Below is an example of this method returning a dict to be used for JSON serialization.
def json(self): return { 'id': id, 'date_created': str(self.date_created), 'date_updated': str(self.date_updated), 'email': self.email, 'username': self.username, 'disabled': self.disabled, 'verified': self.verified }
Methods
def validate(self) ‑> None-
Expand source code
def validate(self) -> None: """ Raises an error with respect to model's state. Raises: SecurityError """ raise NotImplementedErrorRaises an error with respect to model's state.
Raises
SecurityError
class CaptchaSession (**kwargs)-
Expand source code
class CaptchaSession(VerificationSession):
    """Validates client with a captcha challenge via image or audio."""

    @classmethod
    async def new(
        cls,
        request: Request,
        **kwargs: Union[int, str, bool, float, list, dict],
    ):
        """
        Creates a captcha session with a generated code and the configured expiration.

        Args:
            request (Request): Sanic request parameter.
            **kwargs (Union[int, str, bool, float, list, dict]): Extra arguments applied during session creation.

        Returns:
            session
        """
        client_ip = get_ip(request)
        challenge_code = get_code()
        expires_at = get_expiration_date(config.CAPTCHA_SESSION_EXPIRATION)
        return await cls.create(
            **kwargs,
            ip=client_ip,
            code=challenge_code,
            expiration_date=expires_at,
        )

    def get_image(self) -> bytes:
        """
        Retrieves captcha image data.

        Returns:
            captcha_image
        """
        image_buffer = image_generator.generate(self.code, "jpeg")
        return image_buffer.getvalue()

    def get_audio(self) -> bytes:
        """
        Retrieves captcha audio data.

        Returns:
            captcha_audio
        """
        audio_data = audio_generator.generate(self.code)
        return bytes(audio_data)

    class Meta:
        table = "captcha_session"
Ancestors
- VerificationSession
- Session
- BaseModel
- tortoise.models.Model
Class variables
var Meta
Methods
def get_audio(self) ‑> bytes-
Expand source code
def get_audio(self) -> bytes: """ Retrieves captcha audio data. Returns: captcha_audio """ return bytes(audio_generator.generate(self.code))Retrieves captcha audio data.
Returns
captcha_audio
def get_image(self) ‑> bytes-
Expand source code
def get_image(self) -> bytes: """ Retrieves captcha image data. Returns: captcha_image """ return image_generator.generate(self.code, "jpeg").getvalue()Retrieves captcha image data.
Returns
captcha_image
Inherited members
class Role (**kwargs: Any)-
Expand source code
class Role(BaseModel):
    """
    Assigned to an account to authorize an action.

    Attributes:
        name (str): Name of the role.
        description (str): Description of the role.
        permissions (list[str]): Permissions of the role, must be in wildcard format (printer:query, dashboard:info,delete).
    """

    name: str = fields.CharField(unique=True, max_length=255)
    description: str = fields.CharField(max_length=255, null=True)
    permissions: list[str] = fields.JSONField(null=True)

    def validate(self) -> None:
        """Not implemented for roles."""
        raise NotImplementedError()

    @property
    def json(self) -> dict:
        """A JSON serializable dict describing the role."""
        return dict(
            id=self.id,
            date_created=str(self.date_created),
            date_updated=str(self.date_updated),
            name=self.name,
            description=self.description,
            permissions=self.permissions,
        )
Attributes
name:str- Name of the role.
description:str- Description of the role.
permissions:list[str]- Permissions of the role, must be in wildcard format (printer:query, dashboard:info,delete).
Ancestors
- BaseModel
- tortoise.models.Model
Class variables
var description : strvar name : strvar permissions : list[str]
Inherited members
class Session (**kwargs)-
Expand source code
class Session(BaseModel):
    """
    Used for client identification and verification. Base session model that all session models derive from.

    Attributes:
        expiration_date (datetime): Date and time the session expires and can no longer be used.
        active (bool): Determines if the session can be used.
        ip (str): IP address of client instantiating session.
        bearer (ForeignKeyRelation[Account]): Account associated with this session.
    """

    expiration_date: datetime.datetime = fields.DatetimeField(null=True)
    active: bool = fields.BooleanField(default=True)
    # 45 characters fits the longest textual IPv6 form (IPv4-mapped); the
    # previous limit of 16 only fit IPv4 and short IPv6 addresses.
    # NOTE(review): existing databases need the column widened to match.
    ip: str = fields.CharField(max_length=45)
    bearer: fields.ForeignKeyRelation["Account"] = fields.ForeignKeyField(
        "models.Account", null=True
    )

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def validate(self) -> None:
        """
        Raises an error with respect to session state.

        The checks run in a fixed order: deleted, then deactivated, then expired.

        Raises:
            DeletedError
            ExpiredError
            DeactivatedError
        """
        if self.deleted:
            raise DeletedError("Session has been deleted.")
        elif not self.active:
            raise DeactivatedError
        elif is_expired(self.expiration_date):
            raise ExpiredError

    async def deactivate(self) -> None:
        """
        Renders session deactivated and therefore unusable.

        Raises:
            DeactivatedError
        """
        if self.active:
            self.active = False
            await self.save(update_fields=["active"])
        else:
            raise DeactivatedError("Session is already deactivated.", 403)

    def encode(self, response: HTTPResponse) -> None:
        """
        Transforms session into JWT and then is stored in a cookie.

        The cookie name is the configured prefix followed by the first seven
        characters of the session class name in lower case.

        Args:
            response (HTTPResponse): Sanic response used to store JWT into a cookie on the client.
        """
        encoded_session = jwt.encode(
            {
                "id": self.id,
                "date_created": str(self.date_created),
                "expiration_date": str(self.expiration_date),
                "bearer": self.bearer.id if isinstance(self.bearer, Account) else None,
                "ip": self.ip,
            },
            config.SECRET,
            config.SESSION_ENCODING_ALGORITHM,
        )
        response.cookies.add_cookie(
            f"{config.SESSION_PREFIX}_{self.__class__.__name__[:7].lower()}",
            str(encoded_session),
            httponly=config.SESSION_HTTPONLY,
            samesite=config.SESSION_SAMESITE,
            secure=config.SESSION_SECURE,
            domain=config.SESSION_DOMAIN,
            # Sessions that define a refresh window keep the cookie until that window ends.
            expires=getattr(self, "refresh_expiration_date", self.expiration_date),
        )

    @property
    def json(self) -> dict:
        """A JSON serializable dict describing the session."""
        return {
            "id": self.id,
            "date_created": str(self.date_created),
            "date_updated": str(self.date_updated),
            "expiration_date": str(self.expiration_date),
            "bearer": self.bearer.id if isinstance(self.bearer, Account) else None,
            "active": self.active,
        }

    @property
    def anonymous(self) -> bool:
        """
        Determines if an account is associated with session.

        Returns:
            anonymous
        """
        return self.bearer is None

    @classmethod
    async def new(
        cls,
        request: Request,
        account: Account,
        **kwargs: Union[int, str, bool, float, list, dict],
    ):
        """
        Creates session with pre-set values.

        Not implemented on the base session.

        Args:
            request (Request): Sanic request parameter.
            account (Account): Account being associated to the session.
            **kwargs (Union[int, str, bool, float, list, dict]): Extra arguments applied during session creation.

        Returns:
            session
        """
        raise NotImplementedError

    @classmethod
    async def get_associated(cls, account: Account):
        """
        Retrieves sessions associated to an account.

        Deleted sessions are excluded.

        Args:
            account (Account): Account associated with sessions being retrieved.

        Returns:
            sessions

        Raises:
            NotFoundError
        """
        sessions = await cls.filter(bearer=account, deleted=False).all()
        if not sessions:
            raise NotFoundError("No sessions associated to account were found.")
        return sessions

    @classmethod
    def decode_raw(cls, request: Request) -> dict:
        """
        Decodes session JWT token from client cookie into a python dict.

        Args:
            request (Request): Sanic request parameter.

        Returns:
            session_dict

        Raises:
            JWTDecodeError
        """
        cookie = request.cookies.get(
            f"{config.SESSION_PREFIX}_{cls.__name__[:7].lower()}"
        )
        try:
            if not cookie:
                raise JWTDecodeError
            else:
                return jwt.decode(
                    cookie,
                    # The public secret is preferred for decoding when configured.
                    config.PUBLIC_SECRET or config.SECRET,
                    config.SESSION_ENCODING_ALGORITHM,
                )
        except DecodeError as e:
            raise JWTDecodeError(str(e))

    @classmethod
    async def decode(
        cls,
        request: Request,
        raw: dict = None,
        **kwargs: Union[int, str, bool, float],
    ):
        """
        Decodes session JWT token from client cookie into a session model.

        The decoded session is also stored on request.ctx.session.

        Args:
            request (Request): Sanic request parameter.
            raw (dict): Decoded JWT token provided by the client, include only for optimization purposes.
            **kwargs (Union[int, str, bool, float]): Extra filter arguments applied during session decoding.

        Returns:
            session

        Raises:
            JWTDecodeError
            NotFoundError
        """
        try:
            decoded_raw = raw or cls.decode_raw(request)
            decoded_session = (
                await cls.filter(**kwargs, id=decoded_raw["id"], deleted=False)
                .prefetch_related("bearer")
                .get()
            )
            request.ctx.session = decoded_session
        except DoesNotExist:
            raise NotFoundError("Session could not be found.")
        return decoded_session

    class Meta:
        abstract = True
Attributes
expiration_date:datetime- Date and time the session expires and can no longer be used.
active:bool- Determines if the session can be used.
ip:str- IP address of client instantiating session.
bearer:ForeignKeyRelation[Account]- Account associated with this session.
Ancestors
- BaseModel
- tortoise.models.Model
Subclasses
Class variables
var Metavar active : boolvar bearer : tortoise.fields.relational.ForeignKeyFieldInstance[Account]var expiration_date : datetime.datetimevar ip : str
Static methods
async def decode(request: sanic.request.types.Request,
raw: dict = None,
**kwargs: int | str | bool | float)-
Decodes session JWT token from client cookie into a session model.
Args
request:Request- Sanic request parameter.
raw:dict- Decoded JWT token provided by the client, include only for optimization purposes.
**kwargs:Union[int, str, bool, float]- Extra filter arguments applied during session decoding.
Returns
session
Raises
JWTDecodeError NotFoundError
def decode_raw(request: sanic.request.types.Request) ‑> dict-
Decodes session JWT token from client cookie into a python dict.
Args
request:Request- Sanic request parameter.
Returns
session_dict
Raises
JWTDecodeError
async def get_associated(account: Account)-
Retrieves sessions associated to an account.
Args
account:Account- Account associated with sessions being retrieved.
Returns
sessions
Raises
NotFoundError
async def new(request: sanic.request.types.Request,
account: Account,
**kwargs: dict[str, int | str | bool | float | list | dict])-
Creates session with pre-set values.
Args
request:Request- Sanic request parameter.
account:Account- Account being associated to the session.
**kwargs:Union[int, str, bool, float, list, dict]- Extra arguments applied during session creation.
Returns
session
Instance variables
prop anonymous : bool-
Expand source code
@property def anonymous(self) -> bool: """ Determines if an account is associated with session. Returns: anonymous """ return self.bearer is NoneDetermines if an account is associated with session.
Returns
anonymous
Methods
async def deactivate(self) ‑> None-
Expand source code
async def deactivate(self) -> None: """ Renders session deactivated and therefor unusable. Raises: DeactivatedError """ if self.active: self.active = False await self.save(update_fields=["active"]) else: raise DeactivatedError("Session is already deactivated.", 403)Renders session deactivated and therefor unusable.
Raises
DeactivatedError
def encode(self, response: sanic.response.types.HTTPResponse) ‑> None-
Expand source code
def encode(self, response: HTTPResponse) -> None: """ Transforms session into JWT and then is stored in a cookie. Args: response (HTTPResponse): Sanic response used to store JWT into a cookie on the client. """ encoded_session = jwt.encode( { "id": self.id, "date_created": str(self.date_created), "expiration_date": str(self.expiration_date), "bearer": self.bearer.id if isinstance(self.bearer, Account) else None, "ip": self.ip, }, config.SECRET, config.SESSION_ENCODING_ALGORITHM, ) response.cookies.add_cookie( f"{config.SESSION_PREFIX}_{self.__class__.__name__[:7].lower()}", str(encoded_session), httponly=config.SESSION_HTTPONLY, samesite=config.SESSION_SAMESITE, secure=config.SESSION_SECURE, domain=config.SESSION_DOMAIN, expires=getattr(self, "refresh_expiration_date", self.expiration_date), )Transforms session into JWT and then is stored in a cookie.
Args
response:HTTPResponse- Sanic response used to store JWT into a cookie on the client.
def validate(self) ‑> None-
Expand source code
def validate(self) -> None: """ Raises an error with respect to session state. Raises: DeletedError ExpiredError DeactivatedError """ if self.deleted: raise DeletedError("Session has been deleted.") elif not self.active: raise DeactivatedError elif is_expired(self.expiration_date): raise ExpiredErrorRaises an error with respect to session state.
Raises
DeletedError ExpiredError DeactivatedError
Inherited members
class TwoStepSession (**kwargs)-
Expand source code
class TwoStepSession(VerificationSession):
    """
    Validates client using a code sent via email or text.

    Attributes:
        tag (str): Label used to distinguish sessions for specific purposes.
    """

    tag: str = fields.CharField(max_length=20)

    @classmethod
    async def new(
        cls,
        request: Request,
        account: Account,
        **kwargs: Union[int, str, bool, float, list, dict],
    ):
        """
        Creates a two-step session for an account with a generated code and the configured expiration.

        Args:
            request (Request): Sanic request parameter.
            account (Account): Account being associated to the session.
            **kwargs (Union[int, str, bool, float, list, dict]): Extra arguments applied during session creation.

        Returns:
            session
        """
        client_ip = get_ip(request)
        expires_at = get_expiration_date(config.TWO_STEP_SESSION_EXPIRATION)
        verification_code = get_code(True)
        return await cls.create(
            **kwargs,
            ip=client_ip,
            bearer=account,
            expiration_date=expires_at,
            code=verification_code,
        )

    class Meta:
        table = "two_step_session"
Attributes
tag:str- Label used to distinguish sessions for specific purposes.
Ancestors
- VerificationSession
- Session
- BaseModel
- tortoise.models.Model
Class variables
var Metavar tag : str
Inherited members
class VerificationSession (**kwargs)-
Expand source code
class VerificationSession(Session):
    """
    Used for client verification challenges that require some form of code or key.

    Attributes:
        attempts (int): The amount of times a user entered a code not equal to this verification session's code.
        code (str): A secret key that would be sent via email, text, etc.
    """

    attempts: int = fields.IntField(default=0)
    code: str = fields.CharField(max_length=6, null=True)

    async def check_code(self, code: str) -> None:
        """
        Checks if code passed is equivalent to the session code.

        Every failed attempt is persisted, including the one that reaches the
        limit, and a session that has already reached the limit is rejected
        regardless of the code supplied. On a match the session is deactivated.

        Args:
            code (str): Code being cross-checked with session code.

        Raises:
            ChallengeError
            MaxedOutChallengeError
        """
        # A maxed-out session must stay locked even if the right code is supplied later.
        if self.attempts >= config.MAX_CHALLENGE_ATTEMPTS:
            raise MaxedOutChallengeError
        if not code or self.code != code.upper():
            self.attempts += 1
            # Persist before raising so the limit survives across requests.
            await self.save(update_fields=["attempts"])
            if self.attempts < config.MAX_CHALLENGE_ATTEMPTS:
                raise ChallengeError(
                    "Your code does not match verification session code."
                )
            raise MaxedOutChallengeError
        await self.deactivate()

    @classmethod
    async def new(
        cls,
        request: Request,
        account: Account,
        **kwargs: Union[int, str, bool, float, list, dict],
    ):
        """
        Creates session with pre-set values.

        Not implemented on the base verification session.

        Args:
            request (Request): Sanic request parameter.
            account (Account): Account being associated to the session.
            **kwargs (Union[int, str, bool, float, list, dict]): Extra arguments applied during session creation.

        Returns:
            session
        """
        raise NotImplementedError

    class Meta:
        abstract = True
Attributes
attempts:int- The amount of times a user entered a code not equal to this verification session's code.
code:str- A secret key that would be sent via email, text, etc.
Ancestors
Subclasses
Class variables
var Metavar attempts : intvar code : str
Methods
async def check_code(self, code: str) ‑> None-
Expand source code
async def check_code(self, code: str) -> None:
    """
    Checks if code passed is equivalent to the session code.

    Every failed attempt is persisted, including the one that reaches the
    limit, and a session that has already reached the limit is rejected
    regardless of the code supplied. On a match the session is deactivated.

    Args:
        code (str): Code being cross-checked with session code.

    Raises:
        ChallengeError
        MaxedOutChallengeError
    """
    # A maxed-out session must stay locked even if the right code is supplied later.
    if self.attempts >= config.MAX_CHALLENGE_ATTEMPTS:
        raise MaxedOutChallengeError
    if not code or self.code != code.upper():
        self.attempts += 1
        # Persist before raising so the limit survives across requests.
        await self.save(update_fields=["attempts"])
        if self.attempts < config.MAX_CHALLENGE_ATTEMPTS:
            raise ChallengeError(
                "Your code does not match verification session code."
            )
        raise MaxedOutChallengeError
    await self.deactivate()
Args
code:str- Code being cross-checked with session code.
Raises
ChallengeError MaxedOutChallengeError
Inherited members