Module sanic_security.test.server

Functions

async def on_account_creation(request)
Expand source code
@app.post("api/test/account")
async def on_account_creation(request):
    """Quick account creation."""
    account = await Account.create(
        username=request.form.get("username"),
        email=request.form.get("email"),
        password=password_hasher.hash("password"),
        verified=True,
        disabled=False,
    )
    response = json("Account creation successful!", account.json)
    return response

Quick account creation.

async def on_authenticate(request)
Expand source code
@app.post("api/test/auth")
@requires_authentication
async def on_authenticate(request):
    """Authenticate client session and account."""
    response = json(
        "Authenticated!",
        {
            "bearer": (
                request.ctx.session.bearer.json
                if not request.ctx.session.anonymous
                else None
            ),
            "refresh": request.ctx.session.is_refresh,
        },
    )
    return response

Authenticate client session and account.

async def on_authentication_expire(request)
Expand source code
@app.post("api/test/auth/expire")
@requires_authentication
async def on_authentication_expire(request):
    """Expire client's session."""
    request.ctx.session.expiration_date = datetime.datetime.now(datetime.UTC)
    await request.ctx.session.save(update_fields=["expiration_date"])
    return json("Authentication expired!", request.ctx.session.json)

Expire client's session.

async def on_authorization(request)
Expand source code
@app.post("api/test/auth/roles")
async def on_authorization(request):
    """Check if client is authorized with sufficient roles and permissions."""
    await check_roles(request, request.form.get("role"))
    if request.form.get("permissions_required"):
        await check_permissions(
            request, *request.form.get("permissions_required").split(", ")
        )
    return text("Account permitted!")

Check if client is authorized with sufficient roles and permissions.

async def on_captcha_attempt(request)
Expand source code
@app.post("api/test/capt")
@requires_captcha
async def on_captcha_attempt(request):
    """Attempt captcha challenge."""
    return json("Captcha attempt successful!", request.ctx.session.json)

Attempt captcha challenge.

async def on_captcha_audio(request)
Expand source code
@app.get("api/test/capt/audio")
async def on_captcha_audio(request):
    """Request captcha audio."""
    captcha_session = await CaptchaSession.decode(request)
    return raw(captcha_session.get_audio(), content_type="audio/mpeg")

Request captcha audio.

async def on_captcha_image(request)
Expand source code
@app.get("api/test/capt/image")
async def on_captcha_image(request):
    """Request captcha image."""
    captcha_session = await CaptchaSession.decode(request)
    return raw(captcha_session.get_image(), content_type="image/jpeg")

Request captcha image.

async def on_captcha_request(request)
Expand source code
@app.get("api/test/capt/request")
async def on_captcha_request(request):
    """Request captcha with solution in response."""
    captcha_session = await CaptchaSession.new(request)
    response = json("Captcha request successful!", captcha_session.code)
    captcha_session.encode(response)
    return response

Request captcha with solution in response.

async def on_get_associated_authentication_sessions(request)
Expand source code
@app.get("api/test/auth/associated")
@requires_authentication
async def on_get_associated_authentication_sessions(request):
    """Retrieves authentication sessions associated with logged in account."""
    authentication_sessions = await AuthenticationSession.get_associated(
        request.ctx.session.bearer
    )
    return json(
        "Associated authentication sessions retrieved!",
        [auth_session.json for auth_session in authentication_sessions],
    )

Retrieves authentication sessions associated with logged in account.

async def on_login(request)
Expand source code
@app.post("api/test/auth/login")
async def on_login(request):
    """Login to an account with an email and password."""
    authentication_session = await login(
        request,
        require_second_factor=str_to_bool(
            request.args.get("two-factor-authentication")
        ),
    )
    if str_to_bool(request.args.get("two-factor-authentication")):
        two_step_session = await request_two_step_verification(
            request, authentication_session.bearer, "2fa"
        )
        response = json(
            "Login successful! Two-factor authentication required.",
            two_step_session.code,
        )
        two_step_session.encode(response)
    else:
        response = json("Login successful!", authentication_session.json)
    authentication_session.encode(response)
    return response

Login to an account with an email and password.

async def on_login_anonymous(request)
Expand source code
@app.post("api/test/auth/login/anon")
async def on_login_anonymous(request):
    """Login as anonymous user."""
    authentication_session = await AuthenticationSession.new(request)
    response = json(
        "Anonymous user now associated with session!", authentication_session.json
    )
    authentication_session.encode(response)
    return response

Login as anonymous user.

