Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
lib-py-b2b / shopify.py
Size: Mime:
from decimal import Decimal
from typing import Type, Optional

from py_aws_util.logging import log_data

from lib_b2b.address import Address
from lib_b2b.financial_status import FinancialStatus
from lib_b2b.order_builder import OrderBuilder
from lib_b2b.order_change.address import OrderAddressChangeRequest, OrderAddressChangeDataProvider
from lib_b2b.order_change.cancel import OrderCancelChangeRequest, OrderCancelChangeDataProvider
from lib_b2b.order_change.cancel_line import OrderCancelLineChangeRequest, OrderCancelLineChangeDataProvider, \
    OrderLineCancellation
from lib_b2b.order_change.paid import OrderFinancialStatusChangeRequest, OrderFinancialStatusChangeDataProvider
from lib_b2b.order_change.revision import OrderRevisionChangeRequest, OrderRevisionChangeDataProvider
from lib_b2b.order_change.ship_date import OrderShipDateChangeRequest, OrderShipDateChangeDataProvider
from lib_b2b.order_change.total import OrderTotalChangeDataProvider, OrderTotalChangeRequest
from lib_b2b.order_request import OrderRequestType, OrderRequest
from lib_b2b.order_totals import OrderTotals
from lib_b2b.refund import Refund, RefundLine
from .errors import InvalidSignatureError, ShopifyOrderExistsError
from .dialect import Dialect, CreateOrderRequestDataProvider, CancelOrderRequestDataProvider
from .util import safe_str, as_decimal
from .customer import Customer
from .util import UtilityEncoder
from .errors import ShipNoticeFailedError, OrderNotFoundError
from .profile import Profile
from jsonschema.exceptions import ValidationError
import logging
from os import environ
import hashlib, base64, hmac
from datetime import datetime, timezone
from dateutil.parser import isoparse
from aws_xray_sdk.core import xray_recorder
import requests
import json

log_level = logging.getLevelName(environ['LOG_LEVEL']) if 'LOG_LEVEL' in environ else logging.INFO
logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
                    datefmt='%d-%m-%Y:%H:%M:%S',
                    level=log_level)
logger = logging.getLogger('lib-b2b-shopify')


