Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
lib-py-b2b / fulfillments.py
Size: Mime:
from boto3.dynamodb.conditions import Key

from lib_b2b.fulfillment import Fulfillment
from lib_b2b.fulfillment_status import FulfillmentStatus
from collections import namedtuple
from enum import Enum
from boto3 import resource
from botocore.exceptions import ClientError
from os import environ
from .errors import FulfillmentLookupError, FulfillmentFetchError
import logging
from aws_xray_sdk.core import xray_recorder
from datetime import datetime

logger = logging.getLogger(__name__)
log_level = logging.getLevelName(environ['LOG_LEVEL']) if 'LOG_LEVEL' in environ else logging.INFO
logger.setLevel(log_level)
fulfillment_table = environ['fulfillment_table'] if 'fulfillment_table' in environ else 'test.b2b.Fulfillment'
FulfillmentDateDescriptor = namedtuple('FulfillmentDateDescriptor', 'field_name, index_name')


class FulfillmentDateType(Enum):
    SHIP_DATE = 'ship_date'


class Fulfillments:
    # TODO: - Move all of the DAO fulfillment methods here, and then refactor fulfillment to have state
    pass

    @staticmethod
    @xray_recorder.capture()
    def fetch_by_order_id(order_id: str):
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        response = table.query(
            IndexName='order_id-index',
            KeyConditionExpression=Key('order_id').eq(order_id)
        )
        if response and 'Items' in response and response['Items']:
            return response['Items']
        else:
            return None

    @staticmethod
    @xray_recorder.capture()
    def fetch_by_container_id(container_id: str):
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        response = table.query(
            IndexName='container_id-index',
            KeyConditionExpression=Key('container_id').eq(container_id)
        )
        if response and 'Items' in response and response['Items']:
            return response['Items'][0]
        else:
            return None

    @staticmethod
    @xray_recorder.capture()
    def fetch_for_date_range(date_type: FulfillmentDateType, begin_date: datetime, end_date: datetime,
                             statuses: [FulfillmentStatus], customer_mode: str = 'PROD',
                             customer_edi_id: str = None, all_fields: bool = True) -> [Fulfillment]:
        try:
            if FulfillmentStatus.ANY in statuses:
                statuses = FulfillmentStatus.all()
            if len(statuses) > 4:
                if customer_edi_id:
                    fulfillments = Fulfillments.fetch_for_customer(
                        customer_edi_id=customer_edi_id,
                        customer_mode=customer_mode,
                        date_type=date_type,
                        begin_date=begin_date,
                        end_date=end_date,
                        all_fields=all_fields
                    )
                    fulfillments = [fulfillment for fulfillment in fulfillments if FulfillmentStatus.for_(fulfillment['status']) in statuses]
                    return fulfillments
                else:
                    # let's query by customer_mode and filter the results
                    fulfillments = Fulfillments.fetch_for_customer_mode(
                        customer_mode=customer_mode,
                        date_type=date_type,
                        begin_date=begin_date,
                        end_date=end_date,
                        all_fields=all_fields
                    )
                    fulfillments = [fulfillment for fulfillment in fulfillments if FulfillmentStatus.for_(fulfillment['status']) in statuses]
                    return fulfillments
            else:
                fulfillments = []
                for status in statuses:
                    fulfillments.append(
                        Fulfillments.fetch_for_status(
                            date_type=date_type,
                            begin_date=begin_date,
                            end_date=end_date,
                            status=status,
                            customer_mode=customer_mode,
                            customer_edi_id=customer_edi_id,
                            all_fields=all_fields
                        )
                    )
                fulfillments = [f for sublist in fulfillments for f in sublist]
                return fulfillments
        except ClientError as e:
            raise FulfillmentFetchError(e.response['Error']['Message'])

    @staticmethod
    def fetch_for_customer(customer_edi_id: str, date_type: FulfillmentDateType,
                           begin_date: datetime, end_date: datetime,
                           customer_mode: str = 'PROD', all_fields: bool = True):
        date_descriptors = {
            FulfillmentDateType.SHIP_DATE: FulfillmentDateDescriptor(field_name=FulfillmentDateType.SHIP_DATE.value,
                                                                     index_name='customer_edi_id-index')}
        date_descriptor = date_descriptors.get(date_type)
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        expression_attr = {
            ':customer_mode': customer_mode,
            ':begin_date': begin_date.isoformat(),
            ':end_date': end_date.isoformat(),
            ':customer_edi_id': customer_edi_id
        }
        key_expression = f"customer_edi_id = :customer_edi_id and #d between :begin_date and :end_date"
        filter_expression = "customer_mode = :customer_mode"
        expression_names = {
            '#d': date_descriptor.field_name
        }
        operational_parameters = {
            'IndexName': date_descriptor.index_name,
            'KeyConditionExpression': key_expression,
            'FilterExpression': filter_expression,
            'ExpressionAttributeValues': expression_attr
        }
        if not all_fields:
            # if there are projections, we need to add these fields
            projection_expression = "#d, #s, customer_mode, customer_edi_id, id, container_id, " \
                                    "purchase_order, tracking_urls, channel_fulfillment_id, carrier, " \
                                    "ship_zone, weight, net_shipping_cost"
            expression_names['#s'] = 'status'
            operational_parameters['ProjectionExpression'] = projection_expression

        operational_parameters['ExpressionAttributeNames'] = expression_names
        response = table.query(**operational_parameters)
        data = response['Items']

        while 'LastEvaluatedKey' in response:
            operational_parameters['ExclusiveStartKey'] = response['LastEvaluatedKey']
            response = table.query(**operational_parameters)
            data.extend(response['Items'])
        return data

    @staticmethod
    def fetch_for_customer_mode(customer_mode: str, date_type: FulfillmentDateType,
                                begin_date: datetime, end_date: datetime, all_fields: bool = True):
        date_descriptors = {
            FulfillmentDateType.SHIP_DATE: FulfillmentDateDescriptor(field_name=FulfillmentDateType.SHIP_DATE.value,
                                                                     index_name='customer_mode-ship_date-index')}
        date_descriptor = date_descriptors.get(date_type)
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        expression_attr = {
            ':customer_mode': customer_mode,
            ':begin_date': begin_date.isoformat(),
            ':end_date': end_date.isoformat()
        }
        key_expression = f"customer_mode = :customer_mode and #d between :begin_date and :end_date"
        expression_names = {
            '#d': date_descriptor.field_name
        }
        operational_parameters = {
            'IndexName': date_descriptor.index_name,
            'KeyConditionExpression': key_expression,
            'ExpressionAttributeValues': expression_attr
        }
        if not all_fields:
            # if there are projections, we need to add these fields
            projection_expression = "#d, #s, customer_mode, customer_edi_id, id, container_id, " \
                                    "purchase_order, tracking_urls, channel_fulfillment_id, carrier, " \
                                    "ship_zone, weight, net_shipping_cost"
            expression_names['#s'] = 'status'
            operational_parameters['ProjectionExpression'] = projection_expression

        operational_parameters['ExpressionAttributeNames'] = expression_names

        response = table.query(**operational_parameters)
        data = response['Items']
        while 'LastEvaluatedKey' in response:
            operational_parameters['ExclusiveStartKey'] = response['LastEvaluatedKey']
            response = table.query(**operational_parameters)
            data.extend(response['Items'])
        return data

    @staticmethod
    @xray_recorder.capture()
    def fetch_for_status(status: FulfillmentStatus, date_type: FulfillmentDateType,
                         begin_date: datetime, end_date: datetime,
                         customer_mode: str = 'PROD', customer_edi_id: str = None,
                         all_fields: bool = True) -> [Fulfillment]:

        date_descriptors = {
            FulfillmentDateType.SHIP_DATE: FulfillmentDateDescriptor(field_name=FulfillmentDateType.SHIP_DATE.value,
                                                                     index_name='status-index')}
        date_descriptor = date_descriptors.get(date_type)
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(fulfillment_table)
        expression_attr = {
            ':customer_mode': customer_mode,
            ':begin_date': begin_date.isoformat(),
            ':end_date': end_date.isoformat(),
            ':status': status.value
        }
        key_expression = f"#s = :status and #d between :begin_date and :end_date"
        filter_expression = "customer_mode = :customer_mode"
        if customer_edi_id:
            filter_expression += " and customer_edi_id = :customer_edi_id"
            expression_attr[':customer_edi_id'] = customer_edi_id
        expression_names = {
            '#d': date_descriptor.field_name,
            '#s': 'status'
        }
        operational_parameters = {
            'IndexName': date_descriptor.index_name,
            'KeyConditionExpression': key_expression,
            'FilterExpression': filter_expression,
            'ExpressionAttributeValues': expression_attr
        }
        if not all_fields:
            # if there are projections, we need to add these fields
            projection_expression = "#d, #s, customer_mode, customer_edi_id, id, container_id, " \
                                    "purchase_order, tracking_urls, channel_fulfillment_id, carrier, " \
                                    "ship_zone, weight, net_shipping_cost"
            operational_parameters['ProjectionExpression'] = projection_expression
        operational_parameters['ExpressionAttributeNames'] = expression_names
        response = table.query(**operational_parameters)
        data = response['Items']
        while 'LastEvaluatedKey' in response:
            operational_parameters['ExclusiveStartKey'] = response['LastEvaluatedKey']
            response = table.query(**operational_parameters)
            data.extend(response['Items'])
        return data