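# Source listing header (captured from the package index page):
#   Package: lib-py-b2b
#   Version: 0.4.139
#   File:    shopify.py
#   (The "Repository URL to install this package" value was not captured.)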
from decimal import Decimal
from typing import Type, Optional
from py_aws_util.logging import log_data
from lib_b2b.address import Address
from lib_b2b.financial_status import FinancialStatus
from lib_b2b.order_builder import OrderBuilder
from lib_b2b.order_change.address import OrderAddressChangeRequest, OrderAddressChangeDataProvider
from lib_b2b.order_change.cancel import OrderCancelChangeRequest, OrderCancelChangeDataProvider
from lib_b2b.order_change.cancel_line import OrderCancelLineChangeRequest, OrderCancelLineChangeDataProvider, \
OrderLineCancellation
from lib_b2b.order_change.paid import OrderFinancialStatusChangeRequest, OrderFinancialStatusChangeDataProvider
from lib_b2b.order_change.revision import OrderRevisionChangeRequest, OrderRevisionChangeDataProvider
from lib_b2b.order_change.ship_date import OrderShipDateChangeRequest, OrderShipDateChangeDataProvider
from lib_b2b.order_change.total import OrderTotalChangeDataProvider, OrderTotalChangeRequest
from lib_b2b.order_request import OrderRequestType, OrderRequest
from lib_b2b.order_totals import OrderTotals
from lib_b2b.refund import Refund, RefundLine
from .errors import InvalidSignatureError, ShopifyOrderExistsError
from .dialect import Dialect, CreateOrderRequestDataProvider, CancelOrderRequestDataProvider
from .util import safe_str, as_decimal
from .customer import Customer
from .util import UtilityEncoder
from .errors import ShipNoticeFailedError, OrderNotFoundError
from .profile import Profile
from jsonschema.exceptions import ValidationError
import logging
from os import environ
import hashlib, base64, hmac
from datetime import datetime, timezone
from dateutil.parser import isoparse
from aws_xray_sdk.core import xray_recorder
import requests
import json
# Resolve the log level from the LOG_LEVEL environment variable; fall back to INFO when it is unset.
if 'LOG_LEVEL' in environ:
    log_level = logging.getLevelName(environ['LOG_LEVEL'])
else:
    log_level = logging.INFO

# Configure root logging once for the module, then expose the module logger.
logging.basicConfig(
    level=log_level,
    datefmt='%d-%m-%Y:%H:%M:%S',
    format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
)
logger = logging.getLogger('lib-b2b-shopify')
class ShopifyDialect(Dialect, CreateOrderRequestDataProvider, CancelOrderRequestDataProvider):
    """Dialect that validates and interprets order webhooks received from Shopify.

    The instance wraps the inbound request (headers, raw body, JSON body), checks
    the Shopify headers and HMAC signature, and exposes the order payload through
    properties and change-request data providers.
    """

    @staticmethod
    def matches_dialect(request):
        """Return True when ``request`` carries the ``x-shopify-shop-domain`` header."""
        return ShopifyDialect.header_exists('x-shopify-shop-domain', request)

    def __init__(self, request=None, inbound_order=None, customer=None):
        """Store the request, the order payload and the customer record.

        ``topic`` and ``shopify_order_id`` start as ``None`` and are populated by
        :meth:`verify_request`.
        """
        self.request = request
        self.request_data = inbound_order
        self.customer = customer
        self.topic = None
        self.shopify_order_id = None

    @staticmethod
    def get_header(key: str, request):
        """Return header ``key`` from ``request``, also trying the title-cased spelling.

        Returns ``None`` when neither spelling is present.
        """
        return request.headers.get(key, request.headers.get(key.title()))

    @staticmethod
    def header_exists(key: str, request):
        """Return True when header ``key`` (or its title-cased spelling) is present."""
        return ShopifyDialect.get_header(key, request) is not None

    def __get_header(self, key: str):
        """Return header ``key`` from this dialect's own request."""
        return ShopifyDialect.get_header(key, self.request)

    def __header_exists(self, key: str):
        """Return True when this dialect's own request has header ``key``."""
        return ShopifyDialect.header_exists(key, self.request)

    def _hmac_is_valid(self, signature):
        """Return True when ``signature`` matches the HMAC-SHA256 of the raw request body.

        The digest is keyed with the customer's Shopify secret and base64 encoded.
        The comparison uses ``hmac.compare_digest`` so that the time taken does not
        reveal how many leading characters of a forged signature were correct.
        """
        if not isinstance(signature, (str, bytes)) or not signature:
            return False
        key = self.customer['shopify_config']['secret'].encode('utf-8')
        digest = hmac.new(key, self.request.raw_body, hashlib.sha256).digest()
        expected = base64.b64encode(digest)
        # Compare bytes with bytes: compare_digest rejects non-ASCII str input.
        provided = signature.encode('utf-8') if isinstance(signature, str) else signature
        return hmac.compare_digest(expected, provided)

    @property
    def request_type(self) -> OrderRequestType:
        """The request type derived from the ``x-shopify-topic`` header."""
        topic = self.__get_header('x-shopify-topic')
        return OrderRequestType(topic)

    @xray_recorder.capture()
    def verify_request(self):
        """Validate the inbound webhook request and capture its key fields.

        Looks up the customer by shop domain, then requires a JSON body, a
        signature header, a supported topic and an order id header, and finally
        checks the HMAC signature.

        Raises:
            ValidationError: a required header or the JSON body is missing, or
                the topic is not supported.
            InvalidSignatureError: the HMAC signature does not match.
        """
        # Is the shopify shop domain configured for a valid customer?
        shopify_domain = self.__get_header('x-shopify-shop-domain')
        # NOTE(review): per the original comment the lookup is expected to raise
        # when no customer matches the domain - confirm against Customer.
        self.customer = Customer.fetch_by_shopify_domain(shopify_domain)

        # Is the request a valid shopify request?
        if not self.request.json_body:
            raise ValidationError("JSON body not found.")
        self.request_data = self.request.json_body
        logger.debug(self.request_data)

        if not self.__header_exists('x-shopify-hmac-sha256'):
            raise ValidationError("Missing shopify signature")
        signature = self.__get_header('x-shopify-hmac-sha256')

        if not self.__header_exists('x-shopify-topic'):
            raise ValidationError("Missing shopify topic")
        self.topic = self.__get_header('x-shopify-topic')
        if self.topic not in ['orders/create', 'orders/paid', 'orders/cancelled', 'orders/updated']:
            raise ValidationError(f"Invalid topic [{self.topic}]")

        if not self.__header_exists('x-shopify-order-id'):
            raise ValidationError("Missing shopify order id")
        self.shopify_order_id = self.__get_header('x-shopify-order-id')

        if not self._hmac_is_valid(signature):
            raise InvalidSignatureError("Shopify signature not valid")

    @xray_recorder.capture()
    def verify_order(self, override=False):
        """Validate an order payload before it is created.

        For ``orders/create`` an already existing order (revision 1) is rejected
        unless ``override`` is set or the existing order was previously rejected.
        An order without a shipping address triggers a business notification and
        is rejected.

        Raises:
            ShopifyOrderExistsError: the order already exists and may not be re-submitted.
            ValidationError: the payload has no ``shipping_address``.
        """
        from .order_status import OrderStatus
        order_id = Order.generate_id(self.customer, self.request_data['order_number'])
        revision = 1
        if self.topic == 'orders/create':
            if Orders.exists(order_id=order_id, revision=revision) and not override:
                existing_order = Orders.for_(order_id)
                previously_rejected = (
                    'order_status' in existing_order
                    and OrderStatus.for_(existing_order['order_status']) == OrderStatus.REJECTED
                )
                if previously_rejected:
                    logger.info(f"Allowing order {order_id} to be re-submitted because it was previously rejected.")
                else:
                    logger.warning("Purchase Orders Already Exists.")
                    raise ShopifyOrderExistsError(f"Purchase Orders Already Exists.[order: {order_id}, "
                                                  f"revision: {revision}]")
        if 'shipping_address' not in self.request_data:
            from .notification import Notifier, NotificationType, ErrorNotificationData
            Notifier.notify(
                notification_type=NotificationType.BUSINESS,
                customer=self.customer['customer_edi_id'],
                subject=f"B2B Orders Error ({order_id})",
                data=ErrorNotificationData(
                    order_id=order_id,
                    message="The order was not imported due to:",
                    errors=[f'Orders {order_id} does not have a shipping address']
                )
            )
            raise ValidationError(f'Orders {order_id} does not have a shipping address')

    @xray_recorder.capture()
    def verify_cancellation(self):
        """Require the ``cancel_reason`` field on a cancellation payload.

        Raises:
            ValidationError: ``cancel_reason`` is absent from the payload.
        """
        if 'cancel_reason' not in self.request_data:
            # The message names the key that is actually checked.
            raise ValidationError("Missing field cancel_reason")

    @xray_recorder.capture()
    def verify(self, request_type: OrderRequestType):
        """Run request verification, then the payload check matching ``request_type``."""
        if self.request:
            self.verify_request()
        if self.request_data:
            if request_type is OrderRequestType.CREATE:
                self.verify_order()
            elif request_type is OrderRequestType.CANCEL:
                self.verify_cancellation()

    @property
    def order_id(self):
        """The order id generated from the customer and the payload's ``order_number``."""
        return Order.generate_id(self.customer, self.request_data['order_number'])

    @property
    def cancellation_reason(self):
        """The payload's ``cancel_reason``, or ``None`` when absent."""
        return self.request_data.get('cancel_reason', None)

    @property
    def cancelled_at(self):
        """The payload's ``cancelled_at`` as a datetime; the current UTC time when missing or null."""
        # `or` (not a .get default) so that an explicit null value also falls back.
        raw = self.request_data.get('cancelled_at') or datetime.now(tz=timezone.utc).isoformat()
        return isoparse(raw)

    @property
    def updated_at(self):
        """The payload's ``updated_at`` converted to UTC; the current UTC time when missing or null."""
        # `or` (not a .get default) so that an explicit null value also falls back.
        raw = self.request_data.get('updated_at') or datetime.now(tz=timezone.utc).isoformat()
        return isoparse(raw).astimezone(timezone.utc)

    @property
    def shipTo(self):
        """The payload's ``shipping_address`` as an ``Address``; the country is always ``'USA'``."""
        _shipping_address = self.request_data.get('shipping_address')
        return Address(
            address1=safe_str(_shipping_address, 'address1'),
            address2=safe_str(_shipping_address, 'address2'),
            city=safe_str(_shipping_address, 'city'),
            state=safe_str(_shipping_address, 'province_code'),
            postalCode=safe_str(_shipping_address, 'zip'),
            country='USA',
            phone=safe_str(_shipping_address, 'phone'),
            name=safe_str(_shipping_address, 'name'),
            company=safe_str(_shipping_address, 'company')
        )

    @staticmethod
    def _refund_line_amount(refund_line) -> Decimal:
        """Return ``subtotal + total_tax`` of one refund line as a ``Decimal``.

        Each amount is converted to ``Decimal`` before adding, which avoids binary
        floating point drift and also accepts amounts delivered as strings.
        Missing or null amounts count as zero.
        """
        subtotal = refund_line.get('subtotal') or 0.0
        total_tax = refund_line.get('total_tax') or 0.0
        return Decimal(str(subtotal)) + Decimal(str(total_tax))

    @property
    def refunds(self) -> [Refund]:
        """The payload's ``refunds`` as ``Refund`` objects (empty list when there are none)."""
        _refunds = []
        for _refund in self.request_data.get('refunds') or []:
            # A refund without line items yields a Refund with no lines.
            _lines = [
                RefundLine(
                    line_id=_refund_line.get('id'),
                    quantity=_refund_line.get('quantity'),
                    line_item_id=_refund_line.get('line_item_id'),
                    product_id=_refund_line.get('line_item', {'sku': 'Unknown'}).get('sku'),
                    amount=ShopifyDialect._refund_line_amount(_refund_line)
                )
                for _refund_line in _refund.get('refund_line_items') or []
            ]
            _refunds.append(
                Refund(
                    refund_id=_refund.get('id'),
                    order_id=self.order_id,
                    created_at=isoparse(_refund.get('created_at', datetime.now(tz=timezone.utc).isoformat())),
                    note=_refund.get('note', '-'),
                    lines=_lines
                )
            )
        return _refunds

    def data_provider_for(self, order_request: OrderRequest, change_request_type: Type[OrderRequest.C]):
        """Return a data provider for ``change_request_type`` backed by ``order_request``.

        Every provider reports the order's ``updated_at`` (ISO format) as its
        version and reads its values from ``order_request`` lazily.

        Raises:
            NotImplementedError: ``change_request_type`` is not one of the
                supported change request classes.
        """
        if change_request_type is OrderAddressChangeRequest:
            class __OCDP(OrderAddressChangeDataProvider):
                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def new_address(self) -> Address:
                    return order_request.dialect.shipTo

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderCancelChangeRequest:
            class __OCDP(OrderCancelChangeDataProvider):
                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def cancellation_reason(self) -> Optional[str]:
                    return order_request.request_data.get('cancel_reason', None)

                @property
                def cancelled_on(self) -> Optional[datetime]:
                    cancelled_dt = order_request.request_data.get('cancelled_at', None)
                    if cancelled_dt:
                        return isoparse(cancelled_dt)
                    else:
                        return None

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderCancelLineChangeRequest:
            class __OCDP(OrderCancelLineChangeDataProvider):
                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def cancellations(self) -> [OrderLineCancellation]:
                    # Flatten the per-refund cancellation lists into one list.
                    return [
                        cancellation
                        for refund in order_request.dialect.refunds or []
                        for cancellation in refund.as_cancellations()
                    ]

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderShipDateChangeRequest:
            class __OCDP(OrderShipDateChangeDataProvider):
                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def new_ship_date(self) -> Optional[datetime]:
                    # The ship date comes from the first 'Delay Date' note
                    # attribute (format YYYY-MM-DD); None when absent or unparsable.
                    not_before_date = None
                    try:
                        note_attributes = order_request.request_data.get('note_attributes')
                        if note_attributes:
                            delay_list = [
                                attribute['value']
                                for attribute in note_attributes
                                if attribute['name'] == 'Delay Date'
                            ]
                            if delay_list:
                                not_before_date = datetime.strptime(delay_list[0], '%Y-%m-%d')
                    except Exception:
                        # logger.exception records the active traceback itself.
                        logger.exception(
                            f"Error attempting to parse Delay Date from "
                            f"Shopify order {order_request.dialect.order_id}")
                    return not_before_date

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderFinancialStatusChangeRequest:
            class __OCDP(OrderFinancialStatusChangeDataProvider):
                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def financial_status(self) -> FinancialStatus:
                    return FinancialStatus(order_request.request_data.get('financial_status'))

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderRevisionChangeRequest:
            class __OCDP(OrderRevisionChangeDataProvider):
                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def new_revision(self) -> Optional[int]:
                    return None

                @property
                def request(self) -> 'OrderRequest':
                    # Return the captured request; `self.request` would call this
                    # property again and recurse without end.
                    return order_request

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        elif change_request_type is OrderTotalChangeRequest:
            _order_data = OrderBuilder.for_(order_request).build()
            _order = Order.from_dict(_order_data)

            class __OCDP(OrderTotalChangeDataProvider):
                def version(self) -> Optional[str]:
                    return order_request.dialect.updated_at.isoformat()

                @property
                def new_totals(self) -> OrderTotals:
                    return OrderTotals.create(
                        order_lines=_order.lines,
                        discounts=_order.discounts,
                        additional_charges=_order.additional_charges,
                        refunds=order_request.dialect.refunds,
                        total_amount=as_decimal(order_request.request_data.get('total_price'), 'total_amount'),
                        total_line_amount=as_decimal(order_request.request_data.get('total_line_items_price'),
                                                     'total_line_amount'),
                        total_tax=as_decimal(order_request.request_data.get('total_tax'), 'total_tax'),
                        total_discount=as_decimal(order_request.request_data.get('total_discounts'),
                                                  'total_discount')
                    )

                def data(self) -> dict:
                    return order_request.request_data
            return __OCDP()
        else:
            # No provider exists for any other change request type.
            raise NotImplementedError
class FullfillmentAction:
    """Send a fulfillment (tracking details and fulfilled lines) for an order to Shopify."""

    # Internal bookkeeping keys removed from every line item before it is sent.
    _INTERNAL_LINE_KEYS = ('purchase_order_line', 'line_id', 'channel_line_id')

    def __init__(self, fulfillment):
        """Load the customer, profile and default Shopify location for ``fulfillment``."""
        self.fulfillment = fulfillment
        self.customer = Customer.fetch_by_edi_id(self.fulfillment['customer_edi_id'])
        self.profile = Profile.profile_for(self.customer)
        self.location = self.profile.shopify_config.default_location

    def __get_url(self):
        """Return the fulfillments endpoint URL (with credentials), or ``None`` on failure."""
        try:
            config = self.profile.shopify_config
            return (f"https://{config.key}:{config.password}@{config.domain}"
                    f"/admin/orders/{self.fulfillment['channel_order_id']}/fulfillments.json")
        except Exception:
            logger.exception("Unable to generate url for shopify fullfillment action. fulfillment=%s",
                             self.fulfillment)
            return None

    def __loggable_url(self, url):
        """Return ``url`` with the API key and password masked so it can be logged."""
        config = self.profile.shopify_config
        return str(url).replace(f"{config.key}:{config.password}@", "***:***@")

    def __build_payload(self):
        """Return the request payload for Shopify, or ``None`` on failure.

        Line items are copied rather than edited in place: ``send`` reads
        ``purchase_order_line`` from the original line items afterwards, so the
        caller's fulfillment must stay intact.
        """
        try:
            line_items = []
            for item in self.fulfillment['line_items']:
                shopify_item = {key: value for key, value in item.items()
                                if key not in self._INTERNAL_LINE_KEYS}
                # Shopify identifies the line by its own (channel) line id.
                shopify_item['id'] = item['channel_line_id']
                line_items.append(shopify_item)
            fulfillment = {
                'location_id': self.location,
                # Should always have the fulfillment object
                'tracking_company': self.fulfillment['carrier'],
                'tracking_numbers': self.fulfillment['tracking_numbers'],
                'tracking_urls': self.fulfillment['tracking_urls'],
                'line_items': line_items,
            }
            return {"fulfillment": fulfillment}
        except Exception:
            logger.exception("Unable to build payload for shopify fullfillment action. fulfillment=%s",
                             self.fulfillment)
            return None

    def __record_response(self, response, payload):
        """Store Shopify's fulfillment id and chronicle the event on the order (best effort)."""
        response_json = None
        try:
            from .fulfillment import Fulfillment
            response_json = response.json()
            channel_fulfillment_id = response_json['fulfillment']['id']
            Fulfillment.update_channel_id(fulfillment_id=self.fulfillment['id'],
                                          channel_fulfillment_id=channel_fulfillment_id)
            order = Orders.for_(self.fulfillment['order_id'])
            if order:
                line_item_str = ', '.join(str(line['purchase_order_line'])
                                          for line in self.fulfillment['line_items'] or [])
                order.chronicle(message=f"Sent fulfillment information to Shopify for lines: {line_item_str}",
                                when=datetime.now(),
                                data=payload)
        except OrderNotFoundError:
            logger.error(f"Unable to find order for fulfillment {self.fulfillment['id']}",
                         extra=log_data(fulfillment_id=self.fulfillment['id']))
        except ValueError:
            logger.warning("Fulfillment response did not contain json.")
        except KeyError as ke:
            logger.warning(f"Unable to extract channel fulfillment id from {str(response_json)} :: {str(ke)}")

    def send(self):
        """Post the fulfillment to Shopify.

        A 422 response is treated as success (the order is already fulfilled).

        Raises:
            ShipNoticeFailedError: the URL or payload could not be built, or
                Shopify answered with any other status of 400 or above.
            requests.exceptions.RequestException: the HTTP call itself failed.
        """
        try:
            notification_url = self.__get_url()
            if not notification_url:
                raise ShipNoticeFailedError("Unable to generate url for fulfillment.")
            payload = self.__build_payload()
            if not payload:
                raise ShipNoticeFailedError("Unable to generate payload for fulfillment.")
            json_payload = json.dumps(payload, cls=UtilityEncoder)
            logger.debug(f"Calling shopify fullfillment with url: {self.__loggable_url(notification_url)}")
            logger.debug(json_payload)
            response = requests.post(notification_url, data=json_payload,
                                     headers={'Content-type': 'application/json'})
            logger.debug(f"Fulfillment response: {response}")
            logger.debug(response.content)
            logger.debug(response.text)
            self.__record_response(response, payload)
            if response.status_code == 422:
                logger.info(response.text)
                logger.info("Considering ship notice successful since it's already fulfilled")
            elif response.status_code >= 400:
                # >= so that a plain 400 Bad Request is reported as a failure too.
                e = ShipNoticeFailedError(f"{response} on fulfillment {self.fulfillment['id']}")
                logger.error(response.text)
                logger.error(e)
                raise e
            return
        except requests.exceptions.RequestException as request_error:
            logger.error(f"Unable to call notification url [{self.__loggable_url(notification_url)}] "
                         f":: {str(request_error)}")
            raise
class CancelFullfillmentAction:
    """Cancel a fulfillment that was previously sent to Shopify."""

    def __init__(self, fulfillment):
        """Load the customer and profile for ``fulfillment``."""
        self.fulfillment = fulfillment
        self.customer = Customer.fetch_by_edi_id(self.fulfillment['customer_edi_id'])
        self.profile = Profile.profile_for(self.customer)

    def __get_url(self):
        """Return the fulfillment cancel endpoint URL (with credentials), or ``None`` on failure."""
        try:
            config = self.profile.shopify_config
            return (f"https://{config.key}:{config.password}@{config.domain}"
                    f"/admin/orders/{self.fulfillment['channel_order_id']}"
                    f"/fulfillments/{self.fulfillment['channel_fulfillment_id']}/cancel.json")
        except Exception:
            logger.exception("Unable to generate url for shopify cancel fulfillment action. fulfillment=%s",
                             self.fulfillment)
            return None

    def __loggable_url(self, url):
        """Return ``url`` with the API key and password masked so it can be logged."""
        config = self.profile.shopify_config
        return str(url).replace(f"{config.key}:{config.password}@", "***:***@")

    def send(self):
        """Post the cancellation to Shopify and chronicle it on the order.

        Nothing is sent (a warning is logged) when the fulfillment has no
        ``channel_fulfillment_id``. Failing to chronicle the event is logged
        but does not fail the call.

        Raises:
            ShipNoticeFailedError: the URL could not be built.
            requests.exceptions.RequestException: the HTTP call itself failed.
        """
        if 'channel_fulfillment_id' not in self.fulfillment:
            logger.warning(f"Unable to send cancellation, fulfillment {self.fulfillment['id']} does not contain a channel_fulfillment_id.")
            return
        try:
            notification_url = self.__get_url()
            if not notification_url:
                raise ShipNoticeFailedError("Unable to generate url for fulfillment.")
            logger.debug(f"Calling shopify cancel fullfillment with url: {self.__loggable_url(notification_url)}")
            response = requests.post(notification_url, headers={'Content-type': 'application/json'})
            logger.debug(f"Fulfillment cancel response: {response}")
            logger.debug(response.content)
            logger.debug(response.text)
            try:
                order = Orders.for_(self.fulfillment['order_id'])
                if order:
                    order.chronicle(message=f"Sent fulfillment cancellation to Shopify for fulfillment {self.fulfillment['id']}",
                                    when=datetime.now())
            except Exception:
                # Best effort: the cancellation was already sent.
                logger.exception("Unable to chronical event")
            return
        except requests.exceptions.RequestException as request_error:
            logger.error(f"Unable to call cancel notification url [{self.__loggable_url(notification_url)}] "
                         f":: {str(request_error)}")
            raise
class CapturePaymentAction:
    """Ask Shopify to capture the payment of the order a fulfillment belongs to."""

    def __init__(self, fulfillment):
        """Load the customer and profile for ``fulfillment``."""
        self.fulfillment = fulfillment
        self.customer = Customer.fetch_by_edi_id(self.fulfillment['customer_edi_id'])
        self.profile = Profile.profile_for(self.customer)

    def __get_url(self):
        """Return the order transactions endpoint URL (with credentials)."""
        config = self.profile.shopify_config
        return (f"https://{config.key}:{config.password}@{config.domain}"
                f"/admin/orders/{self.fulfillment['channel_order_id']}/transactions.json")

    def __loggable_url(self, url):
        """Return ``url`` with the API key and password masked so it can be logged."""
        config = self.profile.shopify_config
        return str(url).replace(f"{config.key}:{config.password}@", "***:***@")

    def __build_payload(self):
        """Return the request payload: a transaction of kind ``capture``."""
        return {
            'kind': 'capture'
        }

    def send(self):
        """Post the capture request to Shopify and chronicle it on the order.

        Failing to chronicle the event is logged but does not fail the call.
        NOTE(review): the response status is not inspected, so the event is
        chronicled whatever Shopify answered - confirm this is intended.

        Raises:
            requests.exceptions.RequestException: the HTTP call itself failed.
        """
        try:
            notification_url = self.__get_url()
            payload = self.__build_payload()
            json_payload = json.dumps(payload, cls=UtilityEncoder)
            logger.debug(f"Calling shopify capture transaction with url: {self.__loggable_url(notification_url)}")
            response = requests.post(notification_url, data=json_payload,
                                     headers={'Content-type': 'application/json'})
            logger.debug(f"Capture response: {response}")
            # Log the body; `response.raw` only shows an object repr.
            logger.debug(response.text)
            try:
                order = Orders.for_(self.fulfillment['order_id'])
                if order:
                    order.chronicle(
                        message=f"Captured payment for for fulfillment {self.fulfillment['id']}",
                        when=datetime.now())
            except Exception:
                # Best effort: the capture request was already sent.
                logger.exception("Unable to chronicle event")
            return
        except requests.exceptions.RequestException as request_error:
            logger.error(f"Unable to call capture url [{self.__loggable_url(notification_url)}] "
                         f":: {str(request_error)}")
            raise
class CancelOrderAction:
    """Placeholder for cancelling an order in Shopify; no step is implemented yet."""

    def __init__(self, order):
        """Keep a reference to the order this action is for."""
        self.order = order

    def __get_url(self):
        """Not implemented: returns ``None``."""
        return None

    def __build_payload(self):
        """Not implemented: returns ``None``."""
        return None

    def send(self):
        """Not implemented: does nothing and returns ``None``."""
        return None
# Add a note to order
#
# PUT /admin/api/#{api_version}/orders/#{order_id}.json
# {
#   "order": {
#     "id": 450789469,
#     "note": "Customer contacted us about a custom engraving on this iPod"
#   }
# }
class AddNoteAction:
    """Set the note of an order in Shopify."""

    def __init__(self, order, note):
        """Look up the order, then load its customer and profile."""
        self.order = Orders.for_(order)
        self.note = note
        self.customer = Customer.fetch_by_edi_id(self.order['customer_edi_id'])
        self.profile = Profile.profile_for(self.customer)

    def __get_url(self):
        """Return the order endpoint URL (with credentials)."""
        config = self.profile.shopify_config
        return (f"https://{config.key}:{config.password}@{config.domain}"
                f"/admin/orders/{self.order['channel_order_id']}.json")

    def __loggable_url(self, url):
        """Return ``url`` with the API key and password masked so it can be logged."""
        config = self.profile.shopify_config
        return str(url).replace(f"{config.key}:{config.password}@", "***:***@")

    def __build_payload(self):
        """Return the request payload carrying the order id and the new note."""
        return {
            "order": {
                "id": self.order['channel_order_id'],
                "note": self.note
            }
        }

    def send(self):
        """Send the note to Shopify and chronicle it on the order.

        Failing to chronicle the event is logged but does not fail the call.
        NOTE(review): the response status is not inspected, so the event is
        chronicled whatever Shopify answered - confirm this is intended.

        Raises:
            requests.exceptions.RequestException: the HTTP call itself failed.
        """
        try:
            notification_url = self.__get_url()
            payload = self.__build_payload()
            json_payload = json.dumps(payload, cls=UtilityEncoder)
            logger.debug(f"Calling shopify add note with url: {self.__loggable_url(notification_url)}")
            # The endpoint documented above updates an order with PUT, not POST.
            response = requests.put(notification_url, data=json_payload,
                                    headers={'Content-type': 'application/json'})
            logger.debug(f"Add Note response: {response}")
            # Log the body; `response.raw` only shows an object repr.
            logger.debug(response.text)
            try:
                self.order.chronicle(
                    message="Added note.",
                    when=datetime.now())
            except Exception:
                # Best effort: the note request was already sent.
                logger.exception("Unable to chronicle event")
            return
        except requests.exceptions.RequestException as request_error:
            logger.error(f"Unable to add note using [{self.__loggable_url(notification_url)}] "
                         f":: {str(request_error)}")
            raise
class ManualOrderImportAction:
    """Fetch a single order from Shopify for a manual import.

    NOTE: the import step itself is still a stub (see ``__import_order``), so
    ``do`` currently only retrieves the order.
    """

    def __init__(self, customer_edi_id, order_number, override=False):
        """Load the customer and profile; remember the order number and override flag."""
        self.order_number = order_number
        self.customer = Customer.fetch_by_edi_id(customer_edi_id)
        self.profile = Profile.profile_for(customer=self.customer)
        self.override = override

    def __get_url(self):
        """Return the order endpoint URL (with credentials)."""
        config = self.profile.shopify_config
        return f"https://{config.key}:{config.password}@{config.domain}/admin/orders/{self.order_number}.json"

    def __loggable_url(self, url):
        """Return ``url`` with the API key and password masked so it can be logged."""
        config = self.profile.shopify_config
        return str(url).replace(f"{config.key}:{config.password}@", "***:***@")

    def __retreive_from_shopify(self):
        """Return the ``order`` object of Shopify's response.

        Raises:
            OrderNotFoundError: the HTTP call failed, the response body is not
                JSON, or the response has no ``order`` entry.
        """
        shopify_url = self.__get_url()
        try:
            logger.debug(f"Calling shopify get order with url: {self.__loggable_url(shopify_url)}")
            response = requests.get(shopify_url)
            logger.debug(f"Get order response: {response}")
            # Log the body; `response.raw` only shows an object repr.
            logger.debug(response.text)
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as error:
            # ValueError covers a non-JSON body (e.g. an HTML error page).
            raise OrderNotFoundError(f"Unable to find shopify order {self.order_number}") from error
        if data and 'order' in data:
            return data['order']
        raise OrderNotFoundError(f"Unable to find shopify order {self.order_number}")

    def __import_order(self, order_data):
        """Stub: importing the retrieved order is not implemented yet; returns ``None``."""
        #
        # ShopifyOrderRequest()
        # from lib_b2b.shopify_order_builder import ShopifyOrderBuilder
        # return ShopifyOrderBuilder(request)
        # order_data = OrderBuilder.for_(request).build()
        # from lib_b2b.order import Order
        # Order.create(order_data)
        # shopify_dialect = ShopifyDialect(inbound_order=inbound_order, customer=self.customer)
        # shopify_dialect.verify_order(override=self.override)
        # _inbound_order = shopify_dialect.standardize()
        # print(_inbound_order)
        #
        # # TODO: - replace with CreateOrderTransaction
        # Orders.save_record(_inbound_order)
        # return json.dumps({'id': _inbound_order['id']})
        # TODO: - Refactor the Order Builder to enable this use case. Can't be request based.
        pass

    def do(self):
        """Retrieve the order from Shopify and hand it to the (stub) import step.

        Raises:
            OrderNotFoundError: the order could not be retrieved.
        """
        order_data = self.__retreive_from_shopify()
        self.__import_order(order_data)
        # TODO: ---
        # try:
        #     order = Orders.for_(self.fulfillment['order_id'])
        #     if order:
        #         order.chronicle(
        #             message=f"Captured payment for for fulfillment {self.fulfillment['id']}",
        #             when=datetime.now())
        # except Exception as e:
        #     logger.exception("Unable to chronicle event")
# Moved here to avoid silly python ImportError
from .orders import Orders
from .order import Order