Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
lib-py-b2b / standard.py
Size: Mime:
from typing import Type, Optional

from dateutil.parser import isoparse

from lib_b2b.address import Address
from lib_b2b.order_change import OrderStatusChangeDataProvider
from lib_b2b.order_change.accept import OrderAcceptChangeRequest
from lib_b2b.order_change.address import OrderAddressChangeRequest, OrderAddressChangeDataProvider
from lib_b2b.order_change.cancel import OrderCancelChangeRequest, OrderCancelChangeDataProvider
from lib_b2b.order_change.revision import OrderRevisionChangeRequest, OrderRevisionChangeDataProvider
from lib_b2b.order_change.ship_date import OrderShipDateChangeRequest, OrderShipDateChangeDataProvider
from lib_b2b.order_request import OrderRequestType, OrderRequest
from lib_b2b.order_status import OrderStatus
from .order import Order
from .validator import B2BModel, StandardDialectValidator
from .customer import Customer
from .orders import Orders
from .carrier import TrackingUrl, CarrierType
from jsonschema.exceptions import ValidationError
import logging
from os import environ
from .errors import InvalidAPIKey, OrderExistsError, NotificationFailedError, AcknowledgementUrlNotProvidedError
from aws_xray_sdk.core import xray_recorder
import requests
import json
from datetime import datetime, timezone
from .dialect import Dialect, CreateOrderRequestDataProvider, CancelOrderRequestDataProvider
from .util import UtilityEncoder

log_level = logging.getLevelName(environ['LOG_LEVEL']) if 'LOG_LEVEL' in environ else logging.INFO
logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
                    datefmt='%d-%m-%Y:%H:%M:%S',
                    level=log_level)
logger = logging.getLogger('lib-b2b-standard')


