Repository URL to install this package:
|
Version:
0.4.201 ▾
|
lib-py-b2b
/
order_status_event.py
|
|---|
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Tuple, DefaultDict
from collections import defaultdict
from boto3.dynamodb.conditions import Attr, Or
from dateutil.tz import UTC
from py_aws_util.logging import log_data
from lib_b2b.change import ChangeRecord
from lib_b2b.erp import ERP
from lib_b2b.financial_status import FinancialStatus
from .notification import Notifier, NotificationType, ErrorNotificationData
from lib_b2b.order_flow import OrderFlowType
from .event import B2BTopic, B2BEvent
from .orders import Orders
from .order import Order
from .errors import BusinessError, OrderValidationError, ErrorRecord, NotFoundError, RejectionReason, OrderNotFoundError
from .order_status import OrderStatus
import logging
from os import environ
# Module-level logger, named after this module, used by the broker and all handlers below.
logger = logging.getLogger(__name__)
class OrderStatusEventListener(ABC):
    """
    Abstract interface for objects that react to an order status transition.

    Concrete listeners implement :meth:`onStatusChange`.
    """

    @abstractmethod
    def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
        """
        React to a status transition on an order.

        :param from_status: string or OrderStatus the order moved away from
        :param to_status: string or OrderStatus the order moved to
        :param order: order_id as string or b2b order object
        :return: None
        """
class OrderStatusEventBroker:
    """
    Publishes order status transitions to subscribed listeners.

    Listeners are registered per ``(from_status, to_status)`` pair. On publish,
    the exact pair is matched first, then ``(OrderStatus.ANY, to_status)`` and
    ``(from_status, OrderStatus.ANY)``.
    """

    @staticmethod
    def initialize():
        """
        Default initialization and subscriptions
        :return: a new broker with the default handlers subscribed
        """
        _broker = OrderStatusEventBroker()
        _broker.subscribe(OrderStatus.ANY, OrderStatus.RECEIVED, AnyToReceivedEventHandler())
        _broker.subscribe(OrderStatus.NONE, OrderStatus.VALID, NoneToValidEventHandler())
        _broker.subscribe(OrderStatus.RECEIVED, OrderStatus.VALID, ReceivedToValidEventHandler())
        _broker.subscribe(OrderStatus.ANY, OrderStatus.ACCEPTED, AnyToAcceptedEventHandler())
        _broker.subscribe(OrderStatus.ANY, OrderStatus.REJECTED, AnyToRejectedEventHandler())
        _broker.subscribe(OrderStatus.ANY, OrderStatus.ENTERED, AnyToEnteredEventHandler())
        _broker.subscribe(OrderStatus.ANY, OrderStatus.PARTIALLY_SHIPPED, AnyToPartiallyShippedEventHandler())
        _broker.subscribe(OrderStatus.ANY, OrderStatus.SHIPPED, AnyToShippedEventHandler())
        return _broker

    def __init__(self):
        # Maps a (from_status, to_status) pair to the listeners registered for it.
        self.subscribers: DefaultDict[Tuple[OrderStatus, OrderStatus], List[OrderStatusEventListener]] = defaultdict(list)

    def subscribe(self, from_status, to_status, subscriber: OrderStatusEventListener):
        """
        Register a listener for a status transition.

        :param from_status: string or OrderStatus (may be OrderStatus.ANY)
        :param to_status: string or OrderStatus (may be OrderStatus.ANY)
        :param subscriber: listener to notify when the transition is published
        :return: None
        """
        status_tuple = (OrderStatus.for_(from_status), OrderStatus.for_(to_status))
        self.subscribers[status_tuple].append(subscriber)

    def unsubscribe(self, from_status, to_status, subscriber: OrderStatusEventListener):
        """
        Remove a previously registered listener.

        :param from_status: string or OrderStatus used when subscribing
        :param to_status: string or OrderStatus used when subscribing
        :param subscriber: the listener to remove
        :return: None
        :raises ValueError: if the listener is not subscribed to that transition
        """
        status_tuple = (OrderStatus.for_(from_status), OrderStatus.for_(to_status))
        # Use .get() so that an unknown transition does not leave an empty entry
        # behind in the defaultdict; list.remove still raises ValueError.
        listeners = self.subscribers.get(status_tuple, [])
        listeners.remove(subscriber)
        if not listeners:
            # Drop the now-empty entry so publish() reports "no subscribers" accurately.
            del self.subscribers[status_tuple]

    def publish(self, from_status, to_status, order):
        """
        Handle status changes on order. The events are post change in the status.
        :param from_status: string or OrderStatus
        :param to_status: string or OrderStatus
        :param order: order_id as string or b2b order object
        :return: None
        """
        _order = Orders.for_(order)
        _from_status = OrderStatus.for_(from_status)
        _to_status = OrderStatus.for_(to_status)
        # Exact match first, then the wildcard forms. dict.fromkeys removes
        # duplicates while keeping order, so a transition that itself involves
        # OrderStatus.ANY does not notify the same listeners twice.
        status_tuples = list(dict.fromkeys([
            (_from_status, _to_status),
            (OrderStatus.ANY, _to_status),
            (_from_status, OrderStatus.ANY),
        ]))
        logger.info(f"Orders {_order['id']} :: Status Change {_from_status} -> {_to_status}")
        # Send SNS events for status change
        try:
            B2BEvent.notify(B2BTopic[_to_status.value], _order['id'])
        except KeyError:
            logger.warning(f"{_order['order_status']} is not a valid B2BTopic.")
        for status_tuple in status_tuples:
            listeners = self.subscribers.get(status_tuple)
            if not listeners:
                logger.debug(f"No subscribers found for {status_tuple}")
                continue
            # Iterate over a snapshot so a listener that (un)subscribes during
            # dispatch cannot cause other listeners to be skipped.
            for subscriber in list(listeners):
                logger.debug(f"Notifying subscriber {type(subscriber).__name__} of status change {status_tuple}")
                try:
                    subscriber.onStatusChange(from_status=_from_status, to_status=_to_status, order=_order)
                except BusinessError as be:
                    # Business failures are recorded on the order as well as logged.
                    _order.record_error(error_code='OF-001', error_msg=be.message)
                    logger.exception(f"Subscriber {type(subscriber).__name__} raised an exception while handling {status_tuple}")
                except Exception:
                    # One failing listener must not prevent the others from running.
                    logger.exception(f"Subscriber {type(subscriber).__name__} raised an exception while handling {status_tuple}")
