Repository URL to install this package:
|
Version:
0.4.139 ▾
|
lib-py-b2b
/
order_status_event.py
|
|---|
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Tuple, DefaultDict
from collections import defaultdict
from boto3.dynamodb.conditions import Attr, Or
from py_aws_util.logging import log_data
from lib_b2b.change import ChangeRecord
from lib_b2b.erp import ERP
from lib_b2b.financial_status import FinancialStatus
from .notification import Notifier, NotificationType, ErrorNotificationData
from lib_b2b.order_flow import OrderFlowType
from .event import B2BTopic, B2BEvent
from .orders import Orders
from .order import Order
from .errors import BusinessError, OrderValidationError, ErrorRecord
from .order_status import OrderStatus
import logging
from os import environ
log_level = logging.getLevelName(environ['LOG_LEVEL']) if 'LOG_LEVEL' in environ else logging.INFO
logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
datefmt='%d-%m-%Y:%H:%M:%S',
level=log_level)
logger = logging.getLogger('lib-b2b-order-status')
class OrderStatusEventListener(ABC):
@abstractmethod
def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
"""
Handle status changes on order
:param from_status: string or OrderStatus
:param to_status: string or OrderStatus
:param order: order_id as string or b2b order object
:return: None
"""
pass
class OrderStatusEventBroker:
@staticmethod
def initialize():
"""
Default initialization and subscriptions
:return:
"""
_broker = OrderStatusEventBroker()
_broker.subscribe(OrderStatus.ANY, OrderStatus.RECEIVED, AnyToReceivedEventHandler())
_broker.subscribe(OrderStatus.NONE, OrderStatus.VALID, NoneToValidEventHandler())
_broker.subscribe(OrderStatus.RECEIVED, OrderStatus.VALID, ReceivedToValidEventHandler())
_broker.subscribe(OrderStatus.ANY, OrderStatus.ACCEPTED, AnyToAcceptedEventHandler())
_broker.subscribe(OrderStatus.ANY, OrderStatus.REJECTED, AnyToRejectedEventHandler())
_broker.subscribe(OrderStatus.ANY, OrderStatus.ENTERED, AnyToEnteredEventHandler())
_broker.subscribe(OrderStatus.ANY, OrderStatus.PARTIALLY_SHIPPED, AnyToPartiallyShippedEventHandler())
_broker.subscribe(OrderStatus.ANY, OrderStatus.SHIPPED, AnyToShippedEventHandler())
return _broker
def __init__(self):
self.subscribers: DefaultDict[Tuple[OrderStatus, OrderStatus], List[OrderStatusEventListener]] = defaultdict(list)
def subscribe(self, from_status, to_status, subscriber: OrderStatusEventListener):
status_tuple = (OrderStatus.for_(from_status), OrderStatus.for_(to_status))
self.subscribers[status_tuple].append(subscriber)
def unsubscribe(self, from_status, to_status, subscriber: OrderStatusEventListener):
status_tuple = (OrderStatus.for_(from_status), OrderStatus.for_(to_status))
self.subscribers[status_tuple].remove(subscriber)
def publish(self, from_status, to_status, order):
"""
Handle status changes on order. The events are post change in the status.
:param from_status: string or OrderStatus
:param to_status: string or OrderStatus
:param order: order_id as string or b2b order object
:return: None
"""
_order = Orders.for_(order)
status_tuples = []
_from_status = OrderStatus.for_(from_status)
_to_status = OrderStatus.for_(to_status)
_status_tuple = (_from_status, _to_status)
status_tuples.append(_status_tuple)
status_tuples.append((OrderStatus.ANY, _to_status))
status_tuples.append((_from_status, OrderStatus.ANY))
logger.info(f"Orders {_order['id']} :: Status Change {_from_status} -> {_to_status}")
# Send SNS events for status change
try:
B2BEvent.notify(B2BTopic[_to_status.value], _order['id'])
except KeyError:
logger.warning(f"{_order['order_status']} is not a valid B2BTopic.")
for status_tuple in status_tuples:
if status_tuple in self.subscribers:
for subscriber in self.subscribers[status_tuple]:
logger.debug(f"Notifying subscriber {type(subscriber).__name__} of status change {status_tuple}")
try:
subscriber.onStatusChange(from_status=_from_status, to_status=_to_status, order=_order)
except BusinessError as be:
_order.record_error(error_code='OF-001', error_msg=be.message)
logger.exception(f"Subscriber {type(subscriber).__name__} raised an exception while handling {status_tuple}")
except Exception as e:
logger.exception(f"Subscriber {type(subscriber).__name__} raised an exception while handling {status_tuple}")
else:
logger.debug(f"No subscribers found for {status_tuple}")
class AnyToReceivedEventHandler(OrderStatusEventListener):
def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
try:
order.validate()
order.prepare()
order.status = OrderStatus.VALID
B2BEvent.notify(B2BTopic.VALID, order.order_id)
except OrderValidationError as ove:
order.reject(reasons=ove.reasons)
B2BEvent.notify(B2BTopic.REJECTED, order.order_id)
class NoneToValidEventHandler(OrderStatusEventListener):
def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
# This is a bug in an older public interface that customers are still using.
# So, we are gonna rewind it back to the RECEIVED state
logger.info("Rewinding order.")
order.status = OrderStatus.RECEIVED
class ReceivedToValidEventHandler(OrderStatusEventListener):
def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
ERP.for_(order.customer_edi_id).add(order)
order.chronicle(message="Added order to ERP", when=datetime.now())
profile = Profile.profile_for(customer=order.customer_edi_id)
if profile.integration_config.wait_for_payment_before_ready and \
order.financial_status < FinancialStatus.PAID:
try:
condition = Attr('order_status').eq(OrderStatus.VALID.value)
order.set_status(new_status=OrderStatus.UNPAID, conditions=condition)
except ValueError:
logger.exception("Unable to set order status to UNPAID because it is not in status VALID.",
extra=log_data(order_id=order.order_id, order_status=order.status.value))
else:
order.status = OrderStatus.READY
class AnyToAcceptedEventHandler(OrderStatusEventListener):
"""
This should only happen in manual order flows
"""
def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
logger.debug(f"Processing acknowledgement for order {order['id']}.")
Acknowledgement(order).notify()
order.chronicle(message="Accept acknowledgement sent.")
profile = Profile.profile_for(customer=order.customer_edi_id)
order_flow_type = OrderFlowType.for_(profile.integration_config.order_flow_type)
if order_flow_type == OrderFlowType.MANUAL:
order.status = OrderStatus.ENTERED
else:
if 'glovia_sales_order' not in order:
logger.debug(f"Checking to see if order exists in Glovia {order['id']}.")
glovia_order = order.update_erp_reference()
if glovia_order:
order.status = OrderStatus.ENTERED
else:
order.error(ErrorRecord(code="ORDER-67", msg="Unable to find accepted order in the ERP."))
else:
order.status = OrderStatus.ENTERED
class AnyToRejectedEventHandler(OrderStatusEventListener):
def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
logger.debug(f"Processing acknowledgement for order {order['id']}.")
Acknowledgement(order).notify()
order.chronicle(message="Reject acknowledgement sent.")
profile = Profile.profile_for(customer=order['customer_edi_id'])
order_flow_type = OrderFlowType.for_(profile.integration_config.order_flow_type)
if order_flow_type == OrderFlowType.AUTOMATED_GLOVIA:
ERP.for_(order.customer_edi_id).reject(order)
order.chronicle(message="Processed order rejection in ERP.")
# Notify business error recipients
updated_order = Orders.for_(order['id'])
if 'rejection_errors' in updated_order:
Notifier.notify(
notification_type=NotificationType.BUSINESS,
customer=updated_order['customer_edi_id'],
subject=f"B2B Orders Rejected ({updated_order['purchase_order']})",
data=ErrorNotificationData(
order_id=updated_order['id'],
message="The order was rejected due to:",
errors=updated_order['rejection_errors']
)
)
class AnyToEnteredEventHandler(OrderStatusEventListener):
def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
profile = Profile.profile_for(customer=order['customer_edi_id'])
order_flow_type = OrderFlowType.for_(profile.integration_config.order_flow_type)
if order_flow_type is OrderFlowType.AUTOMATED_GLOVIA:
ERP.for_(order.customer_edi_id).accept(order)
order.chronicle(message="Executed post order acceptance logic for ERP.")
for addl_chg in order.additional_charges:
if addl_chg.recycling:
fulfillment = ff.Fulfillment.create_for_additional_charge(additional_charge=addl_chg, order=order)
logger.info(f"Created fulfillment record for recycle item. {fulfillment['id']}")
order.chronicle(message="Created fulfillment record for recycle item.")
class AnyToPartiallyShippedEventHandler(OrderStatusEventListener):
def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
# Occasionally an order seems to go from a pre-entered status straight to the shipping statuses.
# This bypasses the post accept erp update that is necessary for discounts and taxes. So, we're
# making sure that the logic is run
if from_status < OrderStatus.ENTERED:
AnyToEnteredEventHandler().onStatusChange(from_status=from_status, to_status=OrderStatus.ENTERED,
order=order)
class AnyToShippedEventHandler(OrderStatusEventListener):
def onStatusChange(self, from_status: OrderStatus, to_status: OrderStatus, order: Order):
logger.debug(f"Checking to see if order exists in Glovia {order['id']}.")
if 'glovia_sales_order' not in order:
logger.debug(f"Checking to see if order exists in Glovia {order['id']}.")
order.update_erp_reference()
# Occasionally an order seems to go from a pre-entered status straight to the shipping statuses.
# This bypasses the post accept erp update that is necessary for discounts and taxes. So, we're
# making sure that the logic is run
if from_status < OrderStatus.ENTERED:
AnyToEnteredEventHandler().onStatusChange(from_status=from_status,
to_status=OrderStatus.ENTERED,
order=order)
# The fulfillment event logic should handle sending ship notices. So, nothing else to do here.
# Moved here to avoid circular import problems (strange)
from lib_b2b.profile import Profile
from lib_b2b.acknowledge import Acknowledgement
from . import fulfillment as ff