Skip to content

AuthManager#

authx.manager.AuthManager #

AuthManager(policy_engine=None, policy_rules=None)

Bases: _ErrorHandler

Manage multiple isolated AuthX instances by login type.

Initialize AuthManager.

PARAMETER DESCRIPTION
policy_engine

Optional policy engine instance.

TYPE: Optional[PolicyEngine] DEFAULT: None

policy_rules

Optional rules used when creating the default policy engine.

TYPE: Optional[Sequence[PolicyRule]] DEFAULT: None

Source code in authx/manager.py
def __init__(
    self,
    policy_engine: Optional[PolicyEngine] = None,
    policy_rules: Optional[Sequence[PolicyRule]] = None,
) -> None:
    """Set up an empty login-type registry and the policy engine.

    Args:
        policy_engine: Engine used for policy decisions. When it is omitted
            (or otherwise falsy), a new ``PolicyEngine`` is created instead.
        policy_rules: Rules passed to the default ``PolicyEngine``. Not used
            when a truthy ``policy_engine`` is supplied.
    """
    # Registry mapping each login type to its AuthX instance.
    self._auth_by_type: dict[str, AuthX[Any]] = {}

    # Truthiness check: any falsy engine (including None) falls back to a
    # freshly built default engine.
    if policy_engine:
        self.policy_engine = policy_engine
    else:
        self.policy_engine = PolicyEngine(rules=policy_rules)

policy_engine instance-attribute #

policy_engine = policy_engine or PolicyEngine(rules=policy_rules)

login_types property #

login_types

Return registered login types.

MSG_TokenError class-attribute instance-attribute #

MSG_TokenError = 'Token Error'

MSG_MissingTokenError class-attribute instance-attribute #

MSG_MissingTokenError = 'Missing JWT in request'

MSG_MissingCSRFTokenError class-attribute instance-attribute #

MSG_MissingCSRFTokenError = None

MSG_TokenTypeError class-attribute instance-attribute #

MSG_TokenTypeError = 'Bad token type'

MSG_RevokedTokenError class-attribute instance-attribute #

MSG_RevokedTokenError = 'Invalid token'

MSG_TokenRequiredError class-attribute instance-attribute #

MSG_TokenRequiredError = 'Token required'

MSG_FreshTokenRequiredError class-attribute instance-attribute #

MSG_FreshTokenRequiredError = 'Fresh token required'

MSG_AccessTokenRequiredError class-attribute instance-attribute #

MSG_AccessTokenRequiredError = 'Access token required'

MSG_RefreshTokenRequiredError class-attribute instance-attribute #

MSG_RefreshTokenRequiredError = 'Refresh token required'

MSG_CSRFError class-attribute instance-attribute #

MSG_CSRFError = 'CSRF double submit does not match'

MSG_JWTDecodeError class-attribute instance-attribute #

MSG_JWTDecodeError = 'Invalid Token'

MSG_InsufficientScopeError class-attribute instance-attribute #

MSG_InsufficientScopeError = None

MSG_LoginTypeMismatchError class-attribute instance-attribute #

MSG_LoginTypeMismatchError = None

MSG_PolicyDeniedError class-attribute instance-attribute #

MSG_PolicyDeniedError = None

MSG_PolicyEvaluationError class-attribute instance-attribute #

MSG_PolicyEvaluationError = 'Policy evaluation failed'

register #

register(auth)

Register an AuthX instance with a unique login type.

Source code in authx/manager.py
def register(self, auth: AuthX[Any]) -> None:
    """Add an AuthX instance to the registry under its ``login_type``.

    Args:
        auth: The AuthX instance to register; its ``login_type`` is the key.

    Raises:
        BadConfigurationError: If ``auth.login_type`` is ``None`` or is
            already present in the registry.
    """
    key = auth.login_type
    if key is None:
        raise BadConfigurationError("AuthX instances registered with AuthManager require a login_type")

    registry = self._auth_by_type
    if key in registry:
        raise BadConfigurationError(f"AuthX login_type '{key}' is already registered")
    registry[key] = auth

get #

get(login_type)

Return the AuthX instance for a login type.

