Repository URL to install this package:
|
Version:
0.4.139 ▾
|
lib-py-b2b
/
fulfillment_status.py
|
|---|
from enum import Enum
from abc import ABC, abstractmethod
from typing import List, Tuple, DefaultDict
from collections import defaultdict
import logging
from os import environ
log_level = logging.getLevelName(environ['LOG_LEVEL']) if 'LOG_LEVEL' in environ else logging.INFO
logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
datefmt='%d-%m-%Y:%H:%M:%S',
level=log_level)
logger = logging.getLogger('lib-b2b-fulfillment-status')
class FulfillmentStatus(Enum):
NONE = 'NONE'
ANY = 'ANY'
PENDING = 'PENDING'
NOT_SENT = 'NOT_SENT'
FAILED = 'FAILED'
SENT = 'SENT'
CANCELLED = 'CANCELLED'
@staticmethod
def for_(fulfillment_status):
"""
Convert variable to an FulfillmentStatus type. Will pass an actual FulfillmentStatus through.
:param fulfillment_status: string or FulfillmentStatus object
:return: FulfillmentStatus
"""
if isinstance(fulfillment_status, str):
_status = FulfillmentStatus(fulfillment_status)
else:
_status = fulfillment_status
return _status
@staticmethod
def all():
return [s for s in FulfillmentStatus]
class FulfillmentStatusEventListener(ABC):
@abstractmethod
def onStatusChange(self, from_status: FulfillmentStatus, to_status: FulfillmentStatus, fulfillment):
"""
Handle status changes on fulfillment
:param from_status: string or FulfillmentStatus
:param to_status: string or FulfillmentStatus
:param fulfillment: fulfillment id as string or b2b fulfillment object
:return: None
"""
pass
class FulfillmentStatusEventBroker:
@staticmethod
def initialize():
"""
Default initialization and subscriptions
:return:
"""
_broker = FulfillmentStatusEventBroker()
_broker.subscribe(FulfillmentStatus.ANY, FulfillmentStatus.PENDING, AnyToPendingEventHandler())
_broker.subscribe(FulfillmentStatus.ANY, FulfillmentStatus.NOT_SENT, AnyToNotSentEventHandler())
_broker.subscribe(FulfillmentStatus.NOT_SENT, FulfillmentStatus.FAILED, NotSentToFailedEventHandler())
_broker.subscribe(FulfillmentStatus.NOT_SENT, FulfillmentStatus.SENT, NotSentToSentEventHandler())
_broker.subscribe(FulfillmentStatus.ANY, FulfillmentStatus.CANCELLED, AnyToCancelledEventHandler())
return _broker
def __init__(self):
self.subscribers: DefaultDict[Tuple[FulfillmentStatus, FulfillmentStatus], List[FulfillmentStatusEventListener]] = defaultdict(list)
def subscribe(self, from_status, to_status, subscriber: FulfillmentStatusEventListener):
status_tuple = (FulfillmentStatus.for_(from_status), FulfillmentStatus.for_(to_status))
self.subscribers[status_tuple].append(subscriber)
def unsubscribe(self, from_status, to_status, subscriber: FulfillmentStatusEventListener):
status_tuple = (FulfillmentStatus.for_(from_status), FulfillmentStatus.for_(to_status))
self.subscribers[status_tuple].remove(subscriber)
def publish(self, from_status, to_status, fulfillment):
"""
Handle status changes on fulfillments. The events are post change in the status.
:param from_status: string or FulfillmentStatus
:param to_status: string or FulfillmentStatus
:param fulfillment: fulfillment id as string or b2b Fulfillment object
:return: None
"""
_fulfillment = ff.Fulfillment.for_(fulfillment)
status_tuples = []
_from_status = FulfillmentStatus.for_(from_status)
_to_status = FulfillmentStatus.for_(to_status)
_status_tuple = (_from_status, _to_status)
status_tuples.append(_status_tuple)
status_tuples.append((FulfillmentStatus.ANY, _to_status))
status_tuples.append((_from_status, FulfillmentStatus.ANY))
logger.info(f"Fulfillment {_fulfillment['id']} :: Status Change {_from_status} -> {_to_status}")
# Send SNS events for status change
try:
B2BEvent.notify(B2BTopic[_to_status.value], _fulfillment['id'])
except KeyError:
logger.warning(f"{_fulfillment['status']} is not a valid B2BTopic.")
for status_tuple in status_tuples:
if status_tuple in self.subscribers:
for subscriber in self.subscribers[status_tuple]:
logger.debug(f"Notifying subscriber {type(subscriber).__name__} of status change {status_tuple}")
try:
subscriber.onStatusChange(from_status=_from_status, to_status=_to_status, fulfillment=_fulfillment)
except Exception as e:
logger.exception(f"Subscriber {type(subscriber).__name__} raised an exception while handling {status_tuple}")
else:
logger.debug(f"No subscribers found for {status_tuple}")
class AnyToPendingEventHandler(FulfillmentStatusEventListener):
def onStatusChange(self, from_status: FulfillmentStatus, to_status: FulfillmentStatus, fulfillment):
# Currently, this is a marker status that is used for knowing
# which shipments need status data from the carrier system
pass
class AnyToNotSentEventHandler(FulfillmentStatusEventListener):
def onStatusChange(self, from_status: FulfillmentStatus, to_status: FulfillmentStatus, fulfillment):
# This status means that I have all the tracking information for this fulfillment record and it is
# time to send it to the upstream system
# make first attempt to send fulfillment. subsequent attempts will be made from a scheduled function.
logger.info(f"Processing fulfillment: {fulfillment['id']} for order: {fulfillment['order_id']}")
ShipNotice(fulfillment).notify()
# Update the order to shipped or partially shipped
level = ff.Fulfillment.unfulfilled_qty_for(fulfillment['order_id'])
from .orders import Orders
Orders.for_(fulfillment['order_id']).on_fulfillment_change(level)
class NotSentToFailedEventHandler(FulfillmentStatusEventListener):
def onStatusChange(self, from_status: FulfillmentStatus, to_status: FulfillmentStatus, fulfillment):
# This status means that we attempted to send the ship notice and it failed
if fulfillment['retry_count'] <= MAX_RETRY_COUNT:
logger.warning(f"Fulfillment notice failed to send, on attempt {fulfillment['retry_count']}, "
f"for fulfillment: {fulfillment['id']} and order: {fulfillment['order_id']}")
else:
logger.error(f"Fulfillment notice failed to send, and has exceeded the maximum retry attempts, "
f"for fulfillment: {fulfillment['id']} and order: {fulfillment['order_id']}")
Notifier.notify(
notification_type=NotificationType.TECHNICAL,
customer=fulfillment['customer_edi_id'],
subject='Fulfillment Notice Failure',
data=ErrorNotificationData(order_id=fulfillment['order_id'],
message=f"Unable to notify the customer of fulfillment "
f"information for fulfillment: {fulfillment['id']}",
errors=fulfillment['last_error'])
)
class NotSentToSentEventHandler(FulfillmentStatusEventListener):
def onStatusChange(self, from_status: FulfillmentStatus, to_status: FulfillmentStatus, fulfillment):
logger.info(f"Fulfillment notices successfully sent for fulfillment: {fulfillment['id']} order: {fulfillment['order_id']}")
tracking_numbers = fulfillment.get('tracking_numbers', [])
container_id = fulfillment.get('container_id', None)
logger.debug(f"Attempting to save tracking information for container_id {container_id} tracking_numbers {tracking_numbers}")
try:
# write the tracking number back to container
from lib_b2b.erp import ERP
erp_order = ERP.for_(fulfillment['customer_edi_id']).fetch_order_by_id(fulfillment['order_id'])
ccn = erp_order.sales_ccn
logger.debug(
f"Attempting to save tracking information for container_id {container_id}, tracking_numbers {tracking_numbers}, erp_order {str(erp_order)}")
if tracking_numbers and container_id and ccn:
from .tracking import encode_glovia_pro_num
from .container import Container
tracking_number_str = encode_glovia_pro_num(tracking_numbers)
Container.save_tracking_number(ccn=ccn, container_id=container_id, tracking_number=tracking_number_str)
logger.info(f"Successfully saved fulfillment {fulfillment['id']} tracking number to container {container_id}")
except Exception as e:
logger.error(f"Unable to write tracking number for container {container_id}", e)
class AnyToCancelledEventHandler(FulfillmentStatusEventListener):
def onStatusChange(self, from_status: FulfillmentStatus, to_status: FulfillmentStatus, fulfillment):
from .carrier import CarrierType, CarrierIntegration
from .profile import Profile
from .shipnotice import ShipNotice
from .fulfillment import Fulfillment
from uuid import uuid4
profile = Profile.profile_for(customer=fulfillment['customer_edi_id'])
carrier_type = CarrierType(fulfillment.get('carrier_type', 'FEDEX_SHIP_MGR'))
carrier = CarrierIntegration.get_carrier(profile=profile, carrier_type=carrier_type)
carrier.remove_shipment(fulfillment=fulfillment)
ShipNotice(fulfillment=fulfillment).cancel()
fulfillment['cancelled_container_id'] = fulfillment['container_id']
fulfillment['container_id'] = str(uuid4())
Fulfillment.save_record(fulfillment)
# Moved to the bottom to avoid silly python circular import problems
from .shipnotice import ShipNotice
from .notification import Notifier, NotificationType, ErrorNotificationData
from .event import B2BEvent, B2BTopic
from .shipnotice import MAX_RETRY_COUNT
from . import fulfillment as ff