class ShopifyDialect(Dialect, CreateOrderRequestDataProvider, CancelOrderRequestDataProvider):
    @staticmethod
    def matches_dialect(request):
        return ShopifyDialect.header_exists('x-shopify-shop-domain', request)

    def __init__(self, request=None, inbound_order=None, customer=None):
        self.request = request
        self.request_data = inbound_order
        self.customer = customer
        self.topic = None
        self.shopify_order_id = None

    @staticmethod
    def get_header(key: str, request):
        return request.headers.get(key, request.headers.get(key.title()))

    @staticmethod
    def header_exists(key: str, request):
        return request.headers.get(key, request.headers.get(key.title(), None)) is not None

    def __get_header(self, key: str):
        return ShopifyDialect.get_header(key, self.request)

    def __header_exists(self, key: str):
        return ShopifyDialect.header_exists(key, self.request)

    def _hmac_is_valid(self, signature):
        key = self.customer['shopify_config']['secret'].encode('utf-8')
        _hash = hmac.new(key, self.request.raw_body, hashlib.sha256)
        hmac_calculated = base64.b64encode(_hash.digest()).decode()
        return hmac_calculated == signature

    @property
    def request_type(self) -> OrderRequestType:
        topic = self.__get_header('x-shopify-topic')
        return OrderRequestType(topic)

    @xray_recorder.capture()
    def verify_request(self):
        # is the shopify shop domain configured for a valid customer?
        shopify_domain = self.__get_header('x-shopify-shop-domain')
        # customer fetch_by_container_id will throw exception is customer is not found
        self.customer = Customer.fetch_by_shopify_domain(shopify_domain)

        # is the request a valid shopify request
        if not self.request.json_body:
            raise ValidationError("JSON body not found.")
        self.request_data = self.request.json_body
        logger.debug(self.request_data)

        if not self.__header_exists('x-shopify-hmac-sha256'):
            raise ValidationError("Missing shopify signature")
        signature = self.__get_header('x-shopify-hmac-sha256')

        if not self.__header_exists('x-shopify-topic'):
            raise ValidationError("Missing shopify topic")
        self.topic = self.__get_header('x-shopify-topic')
        if self.topic not in ['orders/create', 'orders/paid', 'orders/cancelled', 'orders/updated']:
            raise ValidationError(f"Invalid topic [{self.topic}]")

        if not self.__header_exists('x-shopify-order-id'):
            raise ValidationError(f"Missing shopify order id")
        self.shopify_order_id = self.__get_header('x-shopify-order-id')

        if not self._hmac_is_valid(signature):
            raise InvalidSignatureError("Shopify signature not valid")

    @xray_recorder.capture()
    def verify_order(self, override=False):
        from .order_status import OrderStatus
        order_id = Order.generate_id(self.customer, self.request_data['order_number'])
        revision = 1
        if self.topic == 'orders/create':
            if Orders.exists(order_id=order_id, revision=revision) and not override:
                existing_order = Orders.for_(order_id)
                if 'order_status' in existing_order and OrderStatus.for_(existing_order['order_status']) == OrderStatus.REJECTED:
                    logger.info(f"Allowing order {order_id} to be re-submitted because it was previously rejected.")
                else:
                    logger.warning("Purchase Orders Already Exists.")
                    raise ShopifyOrderExistsError(f"Purchase Orders Already Exists.[order: {order_id}, "
                                                  f"revision: {revision}]")
        if 'shipping_address' not in self.request_data:
            from .notification import Notifier, NotificationType, ErrorNotificationData
            Notifier.notify(
                notification_type=NotificationType.BUSINESS,
                customer=self.customer['customer_edi_id'],
                subject=f"B2B Orders Error ({order_id})",
                data=ErrorNotificationData(
                    order_id=order_id,
                    message="The order was not imported due to:",
                    errors=[f'Orders {order_id} does not have a shipping address']
                )
            )
            raise ValidationError(f'Orders {order_id} does not have a shipping address')

    @xray_recorder.capture()
    def verify_cancellation(self):
        if 'cancel_reason' not in self.request_data:
            raise ValidationError("Missing field cancellation_reason")

    @xray_recorder.capture()
    def verify(self, request_type: OrderRequestType):
        if self.request:
            self.verify_request()
        if self.request_data:
            if request_type is OrderRequestType.CREATE:
                self.verify_order()
            elif request_type is OrderRequestType.CANCEL:
                self.verify_cancellation()

    @property
    def order_id(self):
        return Order.generate_id(self.customer, self.request_data['order_number'])

    @property
    def cancellation_reason(self):
        return self.request_data.get('cancel_reason', None)

    @property
    def cancelled_at(self):
        return isoparse(self.request_data.get('cancelled_at', datetime.now(tz=timezone.utc).isoformat()))

    @property
    def updated_at(self):
        return isoparse(self.request_data.get('updated_at',
                                              datetime.now(tz=timezone.utc).isoformat())).astimezone(timezone.utc)


    @property
    def shipTo(self):
        _shipping_address = self.request_data.get('shipping_address')
        _country = 'USA'
        return Address(
            address1=safe_str(_shipping_address, 'address1'),
            address2=safe_str(_shipping_address, 'address2'),
            city=safe_str(_shipping_address, 'city'),
            state=safe_str(_shipping_address, 'province_code'),
            postalCode=safe_str(_shipping_address, 'zip'),
            country='USA',
            phone=safe_str(_shipping_address, 'phone'),
            name=safe_str(_shipping_address, 'name'),
            company=safe_str(_shipping_address, 'company')
        )

    @property
    def refunds(self) -> [Refund]:
        _refunds = []
        if self.request_data.get('refunds'):
            for _refund in self.request_data.get('refunds'):
                _lines = []
                for _refund_line in _refund.get('refund_line_items'):
                    _lines.append(
                        RefundLine(
                            line_id=_refund_line.get('id'),
                            quantity=_refund_line.get('quantity'),
                            line_item_id=_refund_line.get('line_item_id'),
                            product_id=_refund_line.get('line_item', {'sku': 'Unknown'}).get('sku'),
                            amount=Decimal(
                                str((_refund_line.get('subtotal', 0.0) + _refund_line.get('total_tax', 0.0))))
                        )
                    )
                _refunds.append(
                    Refund(
                        refund_id=_refund.get('id'),
                        order_id=self.order_id,
                        created_at=isoparse(_refund.get('created_at', datetime.now(tz=timezone.utc).isoformat())),
                        note=_refund.get('note', '-'),
                        lines=_lines
                    )
                )
        return _refunds

    def data_provider_for(self, order_request: OrderRequest, change_request_type: Type[OrderRequest.C]):
        if change_request_type is OrderAddressChangeRequest:
            class __OCDP(OrderAddressChangeDataProvider):
                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def new_address(self) -> Address:
                    return order_request.dialect.shipTo

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderCancelChangeRequest:
            class __OCDP(OrderCancelChangeDataProvider):
                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def cancellation_reason(self) -> Optional[str]:
                    return order_request.request_data.get('cancel_reason', None)

                @property
                def cancelled_on(self) -> Optional[datetime]:
                    cancelled_dt = order_request.request_data.get('cancelled_at', None)
                    if cancelled_dt:
                        return isoparse(cancelled_dt)
                    else:
                        return None

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderCancelLineChangeRequest:
            class __OCDP(OrderCancelLineChangeDataProvider):

                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def cancellations(self) -> [OrderLineCancellation]:
                    _cancellation_lists = list(map(lambda x: x.as_cancellations(), order_request.dialect.refunds or []))
                    _cancellations = [cancellation for cancellation_list in _cancellation_lists for cancellation in cancellation_list]
                    return _cancellations

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderShipDateChangeRequest:
            class __OCDP(OrderShipDateChangeDataProvider):

                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def new_ship_date(self) -> Optional[datetime]:
                    not_before_date = None
                    try:
                        if order_request.request_data.get('note_attributes'):
                            delay_list = list(map(lambda y: y['value'], filter(lambda x: x['name'] == 'Delay Date',
                                                                               order_request.request_data.get('note_attributes'))))
                            if delay_list:
                                not_before_date = datetime.strptime(delay_list[0], '%Y-%m-%d')
                    except Exception as e:
                        logger.exception(
                            f"Error attempting to parse Delay Date from "
                            f"Shopify order {order_request.dialect.order_id}", e)
                    return not_before_date

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderFinancialStatusChangeRequest:
            class __OCDP(OrderFinancialStatusChangeDataProvider):

                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def financial_status(self) -> FinancialStatus:
                    return FinancialStatus(order_request.request_data.get('financial_status'))

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderRevisionChangeRequest:
            class __OCDP(OrderRevisionChangeDataProvider):

                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def new_revision(self) -> Optional[int]:
                    return None

                @property
                def request(self) -> 'OrderRequest':
                    return self.request

                def data(self) -> dict:
                    return self.request.request_data
            return __OCDP()
        elif change_request_type is OrderTotalChangeRequest:
            _order_data = OrderBuilder.for_(order_request).build()
            _order = Order.from_dict(_order_data)

            class __OCDP(OrderTotalChangeDataProvider):

                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def new_totals(self) -> OrderTotals:
                    return OrderTotals.create(
                        order_lines=_order.lines,
                        discounts=_order.discounts,
                        additional_charges=_order.additional_charges,
                        refunds=order_request.dialect.refunds,
                        total_amount=as_decimal(order_request.request_data.get('total_price'), 'total_amount'),
                        total_line_amount=as_decimal(order_request.request_data.get('total_line_items_price'), 'total_line_amount'),
                        total_tax=as_decimal(order_request.request_data.get('total_tax'), 'total_tax'),
                        total_discount=as_decimal(order_request.request_data.get('total_discounts'), 'total_discount')
                    )

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        else:
            # We don't support tracking financial status changes for non-shopify orders
            raise NotImplementedError


