Repository URL to install this package:
|
Version:
0.4.139 ▾
|
lib-py-b2b
/
standard.py
|
|---|
from typing import Type, Optional
from dateutil.parser import isoparse
from lib_b2b.address import Address
from lib_b2b.order_change import OrderStatusChangeDataProvider
from lib_b2b.order_change.accept import OrderAcceptChangeRequest
from lib_b2b.order_change.address import OrderAddressChangeRequest, OrderAddressChangeDataProvider
from lib_b2b.order_change.cancel import OrderCancelChangeRequest, OrderCancelChangeDataProvider
from lib_b2b.order_change.revision import OrderRevisionChangeRequest, OrderRevisionChangeDataProvider
from lib_b2b.order_change.ship_date import OrderShipDateChangeRequest, OrderShipDateChangeDataProvider
from lib_b2b.order_request import OrderRequestType, OrderRequest
from lib_b2b.order_status import OrderStatus
from .order import Order
from .validator import B2BModel, StandardDialectValidator
from .customer import Customer
from .orders import Orders
from .carrier import TrackingUrl, CarrierType
from jsonschema.exceptions import ValidationError
import logging
from os import environ
from .errors import InvalidAPIKey, OrderExistsError, NotificationFailedError, AcknowledgementUrlNotProvidedError
from aws_xray_sdk.core import xray_recorder
import requests
import json
from datetime import datetime, timezone
from .dialect import Dialect, CreateOrderRequestDataProvider, CancelOrderRequestDataProvider
from .util import UtilityEncoder
log_level = logging.getLevelName(environ['LOG_LEVEL']) if 'LOG_LEVEL' in environ else logging.INFO
logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
datefmt='%d-%m-%Y:%H:%M:%S',
level=log_level)
logger = logging.getLogger('lib-b2b-standard')
class StandardDialect(Dialect, CreateOrderRequestDataProvider, CancelOrderRequestDataProvider):
@staticmethod
def matches_dialect(request):
return 'x-api-key' in request.headers
def __init__(self, request):
self.request = request
self.request_data = None
self.customer = None
self.validator = StandardDialectValidator()
@property
def request_type(self) -> OrderRequestType:
# TODO: - look into imrpoving the intelligence of this
return OrderRequestType.CREATE
def __validate_api_key(self, api_key, customer):
if customer['api_key'] != api_key:
raise InvalidAPIKey(f"{[api_key]} is invalid for customer {customer['customer_edi_id']}")
@xray_recorder.capture()
def verify_request(self):
if not self.request.json_body:
raise ValidationError("JSON body not found.")
self.request_data = self.request.json_body
if 'api-key' not in self.request.headers and 'x-api-key' not in self.request.headers:
logger.error('Missing API Key')
raise InvalidAPIKey('Missing API Key')
api_key = self.request.headers.get('api-key', self.request.headers.get('x-api-key'))
self.customer = Customer.fetch_by_edi_id(customer_edi_id=self.request_data['customer_edi_id'])
self.__validate_api_key(
api_key=api_key,
customer=self.customer
)
@xray_recorder.capture()
def verify_order(self):
self.validator.validate_model(B2BModel.ORDER, self.request_data)
if 'order_lines' not in self.request_data or not self.request_data['order_lines']:
logger.error('Orders must contain lines.')
raise ValidationError('Orders must contain lines.')
order_id = Order.generate_id(self.customer, self.request_data['purchase_order'])
if Orders.exists(order_id=order_id, revision=self.request_data['purchase_order_revision']):
logger.error("Purchase Orders Already Exists.")
raise OrderExistsError(f"Purchase Orders Already Exists.[order: {order_id}, "
f"revision: {self.request_data['purchase_order_revision']}]")
@xray_recorder.capture()
def verify_cancellation(self):
if 'cancellation_reason' not in self.request_data:
raise ValidationError("Missing field cancellation_reason")
if 'purchase_order' not in self.request_data:
raise ValidationError("Missing field purchase_order")
@xray_recorder.capture()
def verify(self, request_type: OrderRequestType):
if self.request:
self.verify_request()
if self.request_data:
if request_type is OrderRequestType.CREATE:
self.verify_order()
elif request_type is OrderRequestType.CANCEL:
self.verify_cancellation()
@property
def cancellation_reason(self):
return self.request_data.get('cancellation_reason', None)
@property
def cancelled_at(self):
return isoparse(self.request_data.get('cancelled_at', datetime.now(tz=timezone.utc).isoformat()))
@property
def order_id(self):
return Order.generate_id(self.customer, self.request_data['purchase_order'])
@property
def shipTo(self):
return Address.from_dict(self.request_data['shipTo'])
def data_provider_for(self, order_request: OrderRequest, change_request_type: Type[OrderRequest.C]):
if change_request_type is OrderAcceptChangeRequest:
class __OCDP(OrderStatusChangeDataProvider):
def data(self) -> dict:
return order_request.request_data
@property
def new_status(self):
return OrderStatus.ACCEPTED
return __OCDP()
elif change_request_type is OrderAddressChangeRequest:
class __OCDP(OrderAddressChangeDataProvider):
@property
def new_address(self) -> Address:
return order_request.dialect.shipTo
def data(self) -> dict:
return order_request.request_data
return __OCDP()
elif change_request_type is OrderCancelChangeRequest:
class __OCDP(OrderCancelChangeDataProvider):
@property
def cancellation_reason(self) -> Optional[str]:
return order_request.request_data.get('cancellation_reason', None)
@property
def cancelled_on(self) -> Optional[datetime]:
cancelled_dt = order_request.request_data.get('cancelled_at', None)
if cancelled_dt:
return isoparse(cancelled_dt)
else:
return None
def data(self) -> dict:
return order_request.request_data
return __OCDP()
elif change_request_type is OrderRevisionChangeRequest:
class __OCDP(OrderRevisionChangeDataProvider):
@property
def new_revision(self) -> Optional[int]:
return self.request.request_data.get('purchase_order_revision')
@property
def request(self) -> 'OrderRequest':
return self.request
def data(self) -> dict:
return self.request.request_data
return __OCDP()
# Don't think that standard orders should support line level cancellations
# elif change_request_type is OrderCancelLineChangeRequest:
# class __OCDP(OrderCancelLineChangeDataProvider):
# def cancellations(self) -> [OrderLineCancellation]:
# return [OrderLineCancellation(
# cancellation_reason=order_request.request_data.get('cancellation_reason', None),
# cancelled_at=isoparse(
# order_request.request_data.get('cancelled_at',
# datetime.now(tz=timezone.utc).isoformat())),
# purchase_order_line=order_request.request_data.get('purchase_order_line')
# )]
#
# def data(self) -> dict:
# return order_request.request_data
# return __OCDP()
elif change_request_type is OrderShipDateChangeRequest:
class __OCDP(OrderShipDateChangeDataProvider):
@property
def new_ship_date(self) -> Optional[datetime]:
datestr = order_request.request_data.get('not_before_date',
order_request.request_data.get('required_date', None))
if datestr:
return isoparse(datestr)
else:
return None
def data(self) -> dict:
return order_request.request_data
return __OCDP()
else:
# We don't support tracking financial status changes for non-shopify orders
raise NotImplementedError
class StandardShipNoticeAction:
def __init__(self, fulfillment):
self.fulfillment = fulfillment
def __get_url(self):
if 'notification_url' in self.fulfillment:
return self.fulfillment['notification_url']
else:
return None
def __build_payload(self):
# Need to decompose fulfillment into a notification per line
notices = []
tracking_numbers = self.fulfillment['tracking_numbers']
tracking_str = ','.join(tracking_numbers)
tracking_url = TrackingUrl.url_for(tracking_str, CarrierType(self.fulfillment.get('carrier_type', CarrierType.FEDEX_SHIP_MGR.value)))
for line in self.fulfillment['line_items']:
ship_notice = {
"customer_edi_id": self.fulfillment['customer_edi_id'],
"purchase_order": self.fulfillment['purchase_order'],
"purchase_order_line": line['purchase_order_line'],
"tracking": [
{
"carrier": self.fulfillment.get('carrier_type', CarrierType.FEDEX_SHIP_MGR.value),
"tracking": self.fulfillment['tracking_numbers'],
"url": tracking_url
}
]
}
if 'channel_name' in self.fulfillment:
ship_notice['channel_name'] = self.fulfillment['channel_name']
if 'channel_order_id' in self.fulfillment:
ship_notice['channel_order_id'] = self.fulfillment['channel_order_id']
notices.append(ship_notice)
return notices
def send(self):
try:
notification_url = self.__get_url()
notices = self.__build_payload()
for notice in notices:
json_payload = json.dumps(notice, cls=UtilityEncoder)
logger.debug(f"Sending ShipNotice with url: {notification_url}")
response = requests.put(notification_url, data=json_payload, headers={'Content-type': 'application/json'})
logger.debug(f"ShipNotice response: {response}")
logger.debug(response.raw)
try:
order = Orders.for_(self.fulfillment['order_id'])
if order:
order.chronicle(
message=f"Sent ship notice for fulfillment {self.fulfillment['id']}",
when=datetime.now(),
data=notice)
except Exception as e:
logger.exception("Unable to chronicle event")
except requests.exceptions.RequestException as re:
raise NotificationFailedError(f"Unable to call notification url [{notification_url}] :: {str(re)}")
class StandardAcknowledgementAction:
def __init__(self, order):
self.order = order
def __get_url(self):
if 'notification_urls' in self.order and 'order_acknowledgement' in self.order['notification_urls']:
return self.order['notification_urls']['order_acknowledgement']
else:
raise AcknowledgementUrlNotProvidedError(f"Acknowledgement url not provided for [{self.order['id']}]")
def __build_payload(self):
ack = {
"customer_edi_id": self.order['customer_edi_id'],
"purchase_order": self.order['purchase_order'],
"order_number": self.order['purchase_order'],
"status": self.order['order_status'],
}
if 'channel_name' in self.order:
ack['channel_name'] = self.order['channel_name']
if 'channel_order_id' in self.order:
ack['channel_order_id'] = self.order['channel_order_id']
if self.order['order_status'] == 'REJECTED':
ack["errors"] = [
{
"message": self.order['rejection_reason'] if 'rejection_reason' in self.order else '',
"type": "order"
}
]
return ack
def send(self):
try:
notification_url = self.__get_url()
payload = self.__build_payload()
json_payload = json.dumps(payload, cls=UtilityEncoder)
logger.debug(f"Sending OrderAck with url: {notification_url}")
response = requests.put(notification_url, data=json_payload, headers={'Content-type': 'application/json'})
logger.debug(f"OrderAck response: {response}")
logger.debug(response.raw)
try:
self.order.chronicle(
message=f"Sent order acknowledgement with status of {self.order.status.value}.",
when=datetime.now(),
data=payload)
except Exception as e:
logger.exception("Unable to chronicle event")
except requests.exceptions.RequestException as re:
raise NotificationFailedError(f"Unable to call notification url [{notification_url}] :: {str(re)}")