Repository URL to install this package:
|
Version:
8.0.0 ▾
|
from typing import Any, Dict, NotRequired, TypedDict
from sincpro_framework import UseFramework
from sincpro_siat_soap.adapters.siat_exception_builder import siat_exception_builder
from sincpro_siat_soap.adapters.soap import Client, ProxySiatRegistry, proxy_siat
from sincpro_siat_soap.domain import SIATEnvironment
from sincpro_siat_soap.infrastructure.error_handler import siat_global_error_handler
class SIATContext(TypedDict, total=False):
    """Typed context keys injected into Features and ApplicationServices.

    Every key is optional: the class is declared with ``total=False`` and each
    key is additionally marked ``NotRequired``, so consumers should read the
    values with ``.get()`` and handle a missing key.
    """

    # Read by DependencyContextType.soap_client; together with SIAT_ENV it
    # selects a dedicated proxy instead of the default one.
    TOKEN: NotRequired[str]
    # Read by DependencyContextType.soap_client, which passes it through
    # SIATEnvironment(...) before asking the registry for a proxy.
    SIAT_ENV: NotRequired[SIATEnvironment]
    # NOTE(review): presumably the password protecting the signing key; it is
    # not read anywhere in this part of the file -- confirm against its users.
    SIGN_KEY_PASSWORD: NotRequired[str]
class DependencyContextType:
    """Typed view of the SIAT dependencies plus helpers built on top of them.

    The two attributes are only annotated here, never assigned; they are
    expected to be supplied from outside (``register_dependencies`` registers
    a dependency under the name ``proxy_siat``).
    """

    proxy_siat: ProxySiatRegistry
    context: SIATContext

    def soap_client(self, wsdl: str) -> Client:
        """
        Return the SOAP client for a service, choosing the proxy from the context.

        When the context carries both a truthy ``TOKEN`` and a truthy
        ``SIAT_ENV``, the client comes from the proxy the registry returns for
        that environment/token pair. Otherwise the registry's default proxy
        is used.

        Args:
            wsdl: WSDL service name to get a client for

        Returns:
            Client: SOAP client instance for the requested service
        """
        auth_token = self.context.get("TOKEN")
        environment = self.context.get("SIAT_ENV")

        # Either value missing (or falsy) means there is nothing to scope the
        # proxy by, so fall back to the default proxy.
        if not (auth_token and environment):
            return self.proxy_siat.default.get_client_for_service(wsdl)

        # SIATEnvironment(...) normalises the stored value before the lookup.
        scoped_proxy = self.proxy_siat.get_proxy(
            SIATEnvironment(environment), auth_token
        )
        return scoped_proxy.get_client_for_service(wsdl)

    def raise_if_transaction_is_false(self, response: Dict[str, Any]) -> None:
        """
        Fail loudly when a response explicitly reports a failed transaction.

        Only the exact value ``False`` under the ``"transaccion"`` key triggers
        the failure path; a missing key, ``None`` or any other value is
        treated as "nothing to report".

        Args:
            response: Response dictionary containing the transaction status

        Raises:
            Whatever the callable returned by ``siat_exception_builder`` raises
            (documented by the authors as SIATValidationError; the concrete
            type is decided by the builder).
        """
        if response.get("transaccion") is not False:
            return

        # The builder hands back a zero-argument callable; invoking it is
        # expected to raise the exception matching this response.
        siat_exception_builder(response)()
def register_dependencies(framework_instance: UseFramework) -> None:
    """Wire the SIAT proxy registry and the global error handler into a framework.

    Args:
        framework_instance: Framework object that receives both registrations.
    """
    # Registered under the same name that DependencyContextType annotates.
    dependency_name = "proxy_siat"
    framework_instance.add_dependency(dependency_name, proxy_siat)

    framework_instance.add_global_error_handler(siat_global_error_handler)