class FullfillmentAction:
    def __init__(self, fulfillment):
        self.fulfillment = fulfillment
        self.customer = Customer.fetch_by_edi_id(self.fulfillment['customer_edi_id'])
        self.profile = Profile.profile_for(self.customer)
        self.location = self.profile.shopify_config.default_location

    def __get_url(self):
        try:
            url = f"https://{self.profile.shopify_config.key}:{self.profile.shopify_config.password}@{self.profile.shopify_config.domain}/admin/orders/{self.fulfillment['channel_order_id']}/fulfillments.json"
            return url
        except Exception as e:
            logger.error("Unable to generate url for shopify fullfillment action.", e)
            print(self.fulfillment)
            return None

    def __build_payload(self):
        try:
            fulfillment = {}
            fulfillment['location_id'] = self.location
            # Should always have the fulfillment object
            fulfillment['tracking_company'] = self.fulfillment['carrier']
            fulfillment['tracking_numbers'] = self.fulfillment['tracking_numbers']
            fulfillment['tracking_urls'] = self.fulfillment['tracking_urls']
            fulfillment['line_items'] = self.fulfillment['line_items']

            for item in fulfillment['line_items']:
                item['id'] = item['channel_line_id']
                del item['purchase_order_line']
                del item['line_id']
                del item['channel_line_id']

            payload = {
                "fulfillment": fulfillment
            }
            return payload
        except Exception as e:
            logger.error("Unable to build payload for shopify fullfillment action.", e)
            print(self.fulfillment)
            return None

    def send(self):
        try:
            notification_url = self.__get_url()
            if not notification_url:
                raise ShipNoticeFailedError("Unable to generate url for fulfillment.")
            payload = self.__build_payload()
            if not payload:
                raise ShipNoticeFailedError("Unable to generate payload for fulfillment.")
            json_payload = json.dumps(payload, cls=UtilityEncoder)
            logger.debug(f"Calling shopify fullfillment with url: {notification_url}")
            logger.debug(json_payload)
            response = requests.post(notification_url, data=json_payload, headers={'Content-type': 'application/json'})
            logger.debug(f"Fulfillment response: {response}")
            logger.debug(response.content)
            logger.debug(response.text)
            try:
                from .fulfillment import Fulfillment
                response_json = response.json()
                channel_fulfillment_id = response_json['fulfillment']['id']
                Fulfillment.update_channel_id(fulfillment_id=self.fulfillment['id'], channel_fulfillment_id=channel_fulfillment_id)
                order = Orders.for_(self.fulfillment['order_id'])
                if order:
                    line_item_str = ', '.join([line['purchase_order_line'] for line in self.fulfillment['line_items'] or []] or [])
                    order.chronicle(message=f"Sent fulfillment information to Shopify for lines: {line_item_str}",
                                    when=datetime.now(),
                                    data=payload)
            except OrderNotFoundError:
                logger.error(f"Unable to find order for fulfillment {self.fulfillment['id']}",
                             extra=log_data(fulfillment_id=self.fulfillment['id']))
            except ValueError as ve:
                logger.warning("Fulfillment response did not contain json.", ve)
            except KeyError as ke:
                logger.warning(f"Unable to extract channel fulfillment id from {str(response_json)} :: {str(ke)}")
            if response.status_code == 422:
                logger.info(response.text)
                logger.info("Considering ship notice successful since it's already fulfilled")
            elif response.status_code > 400:
                e = ShipNoticeFailedError(str(response) + ' on fulfillment ' + self.fulfillment['id'])
                logger.error(response.text)
                logger.error(e)
                raise e
            return
        except requests.exceptions.RequestException as re:
            logger.error(f"Unable to call notification url [{notification_url}] :: {str(re)}")
            raise re