Source code in authx/manager.py
def get(self, login_type: str) -> AuthX[Any]:
    """Look up the AuthX instance registered under ``login_type``.

    Raises:
        BadConfigurationError: If no instance is registered for
            ``login_type``; the underlying ``KeyError`` is chained as cause.
    """
    registry = self._auth_by_type
    try:
        found = registry[login_type]
    except KeyError as missing:
        raise BadConfigurationError(f"Unknown login_type '{login_type}'") from missing
    return found

add_policy_rule #

add_policy_rule(rule)

Register a policy rule.

Source code in authx/manager.py
def add_policy_rule(self, rule: PolicyRule) -> None:
    """Add ``rule`` to this manager's policy engine."""
    engine = self.policy_engine
    engine.add_rule(rule)

add_policy_evaluator #

add_policy_evaluator(evaluator)

Register a global policy evaluator.

Source code in authx/manager.py
def add_policy_evaluator(self, evaluator: PolicyEvaluator) -> None:
    """Add ``evaluator`` to this manager's policy engine."""
    engine = self.policy_engine
    engine.add_evaluator(evaluator)

create_access_token #

create_access_token(login_type, uid, fresh=False, headers=None, expiry=None, data=None, audience=None, scopes=None, *args, **kwargs)

Create an access token for a registered login type.

Source code in authx/manager.py
def create_access_token(
    self,
    login_type: str,
    uid: str,
    fresh: bool = False,
    headers: Optional[dict[str, Any]] = None,
    expiry: Optional[DateTimeExpression] = None,
    data: Optional[dict[str, Any]] = None,
    audience: Optional[StringOrSequence] = None,
    scopes: Optional[list[str]] = None,
    *args: Any,
    **kwargs: Any,
) -> str:
    """Issue an access token through the AuthX instance of ``login_type``.

    ``login_type`` selects the registered instance via ``self.get``; all
    remaining arguments are forwarded, positionally and in order, to that
    instance's ``create_access_token``. Errors raised by the lookup
    propagate unchanged.
    """
    return self.get(login_type).create_access_token(
        uid,
        fresh,
        headers,
        expiry,
        data,
        audience,
        scopes,
        *args,
        **kwargs,
    )

create_refresh_token #

create_refresh_token(login_type, uid, headers=None, expiry=None, data=None, audience=None, scopes=None, *args, **kwargs)

Create a refresh token for a registered login type.

Source code in authx/manager.py
def create_refresh_token(
    self,
    login_type: str,
    uid: str,
    headers: Optional[dict[str, Any]] = None,
    expiry: Optional[DateTimeExpression] = None,
    data: Optional[dict[str, Any]] = None,
    audience: Optional[StringOrSequence] = None,
    scopes: Optional[list[str]] = None,
    *args: Any,
    **kwargs: Any,
) -> str:
    """Issue a refresh token through the AuthX instance of ``login_type``.

    ``login_type`` selects the registered instance via ``self.get``; all
    remaining arguments are forwarded, positionally and in order, to that
    instance's ``create_refresh_token``. Errors raised by the lookup
    propagate unchanged.
    """
    return self.get(login_type).create_refresh_token(
        uid,
        headers,
        expiry,
        data,
        audience,
        scopes,
        *args,
        **kwargs,
    )

create_token_pair #

create_token_pair(login_type, uid, fresh=False, headers=None, access_expiry=None, refresh_expiry=None, data=None, audience=None, access_scopes=None, refresh_scopes=None)

Create access and refresh tokens for a registered login type.

Source code in authx/manager.py
def create_token_pair(
    self,
    login_type: str,
    uid: str,
    fresh: bool = False,
    headers: Optional[dict[str, Any]] = None,
    access_expiry: Optional[DateTimeExpression] = None,
    refresh_expiry: Optional[DateTimeExpression] = None,
    data: Optional[dict[str, Any]] = None,
    audience: Optional[StringOrSequence] = None,
    access_scopes: Optional[list[str]] = None,
    refresh_scopes: Optional[list[str]] = None,
) -> TokenResponse:
    """Issue an access/refresh token pair for ``login_type``.

    The AuthX instance is resolved with ``self.get`` and every other
    argument is forwarded by keyword to its ``create_token_pair``.
    """
    auth = self.get(login_type)
    # Keyword arguments are kept in the same order as the parameters above.
    pair_options = {
        "uid": uid,
        "fresh": fresh,
        "headers": headers,
        "access_expiry": access_expiry,
        "refresh_expiry": refresh_expiry,
        "data": data,
        "audience": audience,
        "access_scopes": access_scopes,
        "refresh_scopes": refresh_scopes,
    }
    return auth.create_token_pair(**pair_options)

