Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
lib-py-b2b / fulfillment.py
Size: Mime:
import boto3
from boto3.dynamodb.conditions import Key

from lib_b2b.additional_charge import AdditionalCharge
from .errors import  FulfillmentExistsError, FulfillmentStatusError
from botocore.exceptions import ClientError
from os import environ
import logging
from collections import defaultdict, namedtuple
from datetime import datetime
import uuid
import json
from decimal import Decimal
from .util import UtilityEncoder
from boto3 import resource
from .orders import Orders
from .util import as_decimal
from .errors import NotFoundError, FulfillmentLookupError, FulfillmentSaveError
from .container import Container
from .profile import Profile
from .carrier import CarrierType
from . import fulfillment_status as fs
from aws_xray_sdk.core import xray_recorder

log_level = logging.getLevelName(environ['LOG_LEVEL']) if 'LOG_LEVEL' in environ else logging.INFO
logging.basicConfig(format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
                    datefmt='%d-%m-%Y:%H:%M:%S',
                    level=log_level)
logger = logging.getLogger('lib-b2b-fulfillment')
fulfillment_table = environ['fulfillment_table'] if 'fulfillment_table' in environ else 'test.b2b.Fulfillment'

FulfillmentLevel = namedtuple('FulfillmentLevel', 'total_required total_remaining tracking_urls fulfillment_ids last_ship_date ship_dates')