class StandardDialect(Dialect, CreateOrderRequestDataProvider, CancelOrderRequestDataProvider):
    @staticmethod
    def matches_dialect(request):
        return 'x-api-key' in request.headers

    def __init__(self, request):
        self.request = request
        self.request_data = None
        self.customer = None
        self.validator = StandardDialectValidator()

    @property
    def request_type(self) -> OrderRequestType:
        # TODO: - look into imrpoving the intelligence of this
        return OrderRequestType.CREATE

    def __validate_api_key(self, api_key, customer):
        if customer['api_key'] != api_key:
            raise InvalidAPIKey(f"{[api_key]} is invalid for customer {customer['customer_edi_id']}")

    @xray_recorder.capture()
    def verify_request(self):
        if not self.request.json_body:
            raise ValidationError("JSON body not found.")
        self.request_data = self.request.json_body

        if 'api-key' not in self.request.headers and 'x-api-key' not in self.request.headers:
            logger.error('Missing API Key')
            raise InvalidAPIKey('Missing API Key')

        api_key = self.request.headers.get('api-key', self.request.headers.get('x-api-key'))
        self.customer = Customer.fetch_by_edi_id(customer_edi_id=self.request_data['customer_edi_id'])
        self.__validate_api_key(
            api_key=api_key,
            customer=self.customer
        )

    @xray_recorder.capture()
    def verify_order(self):
        self.validator.validate_model(B2BModel.ORDER, self.request_data)
        if 'order_lines' not in self.request_data or not self.request_data['order_lines']:
            logger.error('Orders must contain lines.')
            raise ValidationError('Orders must contain lines.')
        order_id = Order.generate_id(self.customer, self.request_data['purchase_order'])
        if Orders.exists(order_id=order_id, revision=self.request_data['purchase_order_revision']):
            logger.error("Purchase Orders Already Exists.")
            raise OrderExistsError(f"Purchase Orders Already Exists.[order: {order_id}, "
                                   f"revision: {self.request_data['purchase_order_revision']}]")

    @xray_recorder.capture()
    def verify_cancellation(self):
        if 'cancellation_reason' not in self.request_data:
            raise ValidationError("Missing field cancellation_reason")
        if 'purchase_order' not in self.request_data:
            raise ValidationError("Missing field purchase_order")

    @xray_recorder.capture()
    def verify(self, request_type: OrderRequestType):
        if self.request:
            self.verify_request()
        if self.request_data:
            if request_type is OrderRequestType.CREATE:
                self.verify_order()
            elif request_type is OrderRequestType.CANCEL:
                self.verify_cancellation()

    @property
    def cancellation_reason(self):
        return self.request_data.get('cancellation_reason', None)

    @property
    def cancelled_at(self):
        return isoparse(self.request_data.get('cancelled_at', datetime.now(tz=timezone.utc).isoformat()))

    @property
    def order_id(self):
        return Order.generate_id(self.customer, self.request_data['purchase_order'])

    @property
    def shipTo(self):
        return Address.from_dict(self.request_data['shipTo'])

    def data_provider_for(self, order_request: OrderRequest, change_request_type: Type[OrderRequest.C]):
        if change_request_type is OrderAcceptChangeRequest:
            class __OCDP(OrderStatusChangeDataProvider):
                def data(self) -> dict:
                    return order_request.request_data

                @property
                def new_status(self):
                    return OrderStatus.ACCEPTED
            return __OCDP()
        elif change_request_type is OrderAddressChangeRequest:
            class __OCDP(OrderAddressChangeDataProvider):
                @property
                def new_address(self) -> Address:
                    return order_request.dialect.shipTo

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderCancelChangeRequest:
            class __OCDP(OrderCancelChangeDataProvider):
                @property
                def cancellation_reason(self) -> Optional[str]:
                    return order_request.request_data.get('cancellation_reason', None)

                @property
                def cancelled_on(self) -> Optional[datetime]:
                    cancelled_dt = order_request.request_data.get('cancelled_at', None)
                    if cancelled_dt:
                        return isoparse(cancelled_dt)
                    else:
                        return None

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderRevisionChangeRequest:
            class __OCDP(OrderRevisionChangeDataProvider):
                @property
                def new_revision(self) -> Optional[int]:
                    return self.request.request_data.get('purchase_order_revision')

                @property
                def request(self) -> 'OrderRequest':
                    return self.request

                def data(self) -> dict:
                    return self.request.request_data
            return __OCDP()
        # Don't think that standard orders should support line level cancellations
        # elif change_request_type is OrderCancelLineChangeRequest:
        #     class __OCDP(OrderCancelLineChangeDataProvider):
        #         def cancellations(self) -> [OrderLineCancellation]:
        #             return [OrderLineCancellation(
        #                 cancellation_reason=order_request.request_data.get('cancellation_reason', None),
        #                 cancelled_at=isoparse(
        #                     order_request.request_data.get('cancelled_at',
        #                     datetime.now(tz=timezone.utc).isoformat())),
        #                 purchase_order_line=order_request.request_data.get('purchase_order_line')
        #             )]
        #
        #         def data(self) -> dict:
        #             return order_request.request_data
        #     return __OCDP()
        elif change_request_type is OrderShipDateChangeRequest:
            class __OCDP(OrderShipDateChangeDataProvider):
                @property
                def new_ship_date(self) -> Optional[datetime]:
                    datestr = order_request.request_data.get('not_before_date',
                                                             order_request.request_data.get('required_date', None))
                    if datestr:
                        return isoparse(datestr)
                    else:
                        return None

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        else:
            # We don't support tracking financial status changes for non-shopify orders
            raise NotImplementedError