class CancelFullfillmentAction:
    def __init__(self, fulfillment):
        self.fulfillment = fulfillment
        self.customer = Customer.fetch_by_edi_id(self.fulfillment['customer_edi_id'])
        self.profile = Profile.profile_for(self.customer)

    def __get_url(self):
        try:
            url = f"https://{self.profile.shopify_config.key}:{self.profile.shopify_config.password}@{self.profile.shopify_config.domain}/admin/orders/{self.fulfillment['channel_order_id']}/fulfillments/{self.fulfillment['channel_fulfillment_id']}/cancel.json"
            return url
        except Exception as e:
            logger.error("Unable to generate url for shopify cancel fulfillment action.", e)
            print(self.fulfillment)
            return None

    def send(self):
        try:
            if 'channel_fulfillment_id' in self.fulfillment:
                notification_url = self.__get_url()
                if not notification_url:
                    raise ShipNoticeFailedError("Unable to generate url for fulfillment.")
                logger.debug(f"Calling shopify cancel fullfillment with url: {notification_url}")
                response = requests.post(notification_url, headers={'Content-type': 'application/json'})
                logger.debug(f"Fulfillment cancel response: {response}")
                logger.debug(response.content)
                logger.debug(response.text)
                try:
                    order = Orders.for_(self.fulfillment['order_id'])
                    if order:
                        order.chronicle(message=f"Sent fulfillment cancellation to Shopify for fulfillment {self.fulfillment['id']}",
                                        when=datetime.now())
                except Exception as e:
                    logger.exception("Unable to chronical event")
                return
            else:
                logger.warning(f"Unable to send cancellation, fulfillment {self.fulfillment['id']} does not contain a channel_fulfillment_id.")
        except requests.exceptions.RequestException as re:
            logger.error(f"Unable to call cancel notification url [{notification_url}] :: {str(re)}")
            raise re