class AnyToReceivedEventHandler(OrderStatusEventListener):
    """
    Handles an order arriving in RECEIVED from any prior status.
    """

    def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
        """
        Fast-forward test orders; validate and prepare all other orders.

        A non-test order that validates and prepares cleanly is set to VALID
        and a VALID event is sent. An OrderValidationError rejects the order
        with the error's reasons and sends a REJECTED event.

        :param from_status: status the order moved away from
        :param to_status: status the order moved to
        :param order: the b2b order object
        :return: None
        """
        try:
            if order.is_test():
                order.fast_forward()
                return
            order.validate()
            order.prepare()
            order.status = OrderStatus.VALID
            B2BEvent.notify(B2BTopic.VALID, order.order_id)
        except OrderValidationError as validation_error:
            order.reject(reasons=validation_error.reasons)
            B2BEvent.notify(B2BTopic.REJECTED, order.order_id)
class NoneToValidEventHandler(OrderStatusEventListener):
    """
    Handles an order going straight from NONE to VALID.

    This is a bug in an older public interface that customers are still using,
    so the order is rewound to the RECEIVED state.
    """

    def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
        """
        Rewind the order to RECEIVED.

        :param from_status: status the order moved away from
        :param to_status: status the order moved to
        :param order: the b2b order object
        :return: None
        """
        logger.info("Rewinding order.")
        order.status = OrderStatus.RECEIVED
