Repository URL to install this package:
|
Version:
0.4.190 ▾
|
lib-py-b2b
/
orders.py
|
|---|
from collections import namedtuple
from enum import Enum
import boto3
from boto3 import resource
from botocore.exceptions import ClientError
from os import environ
from lib_b2b.order import Order
from lib_b2b.order_status import OrderStatus
from .errors import OrderNotFoundError, OrderFetchError
import logging
from aws_xray_sdk.core import xray_recorder
from datetime import datetime
logger = logging.getLogger(__name__)
order_table = environ['order_table'] if 'order_table' in environ else 'test.b2b.Order'
DateDescriptor = namedtuple('DateDescriptor', 'field_name, index_name')
class OrderDateType(Enum):
ORDER_DATE = 'order_date'
SHIP_DATE = 'make_on'
class Orders:
# region Finders
@staticmethod
def for_(order) -> Order:
"""
Convert to order if parameter is an id (fetch_by_container_id), otherwise return the parameter as is.
:param order: string or Orders object
:return: Orders
"""
if isinstance(order, str):
_order = Orders.fetch(order)
else:
_order = order
return _order
@staticmethod
@xray_recorder.capture()
def fetch(order_id, consistent_read: bool = False) -> Order:
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table(order_table)
try:
response = table.get_item(
Key={
'id': order_id
},
ConsistentRead=consistent_read
)
if 'Item' in response:
item = response['Item']
return Order.from_dict(item)
else:
raise OrderNotFoundError(f"Unable to find order: {order_id}")
except ClientError as e:
raise OrderFetchError(e.response['Error']['Message'])
@staticmethod
@xray_recorder.capture()
def fetch_for_sales_order(sales_order: str, customer_mode: str) -> [Order]:
try:
dynamodb_resource = resource('dynamodb', region_name='us-east-1')
table = dynamodb_resource.Table(order_table)
min_date = datetime(2016, 1, 1)
response = table.query(
IndexName='glovia_sales_order-glovia_entry_date-index',
KeyConditionExpression="glovia_sales_order = :sales_order and "
"glovia_entry_date > :min_date",
FilterExpression="customer_mode = :customer_mode",
ExpressionAttributeValues={
':sales_order': sales_order,
':min_date': min_date.isoformat(),
':customer_mode': customer_mode,
}
)
data = response['Items']
return list(map(lambda x: Order.from_dict(x), data or []))
except ClientError as e:
raise OrderFetchError(e.response['Error']['Message'])
@staticmethod
@xray_recorder.capture()
def fetch_for_date_range(date_type: OrderDateType, begin_date: datetime, end_date: datetime,
statuses: [OrderStatus], customer_mode: str = 'PROD',
customer_edi_id: str = None, item: str = None,
all_fields: bool = True) -> [Order]:
try:
if OrderStatus.ANY in statuses:
statuses = OrderStatus.all()
if len(statuses) > 4:
if customer_edi_id:
orders = Orders.fetch_for_customer(
customer_edi_id=customer_edi_id,
customer_mode=customer_mode,
date_type=date_type,
begin_date=begin_date,
end_date=end_date,
item=item,
all_fields=all_fields
)
orders = [order for order in orders if order.status in statuses]
return orders
else:
#let's query by customer_mode and filter the results
orders = Orders.fetch_for_customer_mode(
customer_mode=customer_mode,
date_type=date_type,
begin_date=begin_date,
end_date=end_date,
item=item,
all_fields=all_fields
)
orders = [order for order in orders if order.status in statuses]
return orders
else:
orders = []
for status in statuses:
orders.append(
Orders.fetch_for_status(
date_type=date_type,
begin_date=begin_date,
end_date=end_date,
status=status,
customer_mode=customer_mode,
customer_edi_id=customer_edi_id,
item=item,
all_fields=all_fields
)
)
orders = [o for sublist in orders for o in sublist]
return orders
except ClientError as e:
raise OrderFetchError(e.response['Error']['Message'])
@staticmethod
def fetch_for_customer(customer_edi_id: str, date_type: OrderDateType,
begin_date: datetime, end_date: datetime,
customer_mode: str = 'PROD', item: str = None,
all_fields: bool = True):
date_descriptors = {OrderDateType.ORDER_DATE: DateDescriptor(field_name=OrderDateType.ORDER_DATE.value,
index_name='customer_edi_id-order_date-index'),
OrderDateType.SHIP_DATE: DateDescriptor(field_name=OrderDateType.SHIP_DATE.value,
index_name='customer_edi_id-make_on-index')}
date_descriptor = date_descriptors.get(date_type)
dynamodb_resource = resource('dynamodb', region_name='us-east-1')
table = dynamodb_resource.Table(order_table)
expression_attr = {
':customer_mode': customer_mode,
':begin_date': begin_date.isoformat(),
':end_date': end_date.isoformat(),
':customer_edi_id': customer_edi_id
}
key_expression = f"customer_edi_id = :customer_edi_id and #d between :begin_date and :end_date"
filter_expression = "customer_mode = :customer_mode"
operational_parameters = {
'IndexName': date_descriptor.index_name,
'KeyConditionExpression': key_expression,
'FilterExpression': filter_expression,
'ExpressionAttributeValues': expression_attr,
'ExpressionAttributeNames': {
'#d': date_descriptor.field_name
}
}
if not all_fields:
# if there are projections, we need to add these fields
projection_expression = "customer_mode, customer_edi_id, id, customer_info, purchase_order, " \
"required_date, not_before_date, make_on, release_on, order_status, " \
"tracking_urls, ship_dates, shipTo, glovia_sales_order, order_lines, " \
"order_date, errors, warnings, channel_order_name"
operational_parameters['ProjectionExpression'] = projection_expression
response = table.query(**operational_parameters)
data = response['Items']
while 'LastEvaluatedKey' in response:
operational_parameters['ExclusiveStartKey'] = response['LastEvaluatedKey']
response = table.query(**operational_parameters)
data.extend(response['Items'])
orders = list(map(lambda x: Order.from_dict(x), data))
if item:
orders = [order for order in orders for order_line in order.lines if order_line.product_id == item]
return orders
@staticmethod
def fetch_for_customer_mode(customer_mode: str, date_type: OrderDateType,
begin_date: datetime, end_date: datetime,
item: str = None, all_fields: bool = True):
date_descriptors = {OrderDateType.ORDER_DATE: DateDescriptor(field_name=OrderDateType.ORDER_DATE.value,
index_name='customer_mode-order_date-index'),
OrderDateType.SHIP_DATE: DateDescriptor(field_name=OrderDateType.SHIP_DATE.value,
index_name='customer_mode-make_on-index')}
date_descriptor = date_descriptors.get(date_type)
dynamodb_resource = resource('dynamodb', region_name='us-east-1')
table = dynamodb_resource.Table(order_table)
expression_attr = {
':customer_mode': customer_mode,
':begin_date': begin_date.isoformat(),
':end_date': end_date.isoformat()
}
key_expression = f"customer_mode = :customer_mode and #d between :begin_date and :end_date"
operational_parameters = {
'IndexName': date_descriptor.index_name,
'KeyConditionExpression': key_expression,
'ExpressionAttributeValues': expression_attr,
'ExpressionAttributeNames': {
'#d': date_descriptor.field_name
}
}
if not all_fields:
# if there are projections, we need to add these fields
projection_expression = "customer_mode, customer_edi_id, id, customer_info, purchase_order, " \
"required_date, not_before_date, make_on, release_on, order_status, " \
"tracking_urls, ship_dates, shipTo, glovia_sales_order, order_lines, " \
"order_date, errors, warnings, channel_order_name"
operational_parameters['ProjectionExpression'] = projection_expression
response = table.query(**operational_parameters)
data = response['Items']
while 'LastEvaluatedKey' in response:
operational_parameters['ExclusiveStartKey'] = response['LastEvaluatedKey']
response = table.query(**operational_parameters)
data.extend(response['Items'])
orders = list(map(lambda x: Order.from_dict(x), data))
if item:
orders = [order for order in orders for order_line in order.lines if order_line.product_id == item]
return orders
@staticmethod
@xray_recorder.capture()
def fetch_for_status(status: OrderStatus, date_type: OrderDateType,
begin_date: datetime, end_date: datetime,
customer_mode: str = 'PROD', customer_edi_id: str = None,
item: str = None, all_fields: bool = True) -> [Order]:
date_descriptors = {OrderDateType.ORDER_DATE: DateDescriptor(field_name=OrderDateType.ORDER_DATE.value,
index_name='order_status-order_date-index'),
OrderDateType.SHIP_DATE: DateDescriptor(field_name=OrderDateType.SHIP_DATE.value,
index_name='order_status-make_on-index')}
date_descriptor = date_descriptors.get(date_type)
dynamodb_resource = resource('dynamodb', region_name='us-east-1')
table = dynamodb_resource.Table(order_table)
expression_attr = {
':customer_mode': customer_mode,
':begin_date': begin_date.isoformat(),
':end_date': end_date.isoformat(),
':order_status': status.value
}
key_expression = f"order_status = :order_status and #d between :begin_date and :end_date"
filter_expression = "customer_mode = :customer_mode"
if customer_edi_id:
filter_expression += " and customer_edi_id = :customer_edi_id"
expression_attr[':customer_edi_id'] = customer_edi_id
operational_parameters = {
'IndexName': date_descriptor.index_name,
'KeyConditionExpression': key_expression,
'FilterExpression': filter_expression,
'ExpressionAttributeValues': expression_attr,
'ExpressionAttributeNames': {
'#d': date_descriptor.field_name
}
}
if not all_fields:
# if there are projections, we need to add these fields
projection_expression = "customer_mode, customer_edi_id, id, customer_info, purchase_order, " \
"required_date, not_before_date, make_on, release_on, order_status, " \
"tracking_urls, ship_dates, shipTo, glovia_sales_order, order_lines, " \
"order_date, errors, warnings, channel_order_name"
operational_parameters['ProjectionExpression'] = projection_expression
response = table.query(**operational_parameters)
data = response['Items']
while 'LastEvaluatedKey' in response:
operational_parameters['ExclusiveStartKey'] = response['LastEvaluatedKey']
response = table.query(**operational_parameters)
data.extend(response['Items'])
orders = list(map(lambda x: Order.from_dict(x), data))
if item:
orders = [order for order in orders for order_line in order.lines if order_line.product_id == item]
return orders
@staticmethod
@xray_recorder.capture()
def fetch_all(customer_edi_id, customer_mode, begin_date, end_date) -> [Order]:
try:
dynamodb_resource = resource('dynamodb', region_name='us-east-1')
table = dynamodb_resource.Table(order_table)
response = table.query(
IndexName='customer_edi_id-order_date-index',
KeyConditionExpression="customer_edi_id = :customer_edi_id and "
"order_date between :begin_date and :end_date",
FilterExpression="customer_mode = :customer_mode",
ExpressionAttributeValues={
':customer_mode': customer_mode,
':customer_edi_id': customer_edi_id,
':begin_date': begin_date,
':end_date': end_date
}
)
data = response['Items']
while 'LastEvaluatedKey' in response:
response = table.query(
ExclusiveStartKey=response['LastEvaluatedKey'],
IndexName='customer_edi_id-order_date-index',
KeyConditionExpression="customer_edi_id = :customer_edi_id and "
"order_date between :begin_date and :end_date",
FilterExpression="customer_mode = :customer_mode",
ExpressionAttributeValues={
':customer_mode': customer_mode,
':customer_edi_id': customer_edi_id,
':begin_date': begin_date,
':end_date': end_date
}
)
data.extend(response['Items'])
return list(map(lambda x: Order.from_dict(x), data))
except ClientError as e:
raise OrderFetchError(e.response['Error']['Message'])
@staticmethod
@xray_recorder.capture()
def exists(order_id: str, revision: int = None) -> bool:
try:
order = Orders.fetch(order_id=order_id)
if revision:
if 'purchase_order_revision' in order and revision != order['purchase_order_revision']:
return False
return True
except OrderNotFoundError:
return False
# endregion