# Package index listing: lib-py-b2b / fulfillments.py
# Version: 0.4.184
from boto3.dynamodb.conditions import Key
from lib_b2b.fulfillment import Fulfillment
from lib_b2b.fulfillment_status import FulfillmentStatus
from collections import namedtuple
from enum import Enum
from boto3 import resource
from botocore.exceptions import ClientError
from os import environ
from .errors import FulfillmentLookupError, FulfillmentFetchError
import logging
from aws_xray_sdk.core import xray_recorder
from datetime import datetime
# Module logger. Its level comes from the LOG_LEVEL environment variable when
# that is set, and falls back to INFO otherwise.
logger = logging.getLogger(__name__)
if 'LOG_LEVEL' in environ:
    log_level = logging.getLevelName(environ['LOG_LEVEL'])
else:
    log_level = logging.INFO
logger.setLevel(log_level)

# Name of the DynamoDB table that is queried; overridable through the
# `fulfillment_table` environment variable.
fulfillment_table = environ.get('fulfillment_table', 'test.b2b.Fulfillment')

# Pairs a date attribute of a fulfillment item with the index used to query it.
FulfillmentDateDescriptor = namedtuple(
    'FulfillmentDateDescriptor',
    ['field_name', 'index_name'],
)
class FulfillmentDateType(Enum):
    """Date attributes of a fulfillment that date-range queries can be keyed on.

    Each member's value is the attribute name used in the query expressions.
    """

    SHIP_DATE = 'ship_date'