class CapturePaymentAction:
    def __init__(self, fulfillment):
        self.fulfillment = fulfillment
        self.customer = Customer.fetch_by_edi_id(self.fulfillment['customer_edi_id'])
        self.profile = Profile.profile_for(self.customer)

    def __get_url(self):
        url = f"https://{self.profile.shopify_config.key}:{self.profile.shopify_config.password}@{self.profile.shopify_config.domain}/admin/orders/{self.fulfillment['channel_order_id']}/transactions.json"
        return url

    def __build_payload(self):
        return {
            'kind': 'capture'
        }

    def send(self):
        try:
            notification_url = self.__get_url()
            payload = self.__build_payload()
            json_payload = json.dumps(payload, cls=UtilityEncoder)
            logger.debug(f"Calling shopify capture transaction with url: {notification_url}")
            response = requests.post(notification_url, data=json_payload, headers={'Content-type': 'application/json'})
            logger.debug(f"Capture response: {response}")
            logger.debug(response.raw)
            try:
                order = Orders.for_(self.fulfillment['order_id'])
                if order:
                    order.chronicle(
                        message=f"Captured payment for for fulfillment {self.fulfillment['id']}",
                        when=datetime.now())
            except Exception as e:
                logger.exception("Unable to chronicle event")
            return
        except requests.exceptions.RequestException as re:
            logger.error(f"Unable to call capture url [{notification_url}] :: {str(re)}")
            raise re


class CancelOrderAction:
    def __init__(self, order):
        self.order = order

    def __get_url(self):
        pass

    def __build_payload(self):
        pass

    def send(self):
        pass