class Fulfillment:
    @staticmethod
    @xray_recorder.capture()
    def fetch_unsent_fulfillments(status='NOT_SENT'):
        try:
            dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
            table = dynamodb.Table(fulfillment_table)
            data = []
            response = table.query(
                IndexName='status-index',
                KeyConditionExpression="#s = :ns",
                ExpressionAttributeValues={
                    ':ns': status
                },
                ExpressionAttributeNames={
                    '#s': 'status'
                }
            )
            data.extend(response['Items'])
            while 'LastEvaluatedKey' in response:
                response = table.query(
                    ExclusiveStartKey=response['LastEvaluatedKey'],
                    IndexName='status-index',
                    KeyConditionExpression="#s = :ns",
                    ExpressionAttributeValues={
                        ':ns': status
                    },
                    ExpressionAttributeNames={
                        '#s': 'status'
                    }
                )
                data.extend(response['Items'])
            return data
        except ClientError as e:
            raise FulfillmentLookupError(e.response['Error']['Message'])

    @staticmethod
    @xray_recorder.capture()
    def fetch_failed_fulfillments():
        try:
            dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
            table = dynamodb.Table(fulfillment_table)
            data = []
            response = table.query(
                IndexName='status-index',
                KeyConditionExpression="#s = :ns",
                FilterExpression="retry_count <= :max_retry_count",
                ExpressionAttributeValues={
                    ':ns': 'FAILED',
                    ':max_retry_count': fs.MAX_RETRY_COUNT
                },
                ExpressionAttributeNames={
                    '#s': 'status'
                }
            )
            data.extend(response['Items'])
            while 'LastEvaluatedKey' in response:
                response = table.query(
                    ExclusiveStartKey=response['LastEvaluatedKey'],
                    IndexName='status-index',
                    KeyConditionExpression="#s = :ns",
                    FilterExpression="retry_count <= :max_retry_count",
                    ExpressionAttributeValues={
                        ':ns': 'FAILED',
                        ':max_retry_count': fs.MAX_RETRY_COUNT
                    },
                    ExpressionAttributeNames={
                        '#s': 'status'
                    }
                )
                data.extend(response['Items'])
            return data
        except ClientError as e:
            raise FulfillmentLookupError(e.response['Error']['Message'])

    @staticmethod
    @xray_recorder.capture()
    def fetch_pending():
        try:
            dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
            table = dynamodb.Table(fulfillment_table)
            data = []
            response = table.query(
                IndexName='status-index',
                KeyConditionExpression="#s = :ns",
                ExpressionAttributeValues={
                    ':ns': fs.FulfillmentStatus.PENDING.value
                },
                ExpressionAttributeNames={
                    '#s': 'status'
                }
            )
            data.extend(response['Items'])
            while 'LastEvaluatedKey' in response:
                response = table.query(
                    ExclusiveStartKey=response['LastEvaluatedKey'],
                    IndexName='status-index',
                    KeyConditionExpression="#s = :ns",
                    ExpressionAttributeValues={
                        ':ns': fs.FulfillmentStatus.PENDING.value
                    },
                    ExpressionAttributeNames={
                        '#s': 'status'
                    }
                )
                data.extend(response['Items'])
            return data
        except ClientError as e:
            raise FulfillmentLookupError(e.response['Error']['Message'])

    @staticmethod
    @xray_recorder.capture()
    def fetch_unsent_fulfillments_for(customer_edi_id, status='NOT_SENT'):
        unsent = Fulfillment.fetch_unsent_fulfillments(status)
        return list(filter(lambda x: x['customer_edi_id'] == customer_edi_id, unsent))

    @staticmethod
    @xray_recorder.capture()
    def __create_records(records):
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        with table.batch_writer() as batch:
            for r in records:
                batch.put_item(Item=r)

    @staticmethod
    @xray_recorder.capture()
    def exists(container_id: str):
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        response = table.query(
            IndexName='container_id-index',
            KeyConditionExpression=Key('container_id').eq(container_id)
        )
        if response and 'Items' in response and response['Items']:
            return True
        else:
            return False

    @staticmethod
    @xray_recorder.capture()
    def fetch(fulfillment_id: str):
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        try:
            response = table.get_item(
                Key={
                    'id': fulfillment_id
                }
            )
            if 'Item' in response:
                item = response['Item']
                return item
            else:
                raise NotFoundError(f"Unable to find fulfillment: {fulfillment_id}")
        except ClientError as e:
            raise FulfillmentLookupError(e.response['Error']['Message'])

    @staticmethod
    @xray_recorder.capture()
    def fetch_by_container_id(container_id: str):
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        response = table.query(
            IndexName='container_id-index',
            KeyConditionExpression=Key('container_id').eq(container_id)
        )
        if response and 'Items' in response and response['Items']:
            return response['Items'][0]
        else:
            return None

    @staticmethod
    @xray_recorder.capture()
    def fetch_all_for_status(status, begin_date: str, end_date: str = None, customer_edi_id: str = None):
        try:
            dynamodb_resource = resource('dynamodb', region_name='us-east-1')
            table = dynamodb_resource.Table(fulfillment_table)
            if end_date:
                _end_date = end_date
            else:
                _end_date = datetime.utcnow().isoformat('T', 'seconds') + 'Z'
            expression_attribute_values = {
                ':status': status.value,
                ':begin_date': begin_date,
                ':end_date': _end_date
            }
            filter_expressions = []
            if customer_edi_id:
                expression_attribute_values[":customer_edi_id"] = customer_edi_id
                filter_expressions.append("customer_edi_id = :customer_edi_id")

            filter_expression = ' and '.join(filter_expressions)

            op_params = {
                'IndexName': 'status-index',
                'KeyConditionExpression': "#s = :status and ship_date between :begin_date and :end_date",
                'ExpressionAttributeValues': expression_attribute_values,
                'ExpressionAttributeNames': {'#s': 'status'}
            }
            if filter_expressions:
                op_params['FilterExpression'] = filter_expression

            response = table.query(**op_params)
            data = response['Items']
            while 'LastEvaluatedKey' in response:
                op_params['LastEvaluatedKey'] = response['LastEvaluatedKey']
                response = table.query(**op_params)
                data.extend(response['Items'])
            return data
        except ClientError as e:
            raise FulfillmentLookupError(e.response['Error']['Message'])

    @staticmethod
    @xray_recorder.capture()
    def fetch_by_order_id(order_id: str):
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        response = table.query(
            IndexName='order_id-index',
            KeyConditionExpression=Key('order_id').eq(order_id)
        )
        if response and 'Items' in response and response['Items']:
            return response['Items']
        else:
            return None


    @staticmethod
    @xray_recorder.capture()
    def __remove(fulfillment):
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        table.delete_item(Key={'id': fulfillment['id']})

    @staticmethod
    @xray_recorder.capture()
    def remove(fulfillment):
        if fulfillment['status'] == fs.FulfillmentStatus.SENT:
            raise FulfillmentStatusError('Unable to remove a sent fulfillment')
        else:
            Fulfillment.__remove(fulfillment)

    @staticmethod
    @xray_recorder.capture()
    def update_label_images(fulfillment, label_images: []):
        if isinstance(fulfillment, str):
            fulfillment_id = fulfillment
        else:
            fulfillment_id = fulfillment['id']

        try:
            dynamodb_resource = resource('dynamodb', region_name='us-east-1')
            table = dynamodb_resource.Table(fulfillment_table)
            response = table.update_item(
                Key={'id': fulfillment_id},
                UpdateExpression="set label_images = :l",
                ExpressionAttributeValues={
                    ':l': label_images
                },
                ReturnValues="UPDATED_NEW"
            )
            return response
        except ClientError as ce:
            if ce.response['Error']['Code'] != 'ConditionalCheckFailedException':
                raise ce
        except Exception as e:
            raise e

    @staticmethod
    @xray_recorder.capture()
    def update_for_shipment(fulfillment, status: fs.FulfillmentStatus, tracking_numbers: [str] = None,
                            ship_dates: [str] = None, carrier_data=None, labels: [] = None,
                            carrier_code: str = 'FDXG', net_shipping_cost: float = 0.00,
                            transit_time: str = None):
        if isinstance(fulfillment, str):
            fulfillment_id = fulfillment
        else:
            fulfillment_id = fulfillment['id']
        _fulfillment = Fulfillment.for_(fulfillment_id)
        try:
            _status = fs.FulfillmentStatus.for_(status)
            dynamodb_resource = resource('dynamodb', region_name='us-east-1')
            table = dynamodb_resource.Table(fulfillment_table)
            logger.info(f"Setting status for fulfillment: {fulfillment_id} to {_status.value}")
            if tracking_numbers is None:
                tracking_numbers = []
            if ship_dates is None:
                ship_dates = []
            if carrier_data is None:
                carrier_data = {}
            if labels is None:
                labels = []

            tracking_urls = []
            for tracking_num in tracking_numbers:
                tracking_urls.append(TrackingUrl.url_for(tracking_num, CarrierType(_fulfillment['carrier_type'])))
            carrier_data_str = json.dumps(carrier_data, cls=UtilityEncoder)
            recoded_carrier_data = json.loads(carrier_data_str, parse_float=Decimal)
            response = table.update_item(
                Key={'id': fulfillment_id},
                UpdateExpression="set #s = :s,"
                                 "tracking_numbers = :t,"
                                 "tracking_urls = :u,"
                                 "carrier_data = :c,"
                                 "ship_dates = :d,"
                                 "labels = :l,"
                                 "carrier_code = :cc,"
                                 "net_shipping_cost = :n,"
                                 "est_transit_time = :tt",
                ExpressionAttributeValues={
                    ':s': _status.value,
                    ':t': tracking_numbers,
                    ':u': tracking_urls,
                    ':c': recoded_carrier_data,
                    ':d': ship_dates,
                    ':l': labels,
                    ':cc': carrier_code,
                    ':n': Decimal(str(net_shipping_cost)),
                    ':tt': transit_time
                },
                ExpressionAttributeNames = {
                    '#s': 'status'
                },
                ReturnValues="UPDATED_NEW"
            )
            return response
        except ClientError as ce:
            if ce.response['Error']['Code'] != 'ConditionalCheckFailedException':
                raise ce
        except Exception as e:
            raise e

    @staticmethod
    def __remove_fulfillment(container_id):
        _f = Fulfillment.fetch_by_container_id(container_id)
        if _f:
            Fulfillment.remove(_f)

    @staticmethod
    @xray_recorder.capture()
    def create(container: Container, order, force=False, tracking_numbers: list = None):
        """
        Create a fulfillment record based on the container
        :param container: the Container object
        :param order: the b2b order object or order id
        :param force: overwrite the fulfillment if it already exists for this container
        :param tracking_numbers: tracking numbers if they are already available
        :return: the fulfillment object
        """
        fulfillment_id = str(uuid.uuid1())
        if force:
            Fulfillment.__remove_fulfillment(container_id=container.container_id)
        else:
            if Fulfillment.exists(container.container_id):
                raise FulfillmentExistsError(f'Attempted to create new fulfillment for a '
                                             f'container [{container.container_id}] for '
                                             f'which a fulfillment already exists.')
        _order = Orders.for_(order)
        profile = Profile.profile_for(_order['customer_edi_id'])
        if profile.fulfillment_config.use_standard_weight:
            weight = container.attributes.standard_weight
        else:
            weight = container.attributes.actual_weight or container.attributes.standard_weight

        # Per request from Ops/Chris - cap the weight we send :(
        actual_weight = weight
        if profile.fulfillment_config.maximum_sent_weight and weight > profile.fulfillment_config.maximum_sent_weight:
            weight = profile.fulfillment_config.maximum_sent_weight

        fulfillment = {
            'id': fulfillment_id,
            'order_id': _order['id'],
            'container_id': container.container_id,
            'customer_edi_id': _order['customer_edi_id'],
            'purchase_order': _order['purchase_order'],
            'customer_mode': _order['customer_mode'],
            'channel_name': _order.get('channel_name', 'standard'),
            'channel_order_id': _order.get('channel_order_id', _order.get('id')),
            'carrier': _order['carrier'],
            'carrier_type': profile.fulfillment_config.get_carrier_type().value,
            'status': fs.FulfillmentStatus.NOT_SENT.value if tracking_numbers else fs.FulfillmentStatus.PENDING.value,
            'integration_type': _order.get('integration_type', 'standard'),
            'line_items': [],
            'tracking_numbers': [],
            'tracking_urls': [],
            'ship_date': datetime.now().isoformat(),
            'glovia_order': container.contents[0].so,
            'service_type': profile.fulfillment_config.get_service_type(weight).value,
            'actual_weight': actual_weight,
            'weight': as_decimal(weight, 'container_weight'),
            'weight_um': container.attributes.weight_um
        }
        if 'notification_urls' in _order and 'ship_notice' in _order['notification_urls']:
            fulfillment['notification_url'] = _order['notification_urls']['ship_notice']

        for content in container.contents:
            line_id = f"{content.customer_edi_id}-{content.cus_po}-{content.cus_po_line}"
            line = next(l for l in _order['order_lines'] if l['purchase_order_line'] == content.cus_po_line)
            fulfillment['line_items'].append({'line_id': line_id,
                                              'quantity': as_decimal(content.quantity, 'container.content.quantity'),
                                              'purchase_order_line': content.cus_po_line or None,
                                              'channel_line_id': line.get('channel_line_id') or None,
                                              'glovia_line_id': content.soline
                                              })

        if tracking_numbers:
            fulfillment['tracking_numbers'].extend(tracking_numbers)
            for tracking_num in tracking_numbers:
                fulfillment['tracking_urls'].append(
                    TrackingUrl.url_for(tracking_num, profile.fulfillment_config.get_carrier_type()))

        Fulfillment.__create_records([fulfillment])
        return fulfillment

    @staticmethod
    @xray_recorder.capture()
    def create_for_additional_charge(additional_charge: AdditionalCharge, order, force: bool = False):
        """
        Create a fulfillment record based on the container
        :param force: Overwrite any existing fulfillment
        :type force: bool
        :param additional_charge: The additional charge to be automagically fulfilled
        :type additional_charge: AdditionalCharge
        :param order: the b2b order object or order id
        :return: the fulfillment object
        """
        fulfillment_id = str(uuid.uuid1())
        if force:
            Fulfillment.__remove_fulfillment(container_id=additional_charge.charge_id)
        else:
            if Fulfillment.exists(additional_charge.charge_id):
                raise FulfillmentExistsError(f'Attempted to create new fulfillment for an '
                                             f'additional charge [{additional_charge.charge_id}] for '
                                             f'which a fulfillment already exists.')
        _order = Orders.for_(order)
        profile = Profile.profile_for(_order['customer_edi_id'])
        fulfillment = {
            'id': fulfillment_id,
            'order_id': _order['id'],
            'container_id': additional_charge.charge_id,
            'customer_edi_id': _order['customer_edi_id'],
            'purchase_order': _order['purchase_order'],
            'customer_mode': _order['customer_mode'],
            'channel_name': _order.get('channel_name', 'standard'),
            'channel_order_id': _order.get('channel_order_id', _order.get('id')),
            'carrier': _order['carrier'],
            'carrier_type': profile.fulfillment_config.get_carrier_type().value,
            'status': fs.FulfillmentStatus.NOT_SENT.value,
            'integration_type': _order.get('integration_type', 'standard'),
            'line_items': [
                {
                    'line_id': additional_charge.charge_id,
                    'quantity': 1,
                    'purchase_order_line': None,
                    'channel_line_id': additional_charge.channel_line_id or None,
                    'glovia_line_id': None
                }
            ],
            'tracking_numbers': [],
            'tracking_urls': [],
            'ship_date': datetime.now().isoformat(),
            'glovia_order': _order.get('glovia_sales_order'),
            'service_type': 0,
            'weight': 0,
            'weight_um': 'LB'
        }
        if 'notification_urls' in _order and 'ship_notice' in _order['notification_urls']:
            fulfillment['notification_url'] = _order['notification_urls']['ship_notice']

        Fulfillment.__create_records([fulfillment])
        return fulfillment

    @staticmethod
    def unfulfilled_qty_for(order_id):
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        fulfillments = []
        response = table.query(
            IndexName='order_id-index',
            KeyConditionExpression=Key('order_id').eq(order_id)
        )
        if 'Items' in response:
            fulfillments.extend(response['Items'])
            while 'LastEvaluatedKey' in response:
                response = table.query(
                    ExclusiveStartKey=response['LastEvaluatedKey'],
                    IndexName='order_id-index',
                    KeyConditionExpression=Key('order_id').eq(order_id)
                )
                fulfillments.extend(response['Items'])

        order = Orders.for_(order_id)
        required = {}
        for line in order['order_lines']:
            line_id = f"{order['customer_edi_id']}-{order['purchase_order']}-{line['purchase_order_line']}"
            required[line_id] = line['quantity']

        fulfilled = defaultdict(int)
        for fulfillment in fulfillments:
            if fs.FulfillmentStatus.for_(fulfillment['status']) in (fs.FulfillmentStatus.SENT, fs.FulfillmentStatus.NOT_SENT, fs.FulfillmentStatus.FAILED):
                for line in fulfillment['line_items']:
                    line_id = line['line_id']
                    fulfilled[line_id] += line['quantity']

        remaining = defaultdict(int)
        for line_id in required.keys():
            required_qty = required[line_id]
            fulfilled_qty = fulfilled[line_id]
            remaining[line_id] = max(0, (required_qty-fulfilled_qty))

        tracking_urls = [x for y in [f['tracking_urls'] for f in fulfillments] for x in y]
        fulfillment_ids = [f['id'] for f in fulfillments]
        ship_dates = []
        for f in fulfillments:
            if 'ship_date' in f:
                ship_dates.append(f['ship_date'])
            if 'ship_dates' in f:
                for s in f['ship_dates']:
                    ship_dates.append(s)
        # ship_dates = [f['ship_date'] for f in fulfillments if 'ship_date' in f]
        # ship_dates.extend([f['ship_dates'] for f in fulfillments if 'ship_dates' in f])
        last_ship_date = max(ship_dates)
        return FulfillmentLevel(total_required=sum(required.values()),
                                total_remaining=sum(remaining.values()),
                                tracking_urls=tracking_urls,
                                fulfillment_ids=fulfillment_ids,
                                last_ship_date=last_ship_date,
                                ship_dates=ship_dates)

    @staticmethod
    @xray_recorder.capture()
    def save_record(fulfillment):
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        try:
            table.put_item(Item=fulfillment)
        except ClientError as e:
            raise FulfillmentSaveError(e.response['Error']['Message'])

    @staticmethod
    @xray_recorder.capture()
    def update_status(fulfillment_id: str, status):
        """
        Update the order status on the order record
        :param fulfillment_id: b2b fulfillment id
        :param status: the OrderStatus object for the status to be updated to
        :return: None
        """
        try:
            from .fulfillment_status import FulfillmentStatus
            _status = FulfillmentStatus.for_(status)
            dynamodb_resource = resource('dynamodb', region_name='us-east-1')
            table = dynamodb_resource.Table(fulfillment_table)
            logger.info(f"Setting status for fulfillment: {fulfillment_id} to {_status.value}")
            response = table.update_item(
                Key={'id': fulfillment_id},
                UpdateExpression="set #s = :s",
                ExpressionAttributeValues={
                    ':s': _status.value
                },
                ExpressionAttributeNames = {
                    '#s': 'status'
                },
                ReturnValues="UPDATED_NEW"
            )
            return response
        except ClientError as ce:
            if ce.response['Error']['Code'] != 'ConditionalCheckFailedException':
                raise ce
        except Exception as e:
            raise e

    @staticmethod
    @xray_recorder.capture()
    def update_channel_id(fulfillment_id: str, channel_fulfillment_id: str):
        """
        Update the fulfillment's downstream identifier for this fulfillment in the downstream system
        :param fulfillment_id: b2b fulfillment id
        :param channel_fulfillment_id: the identifier from the downstream system
        :return: None
        """
        try:
            dynamodb_resource = resource('dynamodb', region_name='us-east-1')
            table = dynamodb_resource.Table(fulfillment_table)
            logger.debug(f"Setting channel fulfillment id for fulfillment: {fulfillment_id} to {channel_fulfillment_id}")
            response = table.update_item(
                Key={'id': fulfillment_id},
                UpdateExpression="set channel_fulfillment_id = :s",
                ExpressionAttributeValues={
                    ':s': channel_fulfillment_id
                },
                ReturnValues="UPDATED_NEW"
            )
            return response
        except ClientError as ce:
            if ce.response['Error']['Code'] != 'ConditionalCheckFailedException':
                raise ce
        except Exception as e:
            raise e

    @staticmethod
    def for_(fulfillment):
        """
        Convert to fulfillment if parameter is an id (fetch_by_container_id), otherwise return the parameter as is.
        :param fulfillment: string or Fulfillment object
        :return: Fulfillment
        """
        if isinstance(fulfillment, str):
            _fulfillment = Fulfillment.fetch(fulfillment)
        else:
            _fulfillment = fulfillment
        return _fulfillment


from .carrier import TrackingUrl