class ReceivedToValidEventHandler(OrderStatusEventListener):
    """
    Handles an order moving from RECEIVED to VALID.
    """

    def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
        """
        Add the order to the ERP, then move it on to UNPAID or READY.

        A NotFoundError raised along the way rejects the order with an
        ``order_line`` rejection reason and sends a REJECTED event.

        :param from_status: status the order moved away from
        :param to_status: status the order moved to
        :param order: the b2b order object
        :return: None
        """
        try:
            erp = ERP.for_(order.customer_edi_id)
            erp.add(order)
            added_at = datetime.now().astimezone(UTC)
            order.chronicle(message="Added order to ERP", when=added_at)
            self._advance_past_valid(order)
        except NotFoundError as not_found:
            rejection = RejectionReason(
                rejection_type='order_line',
                message=not_found.sanitized_message
            )
            order.reject(reasons=[rejection])
            B2BEvent.notify(B2BTopic.REJECTED, order.order_id)

    @staticmethod
    def _advance_past_valid(order):
        """
        Set the order to UNPAID when the customer's profile waits for payment
        and the order is not yet PAID; otherwise set it to READY.
        """
        config = Profile.profile_for(customer=order.customer_edi_id).integration_config
        awaiting_payment = (config.wait_for_payment_before_ready
                            and order.financial_status < FinancialStatus.PAID)
        if not awaiting_payment:
            order.status = OrderStatus.READY
            return
        try:
            # Only move to UNPAID if the stored status is still VALID.
            still_valid = Attr('order_status').eq(OrderStatus.VALID.value)
            order.set_status(new_status=OrderStatus.UNPAID, conditions=still_valid)
        except ValueError:
            logger.exception("Unable to set order status to UNPAID because it is not in status VALID.",
                             extra=log_data(order_id=order.order_id, order_status=order.status.value))
class AnyToAcceptedEventHandler(OrderStatusEventListener):
    """
    This should only happen in manual order flows
    """

    def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
        """
        Send the accept acknowledgement and, where possible, move the order to ENTERED.

        Test orders stop after the acknowledgement. Otherwise the order becomes
        ENTERED when the flow is MANUAL, when it already carries a
        ``glovia_sales_order``, or when the ERP reference lookup succeeds. If
        the lookup fails, error ORDER-67 is recorded instead.

        :param from_status: status the order moved away from
        :param to_status: status the order moved to
        :param order: the b2b order object
        :return: None
        """
        logger.debug(f"Processing acknowledgement for order {order['id']}.")
        Acknowledgement(order).notify()
        order.chronicle(message="Accept acknowledgement sent.")

        if order.is_test():
            logger.info(f"Skipping ERP lookup for TEST order {order.order_id}")
            return

        customer_profile = Profile.profile_for(customer=order.customer_edi_id)
        flow = OrderFlowType.for_(customer_profile.integration_config.order_flow_type)
        if flow == OrderFlowType.MANUAL or 'glovia_sales_order' in order:
            order.status = OrderStatus.ENTERED
            return

        logger.debug(f"Checking to see if order exists in Glovia {order['id']}.")
        if order.update_erp_reference():
            order.status = OrderStatus.ENTERED
        else:
            order.error(ErrorRecord(code="ORDER-67", msg="Unable to find accepted order in the ERP."))
class AnyToRejectedEventHandler(OrderStatusEventListener):
    """
    Handles an order moving to REJECTED from any prior status.
    """

    def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
        """
        Send the reject acknowledgement, reject in the ERP, and notify the business.

        The ERP rejection only runs for AUTOMATED_GLOVIA flows. The business
        notification is only sent when the re-fetched order has
        ``rejection_errors``.

        :param from_status: status the order moved away from
        :param to_status: status the order moved to
        :param order: the b2b order object
        :return: None
        """
        logger.debug(f"Processing acknowledgement for order {order['id']}.")
        Acknowledgement(order).notify()
        order.chronicle(message="Reject acknowledgement sent.")

        customer_profile = Profile.profile_for(customer=order['customer_edi_id'])
        flow = OrderFlowType.for_(customer_profile.integration_config.order_flow_type)
        if flow == OrderFlowType.AUTOMATED_GLOVIA:
            ERP.for_(order.customer_edi_id).reject(order)
            order.chronicle(message="Processed order rejection in ERP.")

        # Notify business error recipients
        refreshed = Orders.for_(order['id'])
        if 'rejection_errors' not in refreshed:
            return
        customer = refreshed['customer_edi_id']
        subject = f"B2B Orders Rejected ({refreshed['purchase_order']})"
        payload = ErrorNotificationData(
            order_id=refreshed['id'],
            message="The order was rejected due to:",
            errors=refreshed['rejection_errors']
        )
        Notifier.notify(
            notification_type=NotificationType.BUSINESS,
            customer=customer,
            subject=subject,
            data=payload
        )