async def on_logout(request)
Expand source code
@app.post("api/test/auth/logout")
async def on_logout(request):
    """Logout of currently logged in account."""
    authentication_session = await logout(request)
    await oauth_revoke(request, google_oauth)
    response = json("Logout successful!", authentication_session.json)
    return response

Logout of currently logged in account.

async def on_oauth_callback(request)
Expand source code
@app.get("api/test/oauth/callback")
async def on_oauth_callback(request):
    """OAuth callback."""
    token_info, authentication_session = await oauth_callback(
        request,
        google_oauth,
        "http://localhost:8000/api/test/oauth/callback",
    )
    response = json(
        "OAuth successful.",
        {"token_info": token_info, "auth_session": authentication_session.json},
    )
    oauth_encode(response, token_info)
    authentication_session.encode(response)
    return response

OAuth callback.

async def on_oauth_request(request)
Expand source code
@app.route("api/test/oauth", methods=["GET", "POST"])
async def on_oauth_request(request):
    """OAuth request."""
    return redirect(
        await google_oauth.get_authorization_url(
            "http://localhost:8000/api/test/oauth/callback",
            scope=google_oauth.base_scopes,
        )
    )

OAuth request.

async def on_oauth_revoke(request)
Expand source code
@app.route("api/test/oauth/revoke", methods=["GET", "POST"])
async def on_oauth_revoke(request):
    """OAuth token revocation."""
    token_info = await oauth_revoke(request, google_oauth)
    return json("Access token revoked!", token_info)

OAuth token revocation.

async def on_oauth_token(request)
Expand source code
@app.get("api/test/oauth/token")
@requires_authentication
async def on_oauth_token(request):
    """OAuth token retrieval."""
    token_info = await oauth_decode(request, google_oauth)
    return json(
        "Access token retrieved!",
        {"token_info": token_info, "auth_session": request.ctx.session.json},
    )

OAuth token retrieval.

async def on_register(request)
Expand source code
@app.post("api/test/auth/register")
async def on_register(request):
    """Register an account with email and password."""
    account = await register(
        request,
        verified=str_to_bool(request.form.get("verified")),
        disabled=str_to_bool(request.form.get("disabled")),
    )
    if not account.verified:
        two_step_session = await request_two_step_verification(request, account, "2fa")
        response = json(
            "Registration successful! Verification required.", two_step_session.code
        )
        two_step_session.encode(response)
    else:
        response = json("Registration successful!", account.json)
    return response

Register an account with email and password.

async def on_request_verification(request)
Expand source code
@app.post("api/test/two-step/request")
async def on_request_verification(request):
    """Request two-step verification with code in the response."""
    two_step_session = await request_two_step_verification(request)
    response = json("Verification request successful!", two_step_session.code)
    two_step_session.encode(response)
    return response

Request two-step verification with code in the response.

async def on_role_assign(request)
Expand source code
@app.post("api/test/auth/roles/assign")
@requires_authentication
async def on_role_assign(request):
    """Assign authenticated account a role."""
    await assign_role(
        request.form.get("name"),
        request.ctx.session.bearer,
        "Role used for testing.",
        *(
            request.form.get("permissions").split(", ")
            if request.form.get("permissions")
            else []
        ),
    )
    return text("Role assigned!")

Assign authenticated account a role.

async def on_security_error(request, exception)
Expand source code
@app.exception(SecurityError)
async def on_security_error(request, exception):
    """Handles security errors with correct response."""
    traceback.print_exc()
    return exception.json

Handles security errors with correct response.

async def on_two_factor_authentication(request)
Expand source code
@app.post("api/test/auth/validate-2fa")
async def on_two_factor_authentication(request):
    """Fulfills client authentication session's second factor requirement."""
    authentication_session = await fulfill_second_factor(request)
    response = json(
        "Authentication session second-factor fulfilled! You are now authenticated.",
        authentication_session.bearer.json,
    )
    return response

Fulfills client authentication session's second factor requirement.

async def on_verification_attempt(request)
Expand source code
@app.post("api/test/two-step")
@requires_two_step_verification
async def on_verification_attempt(request):
    """Attempt two-step verification challenge."""
    return json("Two step verification attempt successful!", request.ctx.session.json)

Attempt two-step verification challenge.

async def on_verify(request)
Expand source code
@app.post("api/test/auth/verify")
async def on_verify(request):
    """Verifies client account."""
    two_step_session = await verify_account(request)
    return json(
        "You have verified your account and may login!", two_step_session.bearer.json
    )

Verifies client account.