Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
sincpro-siat-soap / infrastructure / dependencies.py
Size: Mime:
from typing import Any, Dict, NotRequired, TypedDict

from sincpro_framework import UseFramework

from sincpro_siat_soap.adapters.siat_exception_builder import siat_exception_builder
from sincpro_siat_soap.adapters.soap import Client, ProxySiatRegistry, proxy_siat
from sincpro_siat_soap.domain import SIATEnvironment
from sincpro_siat_soap.infrastructure.error_handler import siat_global_error_handler


class SIATContext(TypedDict, total=False):
    """Typed context keys injected into Features and ApplicationServices."""

    TOKEN: NotRequired[str]
    SIAT_ENV: NotRequired[SIATEnvironment]
    SIGN_KEY_PASSWORD: NotRequired[str]


class DependencyContextType:
    proxy_siat: ProxySiatRegistry
    context: SIATContext

    def soap_client(self, wsdl: str) -> Client:
        """
        Helper function to get SOAP client with context-aware proxy selection.

        Args:
            wsdl: WSDL service name to get client for

        Returns:
            Client: SOAP client instance for the requested service
        """
        token = self.context.get("TOKEN")
        siat_env = self.context.get("SIAT_ENV")

        if token and siat_env:
            return self.proxy_siat.get_proxy(
                SIATEnvironment(siat_env), token
            ).get_client_for_service(wsdl)

        return self.proxy_siat.default.get_client_for_service(wsdl)

    def raise_if_transaction_is_false(self, response: Dict[str, Any]) -> None:
        """
        Raise exception if transaction response indicates failure.

        Args:
            response: Response dictionary containing transaction status

        Raises:
            SIATValidationError: If transaction is False
        """
        if response.get("transaccion") is False:
            fn_raise_exception = siat_exception_builder(response)
            fn_raise_exception()


def register_dependencies(framework_instance: UseFramework) -> None:
    """Register all dependencies and error handlers into the framework instance."""
    framework_instance.add_dependency("proxy_siat", proxy_siat)
    framework_instance.add_global_error_handler(siat_global_error_handler)