token_required #

token_required(login_type, type='access', verify_type=True, verify_fresh=False, verify_csrf=None, locations=None)

Dependency factory requiring a token for a specific login type.

Source code in authx/manager.py
def token_required(
    self,
    login_type: str,
    type: str = "access",
    verify_type: bool = True,
    verify_fresh: bool = False,
    verify_csrf: Optional[bool] = None,
    locations: Optional[TokenLocations] = None,
) -> Callable[[Request], Awaitable[TokenPayload]]:
    """Build an async dependency that requires a token for ``login_type``.

    The returned coroutine function takes the incoming ``Request`` and
    delegates to ``self._auth_required`` with the options given here.

    Returns:
        An async callable mapping a ``Request`` to a ``TokenPayload``.
    """
    # Options fixed at factory time; unpacked on every request below.
    checks = {
        "type": type,
        "verify_type": verify_type,
        "verify_fresh": verify_fresh,
        "verify_csrf": verify_csrf,
        "locations": locations,
    }

    async def _auth_required(request: Request) -> TokenPayload:
        return await self._auth_required(login_type=login_type, request=request, **checks)

    return _auth_required

access_token_required #

access_token_required(login_type)

Dependency factory requiring an access token for a login type.

Source code in authx/manager.py
def access_token_required(self, login_type: str) -> Callable[[Request], Awaitable[TokenPayload]]:
    """Build a ``token_required`` dependency for ``login_type`` with ``type="access"``."""
    dependency = self.token_required(login_type=login_type, type="access")
    return dependency

refresh_token_required #

refresh_token_required(login_type)

Dependency factory requiring a refresh token for a login type.

Source code in authx/manager.py
def refresh_token_required(self, login_type: str) -> Callable[[Request], Awaitable[TokenPayload]]:
    """Build a ``token_required`` dependency for ``login_type`` with ``type="refresh"``."""
    dependency = self.token_required(login_type=login_type, type="refresh")
    return dependency

fresh_token_required #

fresh_token_required(login_type)

Dependency factory requiring a fresh access token for a login type.

Source code in authx/manager.py
def fresh_token_required(self, login_type: str) -> Callable[[Request], Awaitable[TokenPayload]]:
    """Build a ``token_required`` dependency for ``login_type`` with ``type="access"`` and ``verify_fresh=True``."""
    dependency = self.token_required(login_type=login_type, type="access", verify_fresh=True)
    return dependency

authorize async #

authorize(login_type, action, resource, *, payload=None, request=None, subject=None, resource_attrs=None, env=None)

Authorize a token payload against the policy engine.

Source code in authx/manager.py
async def authorize(
    self,
    login_type: str,
    action: str,
    resource: str,
    *,
    payload: Optional[TokenPayload] = None,
    request: Optional[Request] = None,
    subject: Any = None,
    resource_attrs: Any = None,
    env: Optional[Mapping[str, Any]] = None,
) -> TokenPayload:
    """Check ``action`` on ``resource`` against the policy engine.

    A supplied ``payload`` is passed to ``self._verify_login_type``;
    otherwise the payload is obtained from ``request`` through
    ``self._auth_required``.

    Args:
        login_type: Login type the authorization applies to.
        action: Action placed in the policy context.
        resource: Resource placed in the policy context.
        payload: Token payload to authorize, if already available.
        request: Request used to obtain the payload and build the environment.
        subject: Policy subject; when ``None`` it is derived from the payload
            with ``default_subject_from_payload``.
        resource_attrs: Resource attributes; a falsy value becomes ``{}``.
        env: Extra environment passed to ``build_policy_environment``.

    Returns:
        The token payload that was authorized.

    Raises:
        PolicyDeniedError: If neither ``payload`` nor ``request`` is given,
            or if the policy decision is not allowed.
    """
    if payload is not None:
        self._verify_login_type(payload, login_type)
    elif request is not None:
        payload = await self._auth_required(login_type=login_type, request=request)
    else:
        raise PolicyDeniedError(
            "A request or token payload is required for policy authorization",
            login_type=login_type,
        )

    if subject is None:
        effective_subject = default_subject_from_payload(payload)
    else:
        effective_subject = subject
    attrs = resource_attrs or {}
    environment = build_policy_environment(request=request, environment=env)

    context = PolicyContext(
        login_type=login_type,
        action=action,
        resource=resource,
        payload=payload,
        request=request,
        subject=effective_subject,
        resource_attrs=attrs,
        environment=environment,
    )
    decision = await self.policy_engine.evaluate(context)
    if decision.allowed:
        return payload
    raise PolicyDeniedError(decision.reason, login_type=login_type)

policy_required #

policy_required(login_type, action, resource, *, subject=None, resource_attrs=None, env=None)

Dependency factory requiring policy authorization.

Source code in authx/manager.py
def policy_required(
    self,
    login_type: str,
    action: str,
    resource: str,
    *,
    subject: Any = None,
    resource_attrs: Any = None,
    env: Optional[Mapping[str, Any]] = None,
) -> Callable[[Request], Awaitable[TokenPayload]]:
    """Build an async dependency that runs ``authorize`` for each request.

    The returned coroutine function takes the incoming ``Request`` and calls
    ``self.authorize`` with it and the arguments fixed here.

    Returns:
        An async callable mapping a ``Request`` to a ``TokenPayload``.
    """

    async def _policy_required(request: Request) -> TokenPayload:
        authorized = await self.authorize(
            login_type,
            action,
            resource,
            request=request,
            subject=subject,
            resource_attrs=resource_attrs,
            env=env,
        )
        return authorized

    return _policy_required

handle_errors #

handle_errors(app)

Register FastAPI exception handlers for the AuthX exceptions.

PARAMETER DESCRIPTION
app

The FastAPI application on which to register the exception handlers.

TYPE: FastAPI

Source code in authx/_internal/_error.py
def handle_errors(self, app: FastAPI) -> None:
    """Register exception handlers for the AuthX exceptions on ``app``.

    Each entry in the table below is registered through
    ``self._set_app_exception_handler``; ``RateLimitExceeded`` is routed to
    ``self._rate_limit_handler`` instead.

    Args:
        app (FastAPI): the FastAPI application to register the handlers on
    """
    # One row per handler: (exception class, HTTP status code, message).
    # Rows are registered in the order listed.
    # NOTE(review): MissingTokenError is registered with MSG_TokenError, not
    # MSG_MissingTokenError - confirm this is intended.
    handler_specs = (
        (exceptions.JWTDecodeError, 422, None),
        (exceptions.MissingTokenError, 401, self.MSG_TokenError),
        # Use detailed exception message for better user guidance
        (exceptions.MissingCSRFTokenError, 401, None),
        (exceptions.TokenTypeError, 401, self.MSG_TokenTypeError),
        (exceptions.LoginTypeMismatchError, 401, None),
        (exceptions.RevokedTokenError, 401, self.MSG_RevokedTokenError),
        (exceptions.TokenRequiredError, 401, self.MSG_TokenRequiredError),
        (exceptions.FreshTokenRequiredError, 401, self.MSG_FreshTokenRequiredError),
        (exceptions.AccessTokenRequiredError, 401, self.MSG_AccessTokenRequiredError),
        (exceptions.RefreshTokenRequiredError, 401, self.MSG_RefreshTokenRequiredError),
        (exceptions.CSRFError, 401, self.MSG_CSRFError),
        # Use detailed exception message showing required vs provided scopes
        (exceptions.InsufficientScopeError, 403, None),
        (exceptions.PolicyDeniedError, 403, None),
        (exceptions.PolicyEvaluationError, 500, self.MSG_PolicyEvaluationError),
        (exceptions.SessionRevoked, 401, "Session has been revoked"),
    )
    for exc_type, status, text in handler_specs:
        self._set_app_exception_handler(app, exception=exc_type, status_code=status, message=text)

    # Rate-limit errors go through a dedicated handler on this object.
    @app.exception_handler(exceptions.RateLimitExceeded)
    async def rate_limit_wrapper(request: Request, exc: exceptions.RateLimitExceeded) -> JSONResponse:
        return await self._rate_limit_handler(request, exc)