# Add a note to order
#
# PUT /admin/api/#{api_version}/orders/#{order_id}.json
# {
#   "order": {
#     "id": 450789469,
#     "note": "Customer contacted us about a custom engraving on this iPod"
#   }
# }
class AddNoteAction:
    def __init__(self, order, note):
        self.order = Orders.for_(order)
        self.note = note
        self.customer = Customer.fetch_by_edi_id(self.order['customer_edi_id'])
        self.profile = Profile.profile_for(self.customer)

    def __get_url(self):
        url = f"https://{self.profile.shopify_config.key}:{self.profile.shopify_config.password}@{self.profile.shopify_config.domain}/admin/orders/{self.order['channel_order_id']}.json"
        return url

    def __build_payload(self):
        return {
            "order": {
                "id": self.order['channel_order_id'],
                "note": self.note
            }
        }

    def send(self):
        try:
            notification_url = self.__get_url()
            payload = self.__build_payload()
            json_payload = json.dumps(payload, cls=UtilityEncoder)
            logger.debug(f"Calling shopify add note with url: {notification_url}")
            response = requests.post(notification_url, data=json_payload, headers={'Content-type': 'application/json'})
            logger.debug(f"Add Note response: {response}")
            logger.debug(response.raw)
            try:
                self.order.chronicle(
                    message=f"Added note.",
                    when=datetime.now())
            except Exception as e:
                logger.exception("Unable to chronicle event")
            return
        except requests.exceptions.RequestException as re:
            logger.error(f"Unable to add note using [{notification_url}] :: {str(re)}")
            raise re


class ManualOrderImportAction:
    def __init__(self, customer_edi_id, order_number, override=False):
        self.order_number = order_number
        self.customer = Customer.fetch_by_edi_id(customer_edi_id)
        self.profile = Profile.profile_for(customer=self.customer)
        self.override = override

    def __get_url(self):
        url = f"https://{self.profile.shopify_config.key}:{self.profile.shopify_config.password}@{self.profile.shopify_config.domain}/admin/orders/{self.order_number}.json"
        return url

    def __retreive_from_shopify(self):
        try:
            shopify_url = self.__get_url()
            logger.debug(f"Calling shopify get order with url: {shopify_url}")
            response = requests.get(shopify_url)
            logger.debug(f"Get order response: {response}")
            logger.debug(response.raw)
            data = response.json()
            if data and 'order' in data:
                return data['order']
            else:
                raise OrderNotFoundError(f"Unable to find shopify order {self.order_number}")
        except requests.exceptions.RequestException as re:
            raise OrderNotFoundError(f"Unable to find shopify order {self.order_number}") from re

    def __import_order(self, order_data):
        #
        # ShopifyOrderRequest()
        # from lib_b2b.shopify_order_builder import ShopifyOrderBuilder
        # return ShopifyOrderBuilder(request)
        # order_data = OrderBuilder.for_(request).build()
        # from lib_b2b.order import Order
        # Order.create(order_data)
        # shopify_dialect = ShopifyDialect(inbound_order=inbound_order, customer=self.customer)
        # shopify_dialect.verify_order(override=self.override)
        # _inbound_order = shopify_dialect.standardize()
        # print(_inbound_order)
        #
        # # TODO: - replace with CreateOrderTransaction
        # Orders.save_record(_inbound_order)
        # return json.dumps({'id': _inbound_order['id']})
        # TODO: - Refactor the Order Builder to enable this use case.  Can't be request based.
        pass

    def do(self):
        order_data = self.__retreive_from_shopify()
        self.__import_order(order_data)
        # TODO: ---
        # try:
        #     order = Orders.for_(self.fulfillment['order_id'])
        #     if order:
        #         order.chronicle(
        #             message=f"Captured payment for for fulfillment {self.fulfillment['id']}",
        #             when=datetime.now())
        # except Exception as e:
        #     logger.exception("Unable to chronicle event")


# Moved here to avoid silly python ImportError
from .orders import Orders
from .order import Order