Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
faust-streaming / faust / web / cache / backends / base.py
Size: Mime:
"""Cache backend - base implementation."""
import abc
from typing import Any, ClassVar, Optional, Tuple, Type, Union

from mode import Service
from mode.utils.contexts import asynccontextmanager
from mode.utils.logging import get_logger
from mode.utils.typing import AsyncGenerator
from yarl import URL

from faust.types import AppT
from faust.types.web import CacheBackendT
from faust.web.cache.exceptions import CacheUnavailable

logger = get_logger(__name__)

E_CACHE_IRRECOVERABLE = "Cache disabled for irrecoverable error: %r"
E_CACHE_INVALIDATING = "Destroying cache for key %r caused error: %r"
E_CANNOT_INVALIDATE = "Unable to invalidate key %r: %r"
E_CACHE_INOPERATIONAL = "Cache operational error: %r"


class CacheBackend(CacheBackendT, Service):
    """Backend for cache operations."""

    logger = logger

    Unavailable: Type[BaseException] = CacheUnavailable

    operational_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()
    invalidating_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()
    irrecoverable_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()

    def __init__(
        self, app: AppT, url: Union[URL, str] = "memory://", **kwargs: Any
    ) -> None:
        self.app = app
        self.url = URL(url)
        Service.__init__(self, **kwargs)

    @abc.abstractmethod
    async def _get(self, key: str) -> Optional[bytes]:
        ...

    @abc.abstractmethod
    async def _set(self, key: str, value: bytes, timeout: float = None) -> None:
        ...

    @abc.abstractmethod
    async def _delete(self, key: str) -> None:
        ...

    async def get(self, key: str) -> Optional[bytes]:
        """Get cached-value by key."""
        async with self._recovery_context(key):
            return await self._get(key)

    async def set(self, key: str, value: bytes, timeout: float = None) -> None:
        """Set cached-value by key."""
        async with self._recovery_context(key):
            await self._set(key, value, timeout)

    async def delete(self, key: str) -> None:
        """Forget value for cache key."""
        async with self._recovery_context(key):
            await self._delete(key)

    @asynccontextmanager
    async def _recovery_context(self, key: str) -> AsyncGenerator:
        try:
            yield
        except self.irrecoverable_errors as exc:
            self.log.exception(E_CACHE_IRRECOVERABLE, exc)  # noqa: G200
            raise self.Unavailable(exc)
        except self.invalidating_errors as exc:
            self.log.warning(E_CACHE_INVALIDATING, key, exc, exc_info=1)  # noqa: G200
            try:
                await self._delete(key)
            except self.operational_errors + self.invalidating_errors as exc:
                self.log.exception(E_CANNOT_INVALIDATE, key, exc)  # noqa: G200
            raise self.Unavailable()
        except self.operational_errors as exc:
            self.log.warning(E_CACHE_INOPERATIONAL, exc, exc_info=1)  # noqa: G200
            raise self.Unavailable()

    def _repr_info(self) -> str:
        return f"url={self.url!r}"