class StandardShipNoticeAction:
    def __init__(self, fulfillment):
        self.fulfillment = fulfillment

    def __get_url(self):
        if 'notification_url' in self.fulfillment:
            return self.fulfillment['notification_url']
        else:
            return None

    def __build_payload(self):
        # Need to decompose fulfillment into a notification per line
        notices = []
        tracking_numbers = self.fulfillment['tracking_numbers']
        tracking_str = ','.join(tracking_numbers)
        tracking_url = TrackingUrl.url_for(tracking_str, CarrierType(self.fulfillment.get('carrier_type', CarrierType.FEDEX_SHIP_MGR.value)))
        for line in self.fulfillment['line_items']:
            ship_notice = {
                "customer_edi_id": self.fulfillment['customer_edi_id'],
                "purchase_order": self.fulfillment['purchase_order'],
                "purchase_order_line": line['purchase_order_line'],
                "tracking": [
                    {
                        "carrier": self.fulfillment.get('carrier_type', CarrierType.FEDEX_SHIP_MGR.value),
                        "tracking": self.fulfillment['tracking_numbers'],
                        "url": tracking_url
                    }
                ]
            }
            if 'channel_name' in self.fulfillment:
                ship_notice['channel_name'] = self.fulfillment['channel_name']
            if 'channel_order_id' in self.fulfillment:
                ship_notice['channel_order_id'] = self.fulfillment['channel_order_id']
            notices.append(ship_notice)
        return notices

    def send(self):
        try:
            notification_url = self.__get_url()
            notices = self.__build_payload()
            for notice in notices:
                json_payload = json.dumps(notice, cls=UtilityEncoder)
                logger.debug(f"Sending ShipNotice with url: {notification_url}")
                response = requests.put(notification_url, data=json_payload, headers={'Content-type': 'application/json'})
                logger.debug(f"ShipNotice response: {response}")
                logger.debug(response.raw)
                try:
                    order = Orders.for_(self.fulfillment['order_id'])
                    if order:
                        order.chronicle(
                            message=f"Sent ship notice for fulfillment {self.fulfillment['id']}",
                            when=datetime.now(),
                            data=notice)
                except Exception as e:
                    logger.exception("Unable to chronicle event")
        except requests.exceptions.RequestException as re:
            raise NotificationFailedError(f"Unable to call notification url [{notification_url}] :: {str(re)}")


class StandardAcknowledgementAction:
    def __init__(self, order):
        self.order = order

    def __get_url(self):
        if 'notification_urls' in self.order and 'order_acknowledgement' in self.order['notification_urls']:
            return self.order['notification_urls']['order_acknowledgement']
        else:
            raise AcknowledgementUrlNotProvidedError(f"Acknowledgement url not provided for [{self.order['id']}]")

    def __build_payload(self):
        ack = {
            "customer_edi_id": self.order['customer_edi_id'],
            "purchase_order": self.order['purchase_order'],
            "order_number": self.order['purchase_order'],
            "status": self.order['order_status'],
        }
        if 'channel_name' in self.order:
            ack['channel_name'] = self.order['channel_name']
        if 'channel_order_id' in self.order:
            ack['channel_order_id'] = self.order['channel_order_id']
        if self.order['order_status'] == 'REJECTED':
            ack["errors"] = [
                {
                    "message": self.order['rejection_reason'] if 'rejection_reason' in self.order else '',
                    "type": "order"
                }
            ]
        return ack

    def send(self):
        try:
            notification_url = self.__get_url()
            payload = self.__build_payload()
            json_payload = json.dumps(payload, cls=UtilityEncoder)
            logger.debug(f"Sending OrderAck with url: {notification_url}")
            response = requests.put(notification_url, data=json_payload, headers={'Content-type': 'application/json'})
            logger.debug(f"OrderAck response: {response}")
            logger.debug(response.raw)
            try:
                self.order.chronicle(
                    message=f"Sent order acknowledgement with status of {self.order.status.value}.",
                    when=datetime.now(),
                    data=payload)
            except Exception as e:
                logger.exception("Unable to chronicle event")
        except requests.exceptions.RequestException as re:
            raise NotificationFailedError(f"Unable to call notification url [{notification_url}] :: {str(re)}")