# Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages
#
# Repository URL to install this package:
#
# Details
# lib-py-b2b / orders.py
# Size: Mime:
from collections import namedtuple
from enum import Enum

import boto3
from boto3 import resource
from botocore.exceptions import ClientError
from os import environ
from lib_b2b.order import Order
from lib_b2b.order_status import OrderStatus
from .errors import OrderNotFoundError, OrderFetchError
import logging
from aws_xray_sdk.core import xray_recorder
from datetime import datetime


# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Name of the DynamoDB order table; taken from the `order_table` environment
# variable when it is set, otherwise the test table name is used.
order_table = environ.get('order_table', 'test.b2b.Order')


# Pairs a date attribute name with the name of the index that is queried on it.
DateDescriptor = namedtuple('DateDescriptor', ['field_name', 'index_name'])


class OrderDateType(Enum):
    """
    Which date attribute of an order a date-range query is evaluated against.
    Each member's value is the attribute name used in the query expressions.
    """
    # queries run against the 'order_date' attribute
    ORDER_DATE = "order_date"
    # queries run against the 'make_on' attribute
    SHIP_DATE = "make_on"


class Orders:
    # region Finders
    @staticmethod
    def for_(order) -> Order:
        """
        Resolve an order reference to an order object.
        :param order: an order id (string) or an already loaded order
        :return: the result of Orders.fetch when a string is given, otherwise the argument unchanged
        """
        return Orders.fetch(order) if isinstance(order, str) else order

    @staticmethod
    @xray_recorder.capture()
    def fetch(order_id, consistent_read: bool = False) -> Order:
        """
        Load a single order from the order table by its id.
        :param order_id: value of the table's 'id' key
        :param consistent_read: forwarded to DynamoDB as ConsistentRead
        :return: Order built from the stored item
        :raises OrderNotFoundError: when the response carries no 'Item'
        :raises OrderFetchError: when the lookup fails with a ClientError
        """
        table = boto3.resource('dynamodb', region_name='us-east-1').Table(order_table)
        try:
            result = table.get_item(Key={'id': order_id}, ConsistentRead=consistent_read)
            # guard clause: nothing stored under this id
            if 'Item' not in result:
                raise OrderNotFoundError(f"Unable to find order: {order_id}")
            return Order.from_dict(result['Item'])
        except ClientError as e:
            raise OrderFetchError(e.response['Error']['Message'])

    @staticmethod
    @xray_recorder.capture()
    def fetch_for_sales_order(sales_order: str, customer_mode: str) -> [Order]:
        """
        Fetch every order recorded for a Glovia sales order.

        Queries the 'glovia_sales_order-glovia_entry_date-index' index and
        follows LastEvaluatedKey until all result pages have been read, so the
        result is not truncated when the query spans more than one page.
        :param sales_order: value matched against glovia_sales_order
        :param customer_mode: value matched against customer_mode (applied as a filter)
        :return: list of Order objects, empty when nothing matches
        :raises OrderFetchError: when DynamoDB reports a ClientError
        """
        # Fixed lower bound for the glovia_entry_date part of the key condition.
        min_date = datetime(2016, 1, 1)
        query_args = {
            'IndexName': 'glovia_sales_order-glovia_entry_date-index',
            'KeyConditionExpression': "glovia_sales_order = :sales_order and "
                                      "glovia_entry_date > :min_date",
            'FilterExpression': "customer_mode = :customer_mode",
            'ExpressionAttributeValues': {
                ':sales_order': sales_order,
                ':min_date': min_date.isoformat(),
                ':customer_mode': customer_mode,
            },
        }
        try:
            dynamodb_resource = resource('dynamodb', region_name='us-east-1')
            table = dynamodb_resource.Table(order_table)
            response = table.query(**query_args)
            data = list(response.get('Items') or [])
            # The filter is applied after a page is read, so a page can come
            # back short (or empty) while more matching items still follow.
            while 'LastEvaluatedKey' in response:
                query_args['ExclusiveStartKey'] = response['LastEvaluatedKey']
                response = table.query(**query_args)
                data.extend(response.get('Items') or [])
            return [Order.from_dict(x) for x in data]
        except ClientError as e:
            raise OrderFetchError(e.response['Error']['Message']) from e

    @staticmethod
    @xray_recorder.capture()
    def fetch_for_date_range(date_type: OrderDateType, begin_date: datetime, end_date: datetime,
                             statuses: [OrderStatus], customer_mode: str = 'PROD',
                             customer_edi_id: str = None, item: str = None,
                             all_fields: bool = True) -> [Order]:
        """
        Fetch orders in a date range that are in one of the given statuses.

        Up to four statuses are served by one fetch_for_status call per status.
        With more than four, a single fetch_for_customer (when customer_edi_id
        is given) or fetch_for_customer_mode call is made and its result is
        filtered by status in memory.
        :param date_type: which date attribute the range applies to
        :param begin_date: start of the range
        :param end_date: end of the range
        :param statuses: wanted statuses; OrderStatus.ANY expands to OrderStatus.all()
        :param customer_mode: customer mode to match
        :param customer_edi_id: optional customer restriction
        :param item: optional product id forwarded to the underlying finder
        :param all_fields: forwarded to the underlying finder
        :return: list of Order objects
        :raises OrderFetchError: when DynamoDB reports a ClientError
        """
        try:
            if OrderStatus.ANY in statuses:
                statuses = OrderStatus.all()

            if len(statuses) <= 4:
                # few statuses: one status-index query each, results concatenated
                collected = []
                for status in statuses:
                    collected.extend(
                        Orders.fetch_for_status(
                            date_type=date_type,
                            begin_date=begin_date,
                            end_date=end_date,
                            status=status,
                            customer_mode=customer_mode,
                            customer_edi_id=customer_edi_id,
                            item=item,
                            all_fields=all_fields
                        )
                    )
                return collected

            # many statuses: query once by customer or customer_mode and
            # filter the results by status afterwards
            shared_args = {
                'customer_mode': customer_mode,
                'date_type': date_type,
                'begin_date': begin_date,
                'end_date': end_date,
                'item': item,
                'all_fields': all_fields,
            }
            if customer_edi_id:
                candidates = Orders.fetch_for_customer(customer_edi_id=customer_edi_id, **shared_args)
            else:
                candidates = Orders.fetch_for_customer_mode(**shared_args)
            return [candidate for candidate in candidates if candidate.status in statuses]
        except ClientError as e:
            raise OrderFetchError(e.response['Error']['Message'])

    @staticmethod
    def fetch_for_customer(customer_edi_id: str, date_type: OrderDateType,
                           begin_date: datetime, end_date: datetime,
                           customer_mode: str = 'PROD', item: str = None,
                           all_fields: bool = True):
        """
        Fetch a customer's orders whose date lies between begin_date and end_date.

        Queries the customer_edi_id index that matches date_type, filters on
        customer_mode, and reads every result page.
        :param customer_edi_id: customer to query for
        :param date_type: selects the date attribute and the index to query
        :param begin_date: start of the range (sent as ISO-8601 text)
        :param end_date: end of the range (sent as ISO-8601 text)
        :param customer_mode: value matched against customer_mode (applied as a filter)
        :param item: when given, keep only orders having a line with this product_id
        :param all_fields: when False, request only a fixed subset of attributes
        :return: list of Order objects; each order appears at most once
        """
        date_descriptors = {OrderDateType.ORDER_DATE: DateDescriptor(field_name=OrderDateType.ORDER_DATE.value,
                                                                     index_name='customer_edi_id-order_date-index'),
                            OrderDateType.SHIP_DATE: DateDescriptor(field_name=OrderDateType.SHIP_DATE.value,
                                                                    index_name='customer_edi_id-make_on-index')}
        date_descriptor = date_descriptors.get(date_type)
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(order_table)
        expression_attr = {
            ':customer_mode': customer_mode,
            ':begin_date': begin_date.isoformat(),
            ':end_date': end_date.isoformat(),
            ':customer_edi_id': customer_edi_id
        }
        # '#d' is substituted with the date attribute name via ExpressionAttributeNames
        key_expression = "customer_edi_id = :customer_edi_id and #d between :begin_date and :end_date"
        filter_expression = "customer_mode = :customer_mode"
        operational_parameters = {
            'IndexName': date_descriptor.index_name,
            'KeyConditionExpression': key_expression,
            'FilterExpression': filter_expression,
            'ExpressionAttributeValues': expression_attr,
            'ExpressionAttributeNames': {
                '#d': date_descriptor.field_name
            }
        }
        if not all_fields:
            # if there are projections, we need to add these fields
            projection_expression = "customer_mode, customer_edi_id, id, customer_info, purchase_order, " \
                                    "required_date, not_before_date, make_on, release_on, order_status, " \
                                    "tracking_urls, ship_dates, shipTo, glovia_sales_order, order_lines, " \
                                    "order_date, errors, warnings, channel_order_name"
            operational_parameters['ProjectionExpression'] = projection_expression

        response = table.query(**operational_parameters)
        data = response['Items']
        # keep reading pages until DynamoDB stops returning a continuation key
        while 'LastEvaluatedKey' in response:
            operational_parameters['ExclusiveStartKey'] = response['LastEvaluatedKey']
            response = table.query(**operational_parameters)
            data.extend(response['Items'])
        orders = [Order.from_dict(x) for x in data]
        if item:
            # any() keeps an order once even when several of its lines match;
            # a nested comprehension would repeat it once per matching line
            orders = [order for order in orders
                      if any(order_line.product_id == item for order_line in order.lines)]
        return orders

    @staticmethod
    def fetch_for_customer_mode(customer_mode: str, date_type: OrderDateType,
                                begin_date: datetime, end_date: datetime,
                                item: str = None, all_fields: bool = True):
        """
        Fetch all orders of a customer mode whose date lies between begin_date and end_date.

        Queries the customer_mode index that matches date_type and reads every
        result page.
        :param customer_mode: value of the index key to query for
        :param date_type: selects the date attribute and the index to query
        :param begin_date: start of the range (sent as ISO-8601 text)
        :param end_date: end of the range (sent as ISO-8601 text)
        :param item: when given, keep only orders having a line with this product_id
        :param all_fields: when False, request only a fixed subset of attributes
        :return: list of Order objects; each order appears at most once
        """
        date_descriptors = {OrderDateType.ORDER_DATE: DateDescriptor(field_name=OrderDateType.ORDER_DATE.value,
                                                                     index_name='customer_mode-order_date-index'),
                            OrderDateType.SHIP_DATE: DateDescriptor(field_name=OrderDateType.SHIP_DATE.value,
                                                                    index_name='customer_mode-make_on-index')}
        date_descriptor = date_descriptors.get(date_type)
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(order_table)
        expression_attr = {
            ':customer_mode': customer_mode,
            ':begin_date': begin_date.isoformat(),
            ':end_date': end_date.isoformat()
        }
        # '#d' is substituted with the date attribute name via ExpressionAttributeNames
        key_expression = "customer_mode = :customer_mode and #d between :begin_date and :end_date"
        operational_parameters = {
            'IndexName': date_descriptor.index_name,
            'KeyConditionExpression': key_expression,
            'ExpressionAttributeValues': expression_attr,
            'ExpressionAttributeNames': {
                '#d': date_descriptor.field_name
            }
        }
        if not all_fields:
            # if there are projections, we need to add these fields
            projection_expression = "customer_mode, customer_edi_id, id, customer_info, purchase_order, " \
                                    "required_date, not_before_date, make_on, release_on, order_status, " \
                                    "tracking_urls, ship_dates, shipTo, glovia_sales_order, order_lines, " \
                                    "order_date, errors, warnings, channel_order_name"
            operational_parameters['ProjectionExpression'] = projection_expression

        response = table.query(**operational_parameters)
        data = response['Items']

        # keep reading pages until DynamoDB stops returning a continuation key
        while 'LastEvaluatedKey' in response:
            operational_parameters['ExclusiveStartKey'] = response['LastEvaluatedKey']
            response = table.query(**operational_parameters)
            data.extend(response['Items'])
        orders = [Order.from_dict(x) for x in data]
        if item:
            # any() keeps an order once even when several of its lines match;
            # a nested comprehension would repeat it once per matching line
            orders = [order for order in orders
                      if any(order_line.product_id == item for order_line in order.lines)]
        return orders

    @staticmethod
    @xray_recorder.capture()
    def fetch_for_status(status: OrderStatus, date_type: OrderDateType,
                         begin_date: datetime, end_date: datetime,
                         customer_mode: str = 'PROD', customer_edi_id: str = None,
                         item: str = None, all_fields: bool = True) -> [Order]:
        """
        Fetch orders in one status whose date lies between begin_date and end_date.

        Queries the order_status index that matches date_type, filters on
        customer_mode (and on customer_edi_id when given), and reads every
        result page.
        :param status: status to query for; its value is used as the index key
        :param date_type: selects the date attribute and the index to query
        :param begin_date: start of the range (sent as ISO-8601 text)
        :param end_date: end of the range (sent as ISO-8601 text)
        :param customer_mode: value matched against customer_mode (applied as a filter)
        :param customer_edi_id: optional customer restriction (applied as a filter)
        :param item: when given, keep only orders having a line with this product_id
        :param all_fields: when False, request only a fixed subset of attributes
        :return: list of Order objects; each order appears at most once
        """
        date_descriptors = {OrderDateType.ORDER_DATE: DateDescriptor(field_name=OrderDateType.ORDER_DATE.value,
                                                                     index_name='order_status-order_date-index'),
                            OrderDateType.SHIP_DATE: DateDescriptor(field_name=OrderDateType.SHIP_DATE.value,
                                                                    index_name='order_status-make_on-index')}
        date_descriptor = date_descriptors.get(date_type)
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(order_table)
        expression_attr = {
            ':customer_mode': customer_mode,
            ':begin_date': begin_date.isoformat(),
            ':end_date': end_date.isoformat(),
            ':order_status': status.value
        }
        # '#d' is substituted with the date attribute name via ExpressionAttributeNames
        key_expression = "order_status = :order_status and #d between :begin_date and :end_date"
        filter_expression = "customer_mode = :customer_mode"
        if customer_edi_id:
            filter_expression += " and customer_edi_id = :customer_edi_id"
            expression_attr[':customer_edi_id'] = customer_edi_id

        operational_parameters = {
            'IndexName': date_descriptor.index_name,
            'KeyConditionExpression': key_expression,
            'FilterExpression': filter_expression,
            'ExpressionAttributeValues': expression_attr,
            'ExpressionAttributeNames': {
                '#d': date_descriptor.field_name
            }
        }
        if not all_fields:
            # if there are projections, we need to add these fields
            projection_expression = "customer_mode, customer_edi_id, id, customer_info, purchase_order, " \
                                    "required_date, not_before_date, make_on, release_on, order_status, " \
                                    "tracking_urls, ship_dates, shipTo, glovia_sales_order, order_lines, " \
                                    "order_date, errors, warnings, channel_order_name"
            operational_parameters['ProjectionExpression'] = projection_expression

        response = table.query(**operational_parameters)
        data = response['Items']

        # keep reading pages until DynamoDB stops returning a continuation key
        while 'LastEvaluatedKey' in response:
            operational_parameters['ExclusiveStartKey'] = response['LastEvaluatedKey']
            response = table.query(**operational_parameters)
            data.extend(response['Items'])
        orders = [Order.from_dict(x) for x in data]
        if item:
            # any() keeps an order once even when several of its lines match;
            # a nested comprehension would repeat it once per matching line
            orders = [order for order in orders
                      if any(order_line.product_id == item for order_line in order.lines)]
        return orders

    @staticmethod
    @xray_recorder.capture()
    def fetch_all(customer_edi_id, customer_mode, begin_date, end_date) -> [Order]:
        """
        Fetch every order of a customer whose order_date lies between begin_date and end_date.

        Queries the 'customer_edi_id-order_date-index' index, filters on
        customer_mode, and reads every result page.
        :param customer_edi_id: customer to query for
        :param customer_mode: value matched against customer_mode (applied as a filter)
        :param begin_date: start of the range, passed to the query as given
        :param end_date: end of the range, passed to the query as given
        :return: list of Order objects
        :raises OrderFetchError: when DynamoDB reports a ClientError
        """
        # the same arguments are used for the first page and every following page
        query_args = {
            'IndexName': 'customer_edi_id-order_date-index',
            'KeyConditionExpression': "customer_edi_id = :customer_edi_id and "
                                      "order_date between :begin_date and :end_date",
            'FilterExpression': "customer_mode = :customer_mode",
            'ExpressionAttributeValues': {
                ':customer_mode': customer_mode,
                ':customer_edi_id': customer_edi_id,
                ':begin_date': begin_date,
                ':end_date': end_date
            },
        }
        try:
            table = resource('dynamodb', region_name='us-east-1').Table(order_table)
            page = table.query(**query_args)
            records = page['Items']
            while 'LastEvaluatedKey' in page:
                page = table.query(ExclusiveStartKey=page['LastEvaluatedKey'], **query_args)
                records.extend(page['Items'])
            return [Order.from_dict(record) for record in records]
        except ClientError as e:
            raise OrderFetchError(e.response['Error']['Message'])

    @staticmethod
    @xray_recorder.capture()
    def exists(order_id: str, revision: int = None) -> bool:
        """
        Tell whether an order exists, optionally at a specific purchase order revision.
        :param order_id: id passed to Orders.fetch
        :param revision: when not None, the order's 'purchase_order_revision' (if it has one)
                         must equal this value; 0 is treated as a real revision to compare
        :return: False when the order is not found or its revision differs, True otherwise
        """
        try:
            order = Orders.fetch(order_id=order_id)
        except OrderNotFoundError:
            return False
        # Compare against None explicitly: a plain truthiness test would skip
        # the check for revision 0 and report the order as existing.
        # NOTE(review): confirm no caller passes 0 to mean "no revision".
        if revision is not None:
            if 'purchase_order_revision' in order and revision != order['purchase_order_revision']:
                return False
        return True
    # endregion