Skip to content

HTTPCacheBackend

authx_extra.cache.HTTPCacheBackend

HTTPCacheBackend(redis, namespace=None)
PARAMETER DESCRIPTION
redis

TYPE: Redis

namespace

TYPE: Optional[str] DEFAULT: None

Source code in authx_extra/cache.py
def __init__(self, redis: redis.Redis, namespace: Optional[str] = None):
    """Bind the backend to a Redis client and a cache namespace.

    Args:
        redis: Client object kept on the instance as ``self.redis``.
        namespace: Namespace to keep on the instance. Any falsy value
            (``None`` or an empty string) falls back to
            ``HTTPCache.namespace``.
    """
    self.namespace = namespace if namespace else HTTPCache.namespace
    self.redis = redis

redis instance-attribute

redis = redis

namespace instance-attribute

namespace = namespace or HTTPCache.namespace

set async

set(key, value, ttl_in_seconds=None, ttl_func=None, end_of_day=False, end_of_week=False)
PARAMETER DESCRIPTION
key

TYPE: str

value

TYPE: str

ttl_in_seconds

TYPE: Optional[int] DEFAULT: None

ttl_func

TYPE: Optional[Callable] DEFAULT: None

end_of_day

TYPE: bool DEFAULT: False

end_of_week

TYPE: bool DEFAULT: False

Source code in authx_extra/cache.py
async def set(
    self,
    key: str,
    value: str,
    ttl_in_seconds: Optional[int] = None,
    ttl_func: Optional[Callable] = None,
    end_of_day: bool = False,
    end_of_week: bool = False,
):
    """Store ``value`` under ``key``, replacing any existing entry.

    The expiry is resolved by ``HTTPExpiry.get_ttl`` from the four expiry
    arguments. ``bytes`` values are written unchanged; every other value is
    JSON-encoded first. The delete of the old entry and the write of the
    new one are queued on a single transactional pipeline.

    Args:
        key: Cache key to write.
        value: Payload to store (``bytes`` are passed through as-is).
        ttl_in_seconds: Forwarded to ``HTTPExpiry.get_ttl``.
        ttl_func: Forwarded to ``HTTPExpiry.get_ttl``.
        end_of_day: Forwarded to ``HTTPExpiry.get_ttl``.
        end_of_week: Forwarded to ``HTTPExpiry.get_ttl``.

    Returns:
        The reply list from ``pipe.execute()``, i.e.
        ``[del_status, set_status]``.
    """
    ttl: int = await HTTPExpiry.get_ttl(
        ttl_in_seconds=ttl_in_seconds,
        end_of_day=end_of_day,
        end_of_week=end_of_week,
        ttl_func=ttl_func,
    )

    stringified_value = value if isinstance(value, bytes) else json.dumps(value)
    # NOTE(review): the pipeline is driven without ``await``, so this looks
    # like a synchronous client being used inside a coroutine -- confirm
    # that blocking the event loop for the round trip is acceptable.
    with self.redis.pipeline(transaction=True) as pipe:
        pipe.multi()
        pipe.delete(key)
        pipe.set(key, stringified_value, ex=ttl)
        # Nothing is logged here: the commands are only queued at this
        # point, and the outcome is unknown until execute() returns.
        result = pipe.execute()

    del_status, set_status = result

    if del_status:
        log_info(msg=f"CacheClearedOnSet: {key}")

    if set_status:
        log_info(msg=f"CacheSet: {key}")
    return result

get async

get(key)
PARAMETER DESCRIPTION
key

TYPE: str

Source code in authx_extra/cache.py
async def get(self, key: str) -> Tuple[Union[int, None], Union[Any, None]]:
    """Fetch the TTL reply and the decoded payload stored under ``key``.

    Args:
        key: Cache key to look up.

    Returns:
        A ``(ttl, value)`` pair. ``ttl`` is the reply to the TTL command,
        passed through untouched. ``value`` is the JSON-decoded payload, or
        ``None`` when the stored payload is missing or otherwise falsy.
    """
    with self.redis.pipeline(transaction=True) as txn:
        # Queue TTL and GET together so both replies come from one execute().
        txn.ttl(key).get(key)
        remaining, raw = txn.execute()

    # Guard clause: nothing usable stored, so there is nothing to decode.
    if not raw:
        return remaining, None

    decoded = json.loads(raw)
    log_info(msg=f"CacheHit: {key}")
    return remaining, decoded

invalidate async

invalidate(key)

Invalidates the passed key

PARAMETER DESCRIPTION
key

TYPE: str

Source code in authx_extra/cache.py
async def invalidate(self, key: str) -> List[bool]:
    """Invalidates the passed key

    The delete is queued on a transactional pipeline and executed in one
    round trip.

    Args:
        key: Cache key to delete.

    Returns:
        The reply list from ``pipe.execute()``. It holds a single entry,
        the reply to the delete command, which is truthy when a key was
        actually removed.
    """

    with self.redis.pipeline(transaction=True) as pipe:
        pipe.multi()
        pipe.delete(key)
        result = pipe.execute()

    # Log only once the transaction has run, so a failed execute() does not
    # leave a misleading "invalidated" line behind.
    log_info(msg=f"CacheInvalidated: {key}")
    return result

invalidate_all async

invalidate_all(keys)

Invalidates a collection of keys

PARAMETER DESCRIPTION
keys

TYPE: List

Source code in authx_extra/cache.py
async def invalidate_all(self, keys: List) -> List[bool]:
    """Invalidates a collection of keys

    Every delete is queued on one transactional pipeline and executed in a
    single round trip.

    Args:
        keys: Cache keys to delete.

    Returns:
        The reply list from ``pipe.execute()``, one entry per key in the
        order given. Each entry is truthy when that key was actually
        removed.
    """

    # Snapshot the keys so they can be walked twice (queue, then log).
    keys = list(keys)

    with self.redis.pipeline(transaction=True) as pipe:
        pipe.multi()
        for key in keys:
            pipe.delete(key)
        result = pipe.execute()

    # Log only once the transaction has run, so a failed execute() does not
    # leave misleading "invalidated" lines behind.
    for key in keys:
        log_info(msg=f"CacheInvalidated: {key}")
    return result