class AnyToEnteredEventHandler(OrderStatusEventListener):
    """
    Handles an order moving to ENTERED from any prior status.
    """

    def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
        """
        Run the ERP accept step and create fulfillments for recycling charges.

        The ERP accept step only runs for AUTOMATED_GLOVIA flows. A fulfillment
        record is created for each additional charge flagged as recycling.

        :param from_status: status the order moved away from
        :param to_status: status the order moved to
        :param order: the b2b order object
        :return: None
        """
        customer_profile = Profile.profile_for(customer=order['customer_edi_id'])
        flow = OrderFlowType.for_(customer_profile.integration_config.order_flow_type)
        if flow is OrderFlowType.AUTOMATED_GLOVIA:
            ERP.for_(order.customer_edi_id).accept(order)
            order.chronicle(message="Executed post order acceptance logic for ERP.")

        for charge in order.additional_charges:
            if not charge.recycling:
                continue
            record = ff.Fulfillment.create_for_additional_charge(additional_charge=charge, order=order)
            logger.info(f"Created fulfillment record for recycle item. {record['id']}")
            order.chronicle(message="Created fulfillment record for recycle item.")
class AnyToPartiallyShippedEventHandler(OrderStatusEventListener):
    """
    Handles an order moving to PARTIALLY_SHIPPED from any prior status.
    """

    def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
        """
        Run the ENTERED handler if the order skipped the ENTERED status.

        Exceptions from the ENTERED handler are recorded and/or logged here
        and do not propagate.

        :param from_status: status the order moved away from
        :param to_status: status the order moved to
        :param order: the b2b order object
        :return: None
        """
        # Occasionally an order seems to go from a pre-entered status straight to the shipping statuses.
        # This bypasses the post accept erp update that is necessary for discounts and taxes. So, we're
        # making sure that the logic is run
        if not (from_status < OrderStatus.ENTERED):
            return

        entered_handler = AnyToEnteredEventHandler()
        try:
            entered_handler.onStatusChange(from_status=from_status,
                                           to_status=OrderStatus.ENTERED,
                                           order=order)
        except OrderNotFoundError as missing:
            order.record_warning(code='ORDER-811', error_msg=f"Order not found in ERP while shipping order. This "
                                                             f"can happen if closing an order as shipped that is "
                                                             f"not in the ERP. {missing.message}")
            logger.warning(missing.sanitized_message)
        except BusinessError as business_error:
            order.record_error(error_code='ORDER-831', error_msg=business_error.message)
            logger.exception(business_error.sanitized_message)
        except Exception as unexpected:
            logger.exception(str(unexpected))
class AnyToShippedEventHandler(OrderStatusEventListener):
    """
    Handles an order moving to SHIPPED from any prior status.
    """

    def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
        """
        Refresh the ERP reference if missing and run the ENTERED handler if it was skipped.

        The ERP reference is refreshed only for non-test orders that have no
        ``glovia_sales_order``. Exceptions from the ENTERED handler are
        recorded and/or logged here and do not propagate.

        :param from_status: status the order moved away from
        :param to_status: status the order moved to
        :param order: the b2b order object
        :return: None
        """
        # Log the ERP check only when it is actually performed. The message used
        # to be emitted unconditionally as well, which was misleading for test
        # orders and for orders that already carry an ERP reference.
        if 'glovia_sales_order' not in order and not order.is_test():
            logger.debug(f"Checking to see if order exists in Glovia {order['id']}.")
            order.update_erp_reference()
        # Occasionally an order seems to go from a pre-entered status straight to the shipping statuses.
        # This bypasses the post accept erp update that is necessary for discounts and taxes. So, we're
        # making sure that the logic is run
        if from_status < OrderStatus.ENTERED:
            try:
                AnyToEnteredEventHandler().onStatusChange(from_status=from_status,
                                                          to_status=OrderStatus.ENTERED,
                                                          order=order)
            except OrderNotFoundError as onfe:
                order.record_warning(code='ORDER-811', error_msg=f"Order not found in ERP while shipping order. This "
                                                                 f"can happen if closing an order as shipped that is "
                                                                 f"not in the ERP. {onfe.message}")
                logger.warning(onfe.sanitized_message)
            except BusinessError as be:
                order.record_error(error_code='ORDER-831', error_msg=be.message)
                logger.exception(be.sanitized_message)
            except Exception as e:
                # Best effort: log and carry on.
                logger.exception(str(e))
        # The fulfillment event logic should handle sending ship notices. So, nothing else to do here.
# Moved here to avoid circular import problems (strange)
from lib_b2b.profile import Profile
from lib_b2b.acknowledge import Acknowledgement
from . import fulfillment as ff