class Fulfillments:
    """Query helpers for fulfillment items stored in DynamoDB.

    Every public method opens a DynamoDB resource in ``us-east-1`` against the
    table named by the module-level ``fulfillment_table`` and returns the items
    exactly as ``table.query`` produced them.
    """
    # TODO: - Move all of the DAO fulfillment methods here, and then refactor fulfillment to have state

    # Region every query in this class connects to.
    _REGION = 'us-east-1'

    # Up to this many statuses are fetched with one query per status; above it
    # a single broader query is issued and the statuses are filtered in memory.
    _STATUS_FAN_OUT_LIMIT = 4

    # Attributes returned when the caller asks for a reduced field set
    # (all_fields=False). #d and #s are aliases resolved through
    # ExpressionAttributeNames.
    _PROJECTION_EXPRESSION = (
        "#d, #s, customer_mode, customer_edi_id, id, container_id, "
        "purchase_order, tracking_urls, channel_fulfillment_id, carrier, "
        "ship_zone, weight, net_shipping_cost"
    )

    # Index to query for each supported date type, per access pattern.
    _CUSTOMER_INDEXES = {FulfillmentDateType.SHIP_DATE: 'customer_edi_id-index'}
    _CUSTOMER_MODE_INDEXES = {FulfillmentDateType.SHIP_DATE: 'customer_mode-ship_date-index'}
    _STATUS_INDEXES = {FulfillmentDateType.SHIP_DATE: 'status-index'}

    @staticmethod
    def _table():
        """Return a handle on the fulfillment table (a new resource per call)."""
        return resource('dynamodb', region_name=Fulfillments._REGION).Table(fulfillment_table)

    @staticmethod
    def _date_descriptor(date_type, index_by_date_type: dict) -> FulfillmentDateDescriptor:
        """Resolve ``date_type`` to its attribute name and the index to query.

        :raises ValueError: if ``date_type`` has no entry in ``index_by_date_type``.
        """
        index_name = index_by_date_type.get(date_type)
        if index_name is None:
            # Fail with a clear message instead of an AttributeError on None later on.
            raise ValueError(f"Unsupported fulfillment date type: {date_type!r}")
        return FulfillmentDateDescriptor(field_name=date_type.value, index_name=index_name)

    @staticmethod
    def _run_query(query_parameters: dict, expression_names: dict, all_fields: bool) -> list:
        """Apply the optional projection, run the query and collect every page of items."""
        if not all_fields:
            # The projection refers to the status attribute through the #s
            # alias, so the alias is declared whenever the projection is sent.
            expression_names['#s'] = 'status'
            query_parameters['ProjectionExpression'] = Fulfillments._PROJECTION_EXPRESSION
        # Always sent: every key condition built in this class uses the #d alias.
        query_parameters['ExpressionAttributeNames'] = expression_names

        table = Fulfillments._table()
        response = table.query(**query_parameters)
        items = response['Items']
        # A LastEvaluatedKey in the response means more pages remain; resume from it.
        while 'LastEvaluatedKey' in response:
            query_parameters['ExclusiveStartKey'] = response['LastEvaluatedKey']
            response = table.query(**query_parameters)
            items.extend(response['Items'])
        return items

    @staticmethod
    @xray_recorder.capture()
    def fetch_by_order_id(order_id: str):
        """Return all fulfillment items recorded for an order.

        Issues a single query on ``order_id-index`` (no pagination).

        :param order_id: value matched against the ``order_id`` key.
        :return: the list of matching items, or ``None`` when there are none.
        """
        response = Fulfillments._table().query(
            IndexName='order_id-index',
            KeyConditionExpression=Key('order_id').eq(order_id)
        )
        items = response.get('Items') if response else None
        return items if items else None

    @staticmethod
    @xray_recorder.capture()
    def fetch_by_container_id(container_id: str):
        """Return the first fulfillment item found for a container.

        Issues a single query on ``container_id-index`` (no pagination).

        :param container_id: value matched against the ``container_id`` key.
        :return: the first matching item, or ``None`` when there are none.
        """
        response = Fulfillments._table().query(
            IndexName='container_id-index',
            KeyConditionExpression=Key('container_id').eq(container_id)
        )
        items = response.get('Items') if response else None
        return items[0] if items else None

    @staticmethod
    @xray_recorder.capture()
    def fetch_for_date_range(date_type: FulfillmentDateType, begin_date: datetime, end_date: datetime,
                             statuses: [FulfillmentStatus], customer_mode: str = 'PROD',
                             customer_edi_id: str = None, all_fields: bool = True) -> [Fulfillment]:
        """Return fulfillment items in a date range, restricted to the given statuses.

        ``FulfillmentStatus.ANY`` in ``statuses`` expands to ``FulfillmentStatus.all()``.
        With more than four statuses one broader query is run (by customer when
        ``customer_edi_id`` is given, otherwise by customer mode) and the result
        is filtered by status in memory; otherwise one query per status is run
        and the results are concatenated in the order of ``statuses``.

        :param date_type: which date attribute the range applies to.
        :param begin_date: start of the range (sent as ``isoformat()``).
        :param end_date: end of the range (sent as ``isoformat()``).
        :param statuses: statuses to include.
        :param customer_mode: customer mode the items must have.
        :param customer_edi_id: optional customer restriction.
        :param all_fields: when ``False`` only the projected attribute subset is returned.
        :return: list of matching items as returned by ``table.query``.
        :raises FulfillmentFetchError: when DynamoDB reports a ``ClientError``.
        :raises ValueError: when ``date_type`` is not supported.
        """
        try:
            if FulfillmentStatus.ANY in statuses:
                statuses = FulfillmentStatus.all()

            if len(statuses) > Fulfillments._STATUS_FAN_OUT_LIMIT:
                if customer_edi_id:
                    candidates = Fulfillments.fetch_for_customer(
                        customer_edi_id=customer_edi_id,
                        customer_mode=customer_mode,
                        date_type=date_type,
                        begin_date=begin_date,
                        end_date=end_date,
                        all_fields=all_fields
                    )
                else:
                    # let's query by customer_mode and filter the results
                    candidates = Fulfillments.fetch_for_customer_mode(
                        customer_mode=customer_mode,
                        date_type=date_type,
                        begin_date=begin_date,
                        end_date=end_date,
                        all_fields=all_fields
                    )
                return [
                    candidate for candidate in candidates
                    if FulfillmentStatus.for_(candidate['status']) in statuses
                ]

            fulfillments = []
            for status in statuses:
                fulfillments.extend(
                    Fulfillments.fetch_for_status(
                        date_type=date_type,
                        begin_date=begin_date,
                        end_date=end_date,
                        status=status,
                        customer_mode=customer_mode,
                        customer_edi_id=customer_edi_id,
                        all_fields=all_fields
                    )
                )
            return fulfillments
        except ClientError as e:
            # Chain explicitly so the boto error stays attached as the cause.
            raise FulfillmentFetchError(e.response['Error']['Message']) from e

    @staticmethod
    def fetch_for_customer(customer_edi_id: str, date_type: FulfillmentDateType,
                           begin_date: datetime, end_date: datetime,
                           customer_mode: str = 'PROD', all_fields: bool = True):
        """Return a customer's fulfillment items within a date range.

        Queries ``customer_edi_id-index`` keyed on ``customer_edi_id`` and the
        date attribute, filters on ``customer_mode``, and follows pagination
        until all pages are read.

        :param customer_edi_id: customer whose items are fetched.
        :param date_type: which date attribute the range applies to.
        :param begin_date: start of the range (sent as ``isoformat()``).
        :param end_date: end of the range (sent as ``isoformat()``).
        :param customer_mode: customer mode the items must have.
        :param all_fields: when ``False`` only the projected attribute subset is returned.
        :return: list of matching items (possibly empty).
        :raises ValueError: when ``date_type`` is not supported.
        """
        date_descriptor = Fulfillments._date_descriptor(date_type, Fulfillments._CUSTOMER_INDEXES)
        query_parameters = {
            'IndexName': date_descriptor.index_name,
            'KeyConditionExpression':
                "customer_edi_id = :customer_edi_id and #d between :begin_date and :end_date",
            'FilterExpression': "customer_mode = :customer_mode",
            'ExpressionAttributeValues': {
                ':customer_mode': customer_mode,
                ':begin_date': begin_date.isoformat(),
                ':end_date': end_date.isoformat(),
                ':customer_edi_id': customer_edi_id
            }
        }
        expression_names = {'#d': date_descriptor.field_name}
        return Fulfillments._run_query(query_parameters, expression_names, all_fields)

    @staticmethod
    def fetch_for_customer_mode(customer_mode: str, date_type: FulfillmentDateType,
                                begin_date: datetime, end_date: datetime, all_fields: bool = True):
        """Return all fulfillment items of a customer mode within a date range.

        Queries ``customer_mode-ship_date-index`` keyed on ``customer_mode`` and
        the date attribute, and follows pagination until all pages are read.

        :param customer_mode: customer mode the items must have.
        :param date_type: which date attribute the range applies to.
        :param begin_date: start of the range (sent as ``isoformat()``).
        :param end_date: end of the range (sent as ``isoformat()``).
        :param all_fields: when ``False`` only the projected attribute subset is returned.
        :return: list of matching items (possibly empty).
        :raises ValueError: when ``date_type`` is not supported.
        """
        date_descriptor = Fulfillments._date_descriptor(date_type, Fulfillments._CUSTOMER_MODE_INDEXES)
        query_parameters = {
            'IndexName': date_descriptor.index_name,
            'KeyConditionExpression':
                "customer_mode = :customer_mode and #d between :begin_date and :end_date",
            'ExpressionAttributeValues': {
                ':customer_mode': customer_mode,
                ':begin_date': begin_date.isoformat(),
                ':end_date': end_date.isoformat()
            }
        }
        expression_names = {'#d': date_descriptor.field_name}
        return Fulfillments._run_query(query_parameters, expression_names, all_fields)

    @staticmethod
    @xray_recorder.capture()
    def fetch_for_status(status: FulfillmentStatus, date_type: FulfillmentDateType,
                         begin_date: datetime, end_date: datetime,
                         customer_mode: str = 'PROD', customer_edi_id: str = None,
                         all_fields: bool = True) -> [Fulfillment]:
        """Return fulfillment items with one status within a date range.

        Queries ``status-index`` keyed on ``status`` and the date attribute,
        filters on ``customer_mode`` (and on ``customer_edi_id`` when given),
        and follows pagination until all pages are read.

        :param status: status the items must have (``status.value`` is sent).
        :param date_type: which date attribute the range applies to.
        :param begin_date: start of the range (sent as ``isoformat()``).
        :param end_date: end of the range (sent as ``isoformat()``).
        :param customer_mode: customer mode the items must have.
        :param customer_edi_id: optional customer restriction.
        :param all_fields: when ``False`` only the projected attribute subset is returned.
        :return: list of matching items (possibly empty).
        :raises ValueError: when ``date_type`` is not supported.
        """
        date_descriptor = Fulfillments._date_descriptor(date_type, Fulfillments._STATUS_INDEXES)
        expression_values = {
            ':customer_mode': customer_mode,
            ':begin_date': begin_date.isoformat(),
            ':end_date': end_date.isoformat(),
            ':status': status.value
        }
        filter_expression = "customer_mode = :customer_mode"
        if customer_edi_id:
            filter_expression += " and customer_edi_id = :customer_edi_id"
            expression_values[':customer_edi_id'] = customer_edi_id
        query_parameters = {
            'IndexName': date_descriptor.index_name,
            'KeyConditionExpression': "#s = :status and #d between :begin_date and :end_date",
            'FilterExpression': filter_expression,
            'ExpressionAttributeValues': expression_values
        }
        # #s is part of the key condition here, so it is declared up front.
        expression_names = {
            '#d': date_descriptor.field_name,
            '#s': 'status'
        }
        return Fulfillments._run_query(query_parameters, expression_names, all_fields)