Skip to content

PolicyEngine#

authx.policy.PolicyEngine #

PolicyEngine(rules=None, evaluators=None, default_allow=False)

Evaluate policy rules for AuthX-managed identities.

Initialize the policy engine.

PARAMETER DESCRIPTION
rules

Initial policy rules.

TYPE: Optional[Sequence[PolicyRule]] DEFAULT: None

evaluators

Global custom evaluators that must pass for matching rules.

TYPE: Optional[Sequence[PolicyEvaluator]] DEFAULT: None

default_allow

Whether to allow requests when no rule matches.

TYPE: bool DEFAULT: False

Source code in authx/policy.py
def __init__(
    self,
    rules: Optional[Sequence[PolicyRule]] = None,
    evaluators: Optional[Sequence[PolicyEvaluator]] = None,
    default_allow: bool = False,
) -> None:
    """Initialize the policy engine.

    Args:
        rules: Initial policy rules.
        evaluators: Global custom evaluators that must pass for matching rules.
        default_allow: Whether to allow requests when no rule matches.
    """
    self._rules = list(rules or [])
    self._evaluators = list(evaluators or [])
    self.default_allow = default_allow

default_allow instance-attribute #

default_allow = default_allow

rules property #

rules

Return registered policy rules.

add_rule #

add_rule(rule)

Register a policy rule.

Source code in authx/policy.py
def add_rule(self, rule: PolicyRule) -> None:
    """Register a policy rule."""
    self._rules.append(rule)

add_evaluator #

add_evaluator(evaluator)

Register a global custom policy evaluator.

Source code in authx/policy.py
def add_evaluator(self, evaluator: PolicyEvaluator) -> None:
    """Register a global custom policy evaluator."""
    self._evaluators.append(evaluator)

evaluate async #

evaluate(context)

Evaluate context against registered rules.

Explicit deny rules win over allow rules. If no rule matches, the configured default decision is returned.

Source code in authx/policy.py
async def evaluate(self, context: PolicyContext) -> PolicyDecision:
    """Evaluate context against registered rules.

    Explicit deny rules win over allow rules. If no rule matches, the
    configured default decision is returned.
    """
    allow_decision: Optional[PolicyDecision] = None
    for rule in self._rules:
        if not await self._matches_rule(context, rule):
            continue

        reason = rule.reason or f"Policy rule {rule.effect} matched"
        decision = PolicyDecision(allowed=(rule.effect == "allow"), reason=reason, rule=rule)
        if rule.effect == "deny":
            return decision
        allow_decision = decision

    if allow_decision is not None:
        return allow_decision

    if self.default_allow:
        return PolicyDecision(allowed=True, reason="No policy rule matched; default allow")
    return PolicyDecision(allowed=False, reason="No policy rule matched")

PolicyRule#

authx.policy.PolicyRule dataclass #

PolicyRule(effect, actions, resources, conditions=list(), scopes=None, all_scopes_required=True, evaluators=list(), reason=None)

A single authorization rule.

effect instance-attribute #

effect

actions instance-attribute #

actions

resources instance-attribute #

resources

conditions class-attribute instance-attribute #

conditions = field(default_factory=list)

scopes class-attribute instance-attribute #

scopes = None

all_scopes_required class-attribute instance-attribute #

all_scopes_required = True

evaluators class-attribute instance-attribute #

evaluators = field(default_factory=list)

reason class-attribute instance-attribute #

reason = None

matches_action #

matches_action(action)

Return whether the requested action matches this rule.

Source code in authx/policy.py
def matches_action(self, action: str) -> bool:
    """Return whether the requested action matches this rule."""
    return any(rule_action == "*" or match_scope(action, rule_action) for rule_action in self.actions)

matches_resource #

matches_resource(resource)

Return whether the requested resource matches this rule.

Source code in authx/policy.py
def matches_resource(self, resource: str) -> bool:
    """Return whether the requested resource matches this rule."""
    return any(rule_resource == "*" or match_scope(resource, rule_resource) for rule_resource in self.resources)

matches_scopes #

matches_scopes(payload)

Return whether token scopes satisfy this rule.

Source code in authx/policy.py
def matches_scopes(self, payload: Optional[TokenPayload]) -> bool:
    """Return whether token scopes satisfy this rule."""
    if self.scopes is None:
        return True
    provided = payload.scopes if payload is not None else None
    return has_required_scopes(self.scopes, provided, all_required=self.all_scopes_required)

PolicyContext#

authx.policy.PolicyContext dataclass #

PolicyContext(login_type, action, resource, payload=None, request=None, subject=None, resource_attrs=None, environment=dict())

Context passed to policy evaluators.

login_type instance-attribute #

login_type

action instance-attribute #

action

resource instance-attribute #

resource

payload class-attribute instance-attribute #

payload = None

request class-attribute instance-attribute #

request = None

subject class-attribute instance-attribute #

subject = None

resource_attrs class-attribute instance-attribute #

resource_attrs = None

environment class-attribute instance-attribute #

environment = field(default_factory=dict)

get_source #

get_source(source)

Return a named source for policy conditions.

Source code in authx/policy.py
def get_source(self, source: PolicySource) -> Any:
    """Return a named source for policy conditions."""
    if source == "subject":
        return self.subject
    if source == "resource":
        return self.resource_attrs
    if source == "environment":
        return self.environment
    if source == "token" and self.payload is not None:
        return self.payload.model_dump()
    if source == "token":
        return {}
    raise PolicyEvaluationError(f"Unsupported policy source: {source}")  # pragma: no cover

PolicyDecision#

authx.policy.PolicyDecision dataclass #

PolicyDecision(allowed, reason, rule=None)

Result of policy evaluation.

allowed instance-attribute #

allowed

reason instance-attribute #

reason

rule class-attribute instance-attribute #

rule = None

PolicyEvaluator#

authx.policy.PolicyEvaluator #

Bases: Protocol

Protocol for custom policy evaluators.

PolicyCondition#

authx.policy.PolicyCondition dataclass #

PolicyCondition(source, key, operator='equals', value=None)

A condition evaluated against a policy context source.

source instance-attribute #

source

key instance-attribute #

key

operator class-attribute instance-attribute #

operator = 'equals'

value class-attribute instance-attribute #

value = None

matches #

matches(context)

Return whether this condition matches the given context.

Source code in authx/policy.py
def matches(self, context: "PolicyContext") -> bool:
    """Return whether this condition matches the given context."""
    candidate = _read_value(context.get_source(self.source), self.key)

    if self.operator == "exists":
        return candidate is not None
    if self.operator == "equals":
        return candidate == self.value
    if self.operator == "not_equals":
        return candidate != self.value
    if self.operator == "in":
        return candidate in _as_sequence(self.value)
    if self.operator == "not_in":
        return candidate not in _as_sequence(self.value)
    if self.operator == "contains":
        if candidate is None:
            return False
        return self.value in candidate
    if self.operator == "gt":
        return candidate is not None and candidate > self.value
    if self.operator == "gte":
        return candidate is not None and candidate >= self.value
    if self.operator == "lt":
        return candidate is not None and candidate < self.value
    if self.operator == "lte":
        return candidate is not None and candidate <= self.value
    raise PolicyEvaluationError(f"Unsupported policy operator: {self.operator}")  # pragma: no cover