Repository URL to install this package:
|
Version:
0.4.192 ▾
|
lib-py-b2b
/
order.py
|
|---|
from typing import Optional, List
from dateutil.parser import isoparse, parse
from dateutil.tz import UTC
from py_aws_util.logging import log_data
from lib_b2b.additional_charge import AdditionalCharge
from lib_b2b.change import ChangeRecord, ChangeRequest
from lib_b2b.discount import Discount
from lib_b2b.erp import ERP
from lib_b2b.financial_status import FinancialStatus
from lib_b2b.order_dates import OrderDatesDelegate
from lib_b2b.order_line import OrderLine
from lib_b2b.order_totals import OrderTotals
from lib_b2b.tracking import get_random_tracking_number
from lib_b2b.util import parse_dt
from .persistent import Persistable
from collections import UserDict
from .address import Address, AddressValidationResult
from datetime import datetime, timedelta, timezone
from .order_status import OrderStatus
from .errors import OrderNotFoundError, OrderStatusChangeError, OrderValidationError, \
InvalidAddressError, InvalidCustomerItemError, ItemPricingNotFound, ShipRateNotFound, ShipZoneNotFound, \
OrderFetchError, OrderSaveError, ErrorRecord, WarningRecord, NoticeRecord, RejectionReason, VersionConflictError, \
ConditionCheckError, ChangeFailedError
from .shipping import Shipping
from boto3 import resource
from botocore.exceptions import ClientError
from .change import Changeable
from boto3.dynamodb.conditions import ConditionBase, ConditionExpressionBuilder, Attr, Or, And
import logging
from os import environ
from aws_xray_sdk.core import xray_recorder
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Name of the DynamoDB table holding orders; the `order_table` environment
# variable overrides the test default.
order_table = environ.get('order_table', 'test.b2b.Order')
class Order(UserDict, Persistable, Changeable):
def supported_change_request_types(self) -> [ChangeRequest]:
    """
    List the change request classes that can be applied to an order.
    The classes are imported inside the method rather than at module load.
    :return: the supported change request classes
    :rtype: list
    """
    from lib_b2b.order_change.accept import OrderAcceptChangeRequest
    from lib_b2b.order_change.address import OrderAddressChangeRequest
    from lib_b2b.order_change.cancel import OrderCancelChangeRequest
    from lib_b2b.order_change.cancel_line import OrderCancelLineChangeRequest
    from lib_b2b.order_change.paid import OrderFinancialStatusChangeRequest
    from lib_b2b.order_change.ship_date import OrderShipDateChangeRequest
    from lib_b2b.order_change.total import OrderTotalChangeRequest
    from lib_b2b.order_change.reject import OrderRejectChangeRequest
    from lib_b2b.order_change.revision import OrderRevisionChangeRequest
    supported = [
        OrderAcceptChangeRequest,
        OrderAddressChangeRequest,
        OrderCancelChangeRequest,
        OrderCancelLineChangeRequest,
        OrderFinancialStatusChangeRequest,
        OrderShipDateChangeRequest,
        OrderTotalChangeRequest,
        OrderRejectChangeRequest,
        OrderRevisionChangeRequest,
    ]
    return supported
@staticmethod
def generate_id(customer, purchase_order):
if isinstance(customer, str):
_customer_edi_id = customer
else:
_customer_edi_id = customer['customer_edi_id']
return f"{_customer_edi_id}-{purchase_order}"
@staticmethod
def __generate_order_date(order_date: datetime, additional_data: dict) -> datetime:
if order_date:
return order_date
else:
today = datetime.now(tz=timezone.utc)
if 'manually_entered' in additional_data:
return today - timedelta(days=1)
else:
return today
@staticmethod
def __get_version_conditions(version):
if version:
expressions = [
Attr('version').lte(version),
Attr('version').not_exists()
]
return Or(*expressions)
else:
return None
@staticmethod
def create(data: dict, conditions: ConditionBase = None, version: str = None):
    """
    Persist a new order item and return it wrapped in an Order.
    The item is written with a single put, guarded by the caller's conditions and/or
    the version condition when either is present. After a successful write a
    "Received order" history entry is chronicled.
    :param data: the order item to store; when a version is given it is stamped into this dict
    :type data: dict
    :param conditions: optional guard predicates that must hold for the write to occur
    :type conditions: ConditionBase
    :param version: optional version of the order being written
    :type version: str
    :return: the created order
    :rtype: Order
    :raise ConditionCheckError: the conditional check failed and caller conditions were supplied
    :raise VersionConflictError: the conditional check failed and only the version guard was in play
    :raise OrderSaveError: any other DynamoDB client error
    """
    dynamodb_resource = resource('dynamodb', region_name='us-east-1')
    table = dynamodb_resource.Table(order_table)
    try:
        if version:
            data['version'] = version
        version_conditions = Order.__get_version_conditions(version)
        # Combine caller conditions and the version guard into one expression.
        if conditions and version_conditions:
            guard = And(conditions, version_conditions)
        else:
            guard = conditions or version_conditions
        # Exactly one write: a guarded put must not be followed by an unguarded one.
        if guard:
            table.put_item(Item=data, ConditionExpression=guard)
        else:
            table.put_item(Item=data)
        order = Order(data)
        order.chronicle(message="Received order", when=datetime.now().astimezone(UTC))
        return order
    except ClientError as ce:
        # Only a failed conditional check is a condition/version conflict;
        # every other client error is a plain save failure.
        if ce.response['Error']['Code'] == 'ConditionalCheckFailedException':
            if conditions:
                raise ConditionCheckError(f"Condition check failed. {str(ce)}") from ce
            raise VersionConflictError(f"Tried to create an order with a version older "
                                       f"than the existing order. {str(ce)}") from ce
        raise OrderSaveError("Unable to save the order.") from ce
@staticmethod
def from_dict(data: dict):
    """
    Wrap an existing order dictionary in an Order without writing anything to the store.
    :param data: the order data
    :type data: dict
    :return: an Order backed by the given data
    :rtype: Order
    """
    order = Order(data)
    return order
def __init__(self, data):
    """
    Wrap order data in an Order. Prefer the "create" factory method, which also
    persists the order.
    :param data: the order data used to initialise the underlying dictionary
    :type data: dict
    """
    super(Order, self).__init__(data)
@property
def order_id(self) -> str:
return self.data['id']
@property
def customer_edi_id(self) -> str:
return self.data['customer_edi_id']
@property
def customer_name(self):
    """The 'name' entry of the customer looked up by this order's customer EDI id."""
    from lib_b2b.customer import Customer
    customer = Customer.for_(self.customer_edi_id)
    return customer.get('name')
@property
def status(self) -> OrderStatus:
    """
    The order status.
    :return: the status parsed from 'order_status', or OrderStatus.NONE when the
             attribute is missing or empty
    :rtype: OrderStatus
    """
    # .get keeps a missing attribute on the same NONE fallback as an empty one,
    # mirroring how financial_status is read.
    _s = self.data.get('order_status')
    if _s:
        return OrderStatus.for_(_s)
    return OrderStatus.NONE

@status.setter
def status(self, new_status: OrderStatus):
    """
    Change the order status unconditionally and record the change.
    :param new_status: the order status we want to change to
    :type new_status: OrderStatus
    """
    # Same work as set_status without conditions or a version.
    self.set_status(new_status)
def set_status(self, new_status: OrderStatus, conditions: ConditionBase = None, version: str = None):
    """
    An alternative to setting the property that accepts conditions for the change.
    A change record describing the transition is written after the update call returns.
    :param new_status: the order status we want to change to
    :type new_status: OrderStatus
    :param conditions: conditions that must evaluate to true for the update to occur
    :type conditions: ConditionBase
    :param version: optional version passed through to modify
    :type version: str
    :return: None
    :rtype: None
    :raise: ValueError when modify hits a DynamoDB client error other than a failed
            conditional check (modify only logs a failed conditional check)
    """
    previous = self.status
    self.modify(data={'order_status': new_status.value}, conditions=conditions, version=version)
    transition = ChangeRecord(
        before=previous.value,
        after=new_status.value,
        description=f"Order status changed from {previous.value} to {new_status.value}",
        when=datetime.now().astimezone(UTC),
    )
    self.record(transition)
@property
def ship_to(self) -> Address:
    """The ship-to address, built from the 'shipTo' attribute."""
    raw_address = self.data['shipTo']
    return Address.from_dict(raw_address)

@ship_to.setter
def ship_to(self, new_ship_to: Address):
    """Store the given address, as a dict, in the 'shipTo' attribute."""
    address_data = new_ship_to.as_dict()
    self.modify({'shipTo': address_data})
def set_ship_to(self, new_ship_to: Address, conditions: ConditionBase = None, version: str = None):
    """
    An alternative to setting the property that accepts conditions for the change.
    :param new_ship_to: the order address we want to change to
    :type new_ship_to: Address
    :param conditions: conditions that must evaluate to true for the update to occur
    :type conditions: ConditionBase
    :param version: optional version passed through to modify
    :type version: str
    :return: None
    :rtype: None
    :raise: ValueError when modify hits a DynamoDB client error other than a failed
            conditional check (modify only logs a failed conditional check)
    """
    address_data = new_ship_to.as_dict()
    self.modify(data={'shipTo': address_data}, conditions=conditions, version=version)
@property
def ship_date(self):
    """The ship date parsed from the 'make_on' attribute, or None when it is not set."""
    raw = self.data.get('make_on', None)
    return parse_dt(raw) if raw else None

@ship_date.setter
def ship_date(self, new_ship_date: datetime):
    """Store the ship date, in ISO format, in the 'make_on' attribute."""
    iso_value = new_ship_date.isoformat()
    logger.info(f"Setting ship date to: [{new_ship_date}]")
    self.modify({'make_on': iso_value})
@property
def not_before_date(self):
    """The date parsed from the 'not_before_date' attribute, or None when it is not set."""
    raw = self.data.get('not_before_date', None)
    return parse_dt(raw) if raw else None

@not_before_date.setter
def not_before_date(self, new_not_before_date: datetime):
    """Store the not-before date, in ISO format, in the 'not_before_date' attribute."""
    iso_value = new_not_before_date.isoformat()
    logger.info(f"Setting not before date to: [{iso_value}]")
    self.modify({'not_before_date': iso_value})
def set_not_before_date(self, new_not_before_date: datetime, conditions: ConditionBase = None, version: str = None):
    """
    An alternative to setting the property that accepts conditions for the change.
    :param new_not_before_date: the not-before date to store (written in ISO format)
    :type new_not_before_date: datetime
    :param conditions: conditions that must evaluate to true for the update to occur
    :type conditions: ConditionBase
    :param version: optional version passed through to modify
    :type version: str
    """
    iso_value = new_not_before_date.isoformat()
    logger.info(f"Setting not before date to: [{iso_value}]")
    self.modify({'not_before_date': iso_value}, conditions=conditions, version=version)
@property
def order_date(self):
    """The order date parsed from 'order_date' and converted to UTC, or None when it is not set."""
    raw = self.data.get('order_date', None)
    if not raw:
        return None
    return parse_dt(raw).astimezone(timezone.utc)

@order_date.setter
def order_date(self, new_order_date: datetime):
    """Store the order date, in ISO format, in the 'order_date' attribute."""
    iso_value = new_order_date.isoformat()
    logger.info(f"Setting order date to: [{iso_value}]")
    self.modify({'order_date': iso_value})
@property
def make_date(self):
    """The make date parsed from the 'make_on' attribute, or None when it is not set."""
    raw = self.data.get('make_on', None)
    return parse_dt(raw) if raw else None

@make_date.setter
def make_date(self, new_make_date: datetime):
    """Store the make date, in ISO format, in the 'make_on' attribute."""
    iso_value = new_make_date.isoformat()
    logger.info(f"Setting make date to: [{iso_value}]")
    self.modify({'make_on': iso_value})
@property
def release_date(self):
    """The release date parsed from the 'release_on' attribute, or None when it is not set."""
    raw = self.data.get('release_on', None)
    return parse_dt(raw) if raw else None

@release_date.setter
def release_date(self, new_release_date: datetime):
    """Store the release date, in ISO format, in the 'release_on' attribute."""
    iso_value = new_release_date.isoformat()
    logger.info(f"Setting release date to: [{iso_value}]")
    self.modify({'release_on': iso_value})
@property
def changed_on(self) -> Optional[datetime]:
    """The timestamp parsed from the 'changed_on' attribute, or None when it is not set."""
    raw = self.data.get('changed_on', None)
    return parse_dt(raw) if raw else None

@changed_on.setter
def changed_on(self, new_change_date: datetime):
    """Store the change timestamp, in ISO format, in the 'changed_on' attribute."""
    iso_value = new_change_date.isoformat()
    self.modify({'changed_on': iso_value})
@property
def lines(self) -> [OrderLine]:
    """The order lines built from 'order_lines'; an empty list when the attribute is missing."""
    return [OrderLine.from_dict(raw_line) for raw_line in self.data.get('order_lines', [])]

@lines.setter
def lines(self, value: [OrderLine]):
    """Store the given order lines, each converted with as_dict, in 'order_lines'."""
    serialized = [order_line.as_dict() for order_line in value]
    self.modify(data={'order_lines': serialized})
@property
def discounts(self) -> [Discount]:
    """The discounts built from 'discounts'; an empty list when the attribute is missing."""
    return [Discount.from_dict(raw_discount) for raw_discount in self.data.get('discounts', [])]

@discounts.setter
def discounts(self, value: [Discount]):
    """Store the given discounts, each converted with as_dict, in 'discounts'."""
    serialized = [discount.as_dict() for discount in value]
    self.modify(data={'discounts': serialized})
@property
def expedite(self) -> bool:
return self.data.get('expedite', False)
@expedite.setter
def expedite(self, value: bool):
self.modify(data={'expedite': value})
@property
def additional_charges(self) -> [AdditionalCharge]:
    """The charges built from 'additional_charges'; an empty list when the attribute is missing."""
    return [AdditionalCharge.from_dict(raw_charge) for raw_charge in self.data.get('additional_charges', [])]

@additional_charges.setter
def additional_charges(self, value: [AdditionalCharge]):
    """Store the given charges, each converted with as_dict, in 'additional_charges'."""
    serialized = [charge.as_dict() for charge in value]
    self.modify(data={'additional_charges': serialized})
@property
def totals(self) -> Optional[OrderTotals]:
    """
    The order totals read from the order data.
    When 'total_amount' is absent, the totals are first calculated from the lines,
    discounts and additional charges (with no refunds) and stored through the setter,
    so reading this property can write to the store.
    """
    needs_calculation = 'total_amount' not in self.data
    if needs_calculation:
        calculated = OrderTotals.calculate(
            order_lines=self.lines,
            discounts=self.discounts,
            additional_charges=self.additional_charges,
            refunds=[],
        )
        self.totals = calculated
    return OrderTotals.from_dict(self.data)

@totals.setter
def totals(self, new_totals: OrderTotals):
    """Store every entry of the totals' dict representation on the order."""
    totals_data = new_totals.as_dict()
    self.modify(data=totals_data)
def set_totals(self, new_totals: OrderTotals, conditions: ConditionBase = None, version: str = None):
    """
    An alternative to setting the property that accepts conditions for the change.
    :param new_totals: the totals whose dict representation is written to the order
    :type new_totals: OrderTotals
    :param conditions: conditions that must evaluate to true for the update to occur
    :type conditions: ConditionBase
    :param version: optional version passed through to modify
    :type version: str
    """
    totals_data = new_totals.as_dict()
    self.modify(data=totals_data, conditions=conditions, version=version)
@property
def financial_status(self) -> FinancialStatus:
    """The financial status from 'financial_status', or FinancialStatus.NONE when missing or empty."""
    raw = self.data.get('financial_status')
    if not raw:
        return FinancialStatus.NONE
    return FinancialStatus(raw)
@property
def changeable_identity(self):
return self.order_id
@property
def address_validation_result(self) -> Optional[AddressValidationResult]:
    """The result built from 'address_validation_result', or None when missing or empty."""
    raw_result = self.data.get('address_validation_result')
    if not raw_result:
        return None
    return AddressValidationResult.from_dict(raw_result)

@address_validation_result.setter
def address_validation_result(self, value: AddressValidationResult):
    """Store the result as a dict in 'address_validation_result'; a falsy value stores None."""
    stored = value.as_dict() if value else None
    self.modify(dict(address_validation_result=stored))
@financial_status.setter
def financial_status(self, new_status: FinancialStatus):
    """Store the value of the given financial status in 'financial_status'."""
    update = {'financial_status': new_status.value}
    self.modify(update)
def set_financial_status(self, new_status: FinancialStatus, conditions: ConditionBase = None,
                         version: str = None):
    """
    An alternative to setting the property that accepts conditions for the change.
    :param new_status: the order financial status we want to change to
    :type new_status: FinancialStatus
    :param conditions: conditions that must evaluate to true for the update to occur
    :type conditions: ConditionBase
    :param version: optional version passed through to modify
    :type version: str
    :return: None
    :rtype: None
    :raise: ValueError when modify hits a DynamoDB client error other than a failed
            conditional check (modify only logs a failed conditional check)
    """
    update = {'financial_status': new_status.value}
    self.modify(data=update, conditions=conditions, version=version)
def line_matching(self, purchase_order_line: str = None, channel_line_id: str = None,
                  line_id: str = None, exclude_cancelled: bool = True):
    """
    Get the order line matching either purchase_order_line, channel_line_id or line_id.
    Only the first truthy criterion, in that order, is used for the lookup.
    :param purchase_order_line: the purchase order line number
    :type purchase_order_line: str
    :param channel_line_id: the channel line number
    :type channel_line_id: str
    :param line_id: the line identifier
    :type line_id: str
    :param exclude_cancelled: exclude lines that have been cancelled in the result
    :type exclude_cancelled: bool
    :return: the order line or None if not found
    :rtype: OrderLine
    """
    if purchase_order_line:
        match = next((ln for ln in self.lines if ln.purchase_order_line == purchase_order_line), None)
    elif channel_line_id:
        match = next((ln for ln in self.lines if ln.channel_line_id == channel_line_id), None)
    elif line_id:
        match = next((ln for ln in self.lines if ln.order_line_id == line_id), None)
    else:
        match = None
    # A cancelled match is hidden from the caller when exclusion is requested.
    if exclude_cancelled and match and match.cancelled:
        return None
    return match
def _modify_line(self, modified_line: OrderLine, conditions: ConditionBase = None, version: str = None):
    """
    Replace the stored line that has the same order_line_id as the given line.
    The modified line is appended after the remaining lines, so it ends up last.
    :param modified_line: the new state of the line
    :type modified_line: OrderLine
    :param conditions: conditions that must evaluate to true for the update to occur
    :type conditions: ConditionBase
    :param version: optional version passed through to modify
    :type version: str
    """
    target_id = modified_line.order_line_id
    kept = [ln for ln in self.lines if ln.order_line_id != target_id]
    kept.append(modified_line)
    serialized = [ln.as_dict() for ln in kept]
    self.modify(data={'order_lines': serialized}, conditions=conditions, version=version)
def as_dict(self) -> dict:
return self.data
@xray_recorder.capture()
def __refresh(self):
    """
    Reload this order from the store and merge the stored item into the local data.
    :raise OrderNotFoundError: no item exists for this order id
    :raise OrderFetchError: the fetch failed with a DynamoDB client error
    """
    dynamodb = resource('dynamodb', region_name='us-east-1')
    table = dynamodb.Table(order_table)
    try:
        response = table.get_item(Key={'id': self.order_id})
    except ClientError as e:
        # Chain the client error so the original cause stays in the traceback.
        raise OrderFetchError(e.response['Error']['Message']) from e
    if 'Item' not in response:
        raise OrderNotFoundError(f"Unable to find order when refreshing: {self.order_id}")
    self.data.update(response['Item'])
@xray_recorder.capture()
def modify(self, data: dict, conditions: ConditionBase = None, version: str = None):
    """
    The keys in the data dict should be the names of the properties in DDB. The conditions
    are a way to protect the modify. For example, if you only want the modify to complete
    successfully if a particular property is missing or has a specific value.
    On success the same changes are applied to the local order data.
    :param data: A dictionary of name value pairs to modify on the object in the store.
                 It is not mutated; the version, when given, is added to an internal copy.
    :type data: dict
    :param conditions: A set of guard predicates
    :type conditions: ConditionBase
    :param version: optional version; written as 'version' and used to build a version guard
    :type version: str
    :return: the response from DDB, or None when a failed conditional check was logged
    :rtype: dict
    :raise: ValueError for a DynamoDB client error other than a failed conditional check
    """
    try:
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(order_table)
        _order_id = self.data['id']
        # Work on a copy so stamping the version does not leak into the caller's dict.
        changes = dict(data)
        if version:
            changes['version'] = version
        version_conditions = Order.__get_version_conditions(version)
        # Every attribute is addressed through #name / :value placeholders.
        attribute_names = {f"#{key}": key for key in changes}
        attribute_values = {f":{key}": value for key, value in changes.items()}
        assignments = ', '.join(f"#{key} = :{key}" for key in changes)
        update_kwargs = {
            'Key': {'id': _order_id},
            'UpdateExpression': f"set {assignments}",
            'ExpressionAttributeValues': attribute_values,
            'ExpressionAttributeNames': attribute_names,
            'ReturnValues': "UPDATED_NEW",
        }
        # Combine caller conditions and the version guard; omit the parameter when neither applies.
        if conditions and version_conditions:
            guard = And(conditions, version_conditions)
        else:
            guard = conditions or version_conditions
        if guard:
            update_kwargs['ConditionExpression'] = guard
        response = table.update_item(**update_kwargs)
        self.data.update(changes)
        return response
    except ClientError as ce:
        # NOTE(review): this check looks inverted relative to the message - a failed
        # conditional check is only logged, while every other client error is raised as
        # "Condition check failed". Kept as-is because callers may rely on failed
        # conditional checks being non-fatal; confirm before changing.
        if ce.response['Error']['Code'] != 'ConditionalCheckFailedException':
            raise ValueError(f"Condition check failed. {str(ce)}") from ce
        else:
            logger.warning(ce)
def _remove(self, conditions: ConditionBase = None):
    """
    Only around for tests. Don't use. Orders should never be removed, they should be cancelled.
    A failed conditional check is logged as a warning; other client errors are re-raised.
    :param conditions: A set of guard predicates
    :type conditions: ConditionBase
    :return: the response from DDB, or None when a failed conditional check was logged
    :rtype: dict
    """
    try:
        dynamodb_resource = resource('dynamodb', region_name='us-east-1')
        table = dynamodb_resource.Table(order_table)
        delete_kwargs = {'Key': {'id': self.order_id}, 'ReturnValues': "NONE"}
        if conditions:
            delete_kwargs['ConditionExpression'] = conditions
        return table.delete_item(**delete_kwargs)
    except ClientError as ce:
        if ce.response['Error']['Code'] == 'ConditionalCheckFailedException':
            logger.warning(ce)
        else:
            raise ce
def chronicle(self, message: str, when: datetime = None, data: dict = None, who: str = None):
    """
    Append an entry to the order's 'history' list in the store.
    :param message: the history message
    :type message: str
    :param when: when the event happened; defaults to the current time converted to UTC
    :type when: datetime
    :param data: extra data stored with the entry; defaults to an empty dict
    :type data: dict
    :param who: who caused the event
    :type who: str
    :return: True when the update returned HTTP 200 with attributes, otherwise None
    """
    when = when or datetime.now().astimezone(UTC)
    data = data or {}
    dynamodb_resource = resource('dynamodb', region_name='us-east-1')
    table = dynamodb_resource.Table(order_table)
    _order_id = self.data['id']
    entry = {
        'message': message,
        'when': when.isoformat(),
        'data': data,
        'who': who
    }
    # The request id is attached only when the 'aws_request_id' environment variable is set.
    request_id = environ.get('aws_request_id')
    if request_id:
        entry['aws_request_id'] = request_id
    result = table.update_item(
        Key={'id': _order_id},
        UpdateExpression="set history = list_append(if_not_exists(history, :empty_list), :historic_event)",
        ExpressionAttributeValues={
            ':empty_list': [],
            ':historic_event': [entry]
        },
        ReturnValues="UPDATED_NEW"
    )
    succeeded = result['ResponseMetadata']['HTTPStatusCode'] == 200 and 'Attributes' in result
    if succeeded:
        return True
def record(self, change_record: ChangeRecord):
    """
    Append a change record to the order's 'changelog' list in the store.
    When the update succeeds, the change is also chronicled in the history and
    'changed_on' is set to the time this method was entered.
    :param change_record: the change to record
    :type change_record: ChangeRecord
    :return: True when the update returned HTTP 200 with attributes, otherwise None
    """
    recorded_at = datetime.now().astimezone(UTC)
    dynamodb_resource = resource('dynamodb', region_name='us-east-1')
    table = dynamodb_resource.Table(order_table)
    _order_id = self.data['id']
    entry = {
        'message': str(change_record),
        'data': change_record.as_dict()
    }
    result = table.update_item(
        Key={'id': _order_id},
        UpdateExpression="set changelog = list_append(if_not_exists(changelog, :empty_list), :change_record)",
        ExpressionAttributeValues={
            ':empty_list': [],
            ':change_record': [entry]
        },
        ReturnValues="UPDATED_NEW"
    )
    succeeded = result['ResponseMetadata']['HTTPStatusCode'] == 200 and 'Attributes' in result
    if not succeeded:
        return None
    self.chronicle(message=change_record.description, when=change_record.when,
                   data=change_record.as_dict(), who=change_record.who)
    self.changed_on = recorded_at
    return True
@xray_recorder.capture()
def update_erp_reference(self):
    """
    Attempt to find the corresponding glovia order and, if it exists, record its
    identifiers ('glovia_sales_order', 'glovia_entry_date') on the b2b order.
    :return: The glovia order, or None if it does not exist yet
    """
    try:
        # check for glovia order
        erp_order = ERP.for_(self.customer_edi_id).fetch_order_by_id(self.order_id)
        reference = {
            'glovia_sales_order': erp_order.sales_order,
            'glovia_entry_date': erp_order.entry_date.isoformat()
        }
        self.modify(data=reference)
        self.chronicle(message="Updated erp order reference data.", data=reference)
    except OrderNotFoundError:
        logger.info(f"Checked for erp order for order {self.order_id} and it does not yet exist.")
        return None
    else:
        return erp_order
@xray_recorder.capture()
def on_fulfillment_change(self, fulfillment_level):
    """
    Move the order to SHIPPED or PARTIALLY_SHIPPED according to the fulfillment level.
    Nothing is written when the resulting status equals the current one.
    :param fulfillment_level: object exposing total_remaining, total_required, tracking_urls,
                              fulfillment_ids, last_ship_date and ship_dates
    """
    target = self.status
    if fulfillment_level.total_remaining <= 0:
        target = OrderStatus.SHIPPED
    elif fulfillment_level.total_remaining < fulfillment_level.total_required:
        target = OrderStatus.PARTIALLY_SHIPPED
    if target == self.status:
        return
    shipment_update = {
        'order_status': target.value,
        'tracking_urls': fulfillment_level.tracking_urls,
        'fulfillment_ids': fulfillment_level.fulfillment_ids,
        'last_ship_date': fulfillment_level.last_ship_date,
        'ship_dates': fulfillment_level.ship_dates
    }
    self.modify(data=shipment_update)
@xray_recorder.capture()
def change_ship_date(self, new_ship_date: datetime, on_behalf_of: str = 'System', version=None):
    """
    Submit a ship date change request for this order.
    :param new_ship_date: the requested ship date
    :type new_ship_date: datetime
    :param on_behalf_of: Who is making this change
    :type on_behalf_of: str
    :param version: value returned by the change data provider's version()
    :return: the result of self.change for the single request
    :raise: any exception is logged and re-raised
    """
    try:
        from lib_b2b.order_change.ship_date import OrderShipDateChangeRequest, OrderShipDateChangeDataProvider

        class __OCDP(OrderShipDateChangeDataProvider):
            # Exposes this call's arguments through the provider interface.
            def version(self) -> Optional[str]:
                return version

            @property
            def new_ship_date(self) -> Optional[datetime]:
                return new_ship_date

            def data(self) -> dict:
                return {'ship_date': new_ship_date}

        request = OrderShipDateChangeRequest(on_behalf_of=on_behalf_of, change_data_provider=__OCDP())
        return self.change([request])
    except Exception as err:
        logger.exception(err)
        raise err
@xray_recorder.capture()
def accept(self, on_behalf_of: str = 'System', version=None):
    """
    Accept the order. Only an order in READY status can be accepted, and unless the
    customer's order flow is MANUAL the order must already have a glovia reference
    (it is looked up when 'glovia_sales_order' is not yet on the order).
    :param on_behalf_of: Who is making this change
    :type on_behalf_of: str
    :param version: value returned by the change data provider's version()
    :return: the result of self.change for the single request
    :raise: OrderStatusChangeError when the order is not READY; OrderNotFoundError when the
            glovia order is required but missing; every exception is logged and re-raised
    """
    try:
        if self.status is not OrderStatus.READY:
            raise OrderStatusChangeError(f"Invalid status change from {self.status.value} to ACCEPTED.")
        from .profile import Profile
        from lib_b2b.order_flow import OrderFlowType
        profile = Profile.profile_for(customer=self.customer_edi_id)
        flow_type = OrderFlowType.for_(profile.integration_config.order_flow_type)
        # Check to confirm that the order is already in Glovia
        if flow_type != OrderFlowType.MANUAL and 'glovia_sales_order' not in self.data:
            logger.debug(f"Checking to see if order exists in Glovia {self.order_id}.")
            erp_order = self.update_erp_reference()
            if not erp_order:
                raise OrderNotFoundError(
                    "The order must exist in glovia prior to accepting the order in the automated order flow.")
        from lib_b2b.order_change.accept import OrderAcceptChangeRequest
        from lib_b2b.order_change import OrderStatusChangeDataProvider

        class __OCDP(OrderStatusChangeDataProvider):
            # Exposes this call's arguments through the provider interface.
            def version(self) -> Optional[str]:
                return version

            @property
            def new_status(self):
                return OrderStatus.ACCEPTED

            def data(self) -> dict:
                return {'order_status': OrderStatus.ACCEPTED.value}

        request = OrderAcceptChangeRequest(on_behalf_of=on_behalf_of, change_data_provider=__OCDP())
        return self.change([request])
    except Exception as err:
        logger.exception(err)
        raise err
@xray_recorder.capture()
def reject(self, reasons: List[RejectionReason], on_behalf_of: str = 'System', version=None):
    """
    Submit a rejection change request for this order.
    :param reasons: the reasons for the rejection
    :type reasons: List[RejectionReason]
    :param on_behalf_of: Who is making this change
    :type on_behalf_of: str
    :param version: value returned by the change data provider's version()
    :return: the result of self.change for the single request
    :raise: any exception is logged and re-raised
    """
    try:
        from lib_b2b.order_change.reject import OrderRejectChangeRequest, OrderRejectChangeDataProvider

        class __OCDP(OrderRejectChangeDataProvider):
            # Exposes this call's arguments through the provider interface.
            def version(self) -> Optional[str]:
                return version

            @property
            def new_status(self):
                return OrderStatus.REJECTED

            @property
            def reasons(self) -> List[RejectionReason]:
                return reasons

            def data(self) -> dict:
                return {'order_status': OrderStatus.REJECTED.value}

        request = OrderRejectChangeRequest(on_behalf_of=on_behalf_of,
                                           change_data_provider=__OCDP())
        return self.change([request])
    except Exception as err:
        logger.exception(err)
        raise err
@xray_recorder.capture()
def change_status(self, new_status: OrderStatus, on_behalf_of: str = 'System', version=None):
    """
    Submit a status change request for this order.
    :param new_status: the order status we want to change to
    :type new_status: OrderStatus
    :param on_behalf_of: Who is making this change
    :type on_behalf_of: str
    :param version: value returned by the change data provider's version()
    :return: the result of self.change for the single request
    :raise: any exception is logged and re-raised
    """
    try:
        from lib_b2b.order_change.status import OrderStatusChangeRequest, OrderStatusChangeDataProvider

        class __OCDP(OrderStatusChangeDataProvider):
            # Exposes this call's arguments through the provider interface.
            def version(self) -> Optional[str]:
                return version

            @property
            def new_status(self):
                return new_status

            def data(self) -> dict:
                return {'order_status': new_status.value}

        request = OrderStatusChangeRequest(on_behalf_of=on_behalf_of,
                                           change_data_provider=__OCDP())
        return self.change([request])
    except Exception as err:
        logger.exception(err)
        raise err
@xray_recorder.capture()
def request_expedited_handling(self, expedite: bool, on_behalf_of: str = 'System', version=None):
    """
    Submit an expedite change request for this order.
    :param expedite: the expedite flag being requested
    :type expedite: bool
    :param on_behalf_of: Who is making this change
    :type on_behalf_of: str
    :param version: value returned by the change data provider's version()
    :return: the result of self.change for the single request
    :raise: any exception is logged and re-raised
    """
    try:
        from lib_b2b.order_change.expedite import OrderExpediteChangeRequest, OrderExpediteChangeDataProvider

        class __OCDP(OrderExpediteChangeDataProvider):
            # Exposes this call's arguments through the provider interface.
            def version(self) -> Optional[str]:
                return version

            @property
            def expedite(self) -> bool:
                return expedite

            def data(self) -> dict:
                return {'expedite': expedite}

        request = OrderExpediteChangeRequest(on_behalf_of=on_behalf_of,
                                             change_data_provider=__OCDP())
        return self.change([request])
    except Exception as err:
        logger.exception(err)
        raise err
@xray_recorder.capture()
def reprocess(self, on_behalf_of: str = 'System', version=None):
    """
    Submit a reprocess change request, whose provider reports RECEIVED as the new status.
    :param on_behalf_of: Who is making this change
    :type on_behalf_of: str
    :param version: value returned by the change data provider's version()
    :return: the result of self.change for the single request
    :raise: any exception is logged and re-raised
    """
    try:
        from lib_b2b.order_change.reprocess import OrderReprocessChangeRequest
        from lib_b2b.order_change import OrderStatusChangeDataProvider

        class __OCDP(OrderStatusChangeDataProvider):
            # Exposes this call's arguments through the provider interface.
            def version(self) -> Optional[str]:
                return version

            @property
            def new_status(self):
                return OrderStatus.RECEIVED

            def data(self) -> dict:
                return {'order_status': OrderStatus.RECEIVED.value}

        request = OrderReprocessChangeRequest(on_behalf_of=on_behalf_of,
                                              change_data_provider=__OCDP())
        return self.change([request])
    except Exception as err:
        logger.exception(err)
        raise err
@xray_recorder.capture()
def close_as_shipped(self, on_behalf_of: str = 'System', version=None):
    """
    Submit a close-as-shipped change request, whose provider reports SHIPPED as the new status.
    :param on_behalf_of: Who is making this change
    :type on_behalf_of: str
    :param version: value returned by the change data provider's version()
    :return: the result of self.change for the single request
    :raise: any exception is logged and re-raised
    """
    try:
        from lib_b2b.order_change.closeasshipped import OrderCloseAsShippedChangeRequest
        from lib_b2b.order_change import OrderStatusChangeDataProvider

        class __OCDP(OrderStatusChangeDataProvider):
            # Exposes this call's arguments through the provider interface.
            def version(self) -> Optional[str]:
                return version

            @property
            def new_status(self):
                return OrderStatus.SHIPPED

            def data(self) -> dict:
                return {'order_status': OrderStatus.SHIPPED.value}

        request = OrderCloseAsShippedChangeRequest(on_behalf_of=on_behalf_of,
                                                   change_data_provider=__OCDP())
        return self.change([request])
    except Exception as err:
        logger.exception(err)
        raise err
@xray_recorder.capture()
def update_financial_status(self, financial_status: FinancialStatus, on_behalf_of: str = 'System', version=None):
    """
    Submit a financial status change request for this order.
    :param financial_status: the financial status we want to change to
    :type financial_status: FinancialStatus
    :param on_behalf_of: Who is making this change
    :type on_behalf_of: str
    :param version: value returned by the change data provider's version()
    :return: the result of self.change for the single request
    :raise: any exception is logged and re-raised
    """
    try:
        from lib_b2b.order_change.paid import OrderFinancialStatusChangeRequest, OrderFinancialStatusChangeDataProvider

        class __OCDP(OrderFinancialStatusChangeDataProvider):
            # Exposes this call's arguments through the provider interface.
            def version(self) -> Optional[str]:
                return version

            @property
            def financial_status(self):
                return financial_status

            def data(self) -> dict:
                return {'financial_status': financial_status.value}

        request = OrderFinancialStatusChangeRequest(on_behalf_of=on_behalf_of,
                                                    change_data_provider=__OCDP())
        return self.change([request])
    except Exception as err:
        logger.exception(err)
        raise err
@xray_recorder.capture()
def cancel(self, cancellation_reason: str, cancelled_at: datetime,
           on_behalf_of: str = 'System', version=None):
    """
    Cancel the entire order
    :param cancellation_reason: reason for the cancellation
    :type cancellation_reason: str
    :param cancelled_at: when the cancellation occurred
    :type cancelled_at: datetime
    :param on_behalf_of: Who is making this change
    :type on_behalf_of: str
    :param version: value returned by the change data provider's version()
    :return: the result of self.change for the single request
    :raise OrderStatusChangeError: wraps any exception raised while cancelling (after logging it)
    """
    try:
        from lib_b2b.order_change.cancel import OrderCancelChangeRequest, OrderCancelChangeDataProvider

        class __OCDP(OrderCancelChangeDataProvider):
            # Exposes this call's arguments through the provider interface.
            def version(self) -> Optional[str]:
                return version

            @property
            def cancellation_reason(self) -> Optional[str]:
                return cancellation_reason

            @property
            def cancelled_on(self) -> Optional[datetime]:
                return cancelled_at

            def data(self) -> dict:
                return {
                    'order_status': OrderStatus.CANCELLED.value,
                    'cancellation_reason': cancellation_reason,
                    'cancelled_on': cancelled_at
                }

        request = OrderCancelChangeRequest(on_behalf_of=on_behalf_of,
                                           change_data_provider=__OCDP())
        return self.change([request])
    except Exception as err:
        logger.exception(err)
        raise OrderStatusChangeError(str(err)) from err
@xray_recorder.capture()
def can_reimport(self):
    """
    Check to see if the order can be re-imported from the upstream system.
    That requires a Shopify integration type, a 'channel_order_id' on the order, and
    a status of RECEIVED, VALID, READY or REJECTED.
    :return: True/False
    :rtype: bool
    """
    from . import config as cfg
    if self.data['integration_type'] != cfg.IntegrationType.SHOPIFY.value:
        return False
    if not self.data.get('channel_order_id', None):
        return False
    reimportable = (OrderStatus.RECEIVED, OrderStatus.VALID, OrderStatus.READY, OrderStatus.REJECTED)
    return self.status in reimportable
@xray_recorder.capture()
def reimport(self, force=False):
    """
    Reimport the order from the upstream system. Nothing happens when the order
    cannot be re-imported and force is not set.
    :param force: Force the reimport ignoring any checks
    :type force: bool
    :return: None
    :rtype: None
    """
    allowed = self.can_reimport() or force
    if not allowed:
        return
    from .shopify import ManualOrderImportAction
    import_action = ManualOrderImportAction(customer_edi_id=self.customer_edi_id,
                                            order_number=self.data['channel_order_id'],
                                            override=True)
    import_action.do()
@xray_recorder.capture()
def _clear_errors_and_warnings(self):
    """Reset both the 'errors' and 'warnings' attributes of the order to empty lists."""
    cleared = {
        'errors': [],
        'warnings': [],
    }
    self.modify(data=cleared)
@xray_recorder.capture()
def clear_errors_and_warnings(self, on_behalf_of: str = 'System', version=None):
    """
    Submit a change request that clears the errors and warnings on this order.
    :param on_behalf_of: Who is making this change
    :type on_behalf_of: str
    :param version: value returned by the change data provider's version()
    :return: the result of self.change for the single request
    :raise OrderStatusChangeError: wraps any exception raised while clearing (after logging it)
    """
    try:
        from lib_b2b.order_change.clear_errors import OrderClearErrorsChangeRequest, OrderClearErrorsChangeDataProvider

        class __OCDP(OrderClearErrorsChangeDataProvider):
            # Exposes this call's arguments through the provider interface.
            def version(self) -> Optional[str]:
                return version

            @property
            def clear_errors(self) -> bool:
                return True

            def data(self) -> dict:
                return {}

        request = OrderClearErrorsChangeRequest(on_behalf_of=on_behalf_of,
                                                change_data_provider=__OCDP())
        return self.change([request])
    except Exception as err:
        logger.exception(err)
        raise OrderStatusChangeError(str(err)) from err
def error(self, record: ErrorRecord):
    """
    Save an error record on the order.
    :param record: the error to save
    :type record: ErrorRecord
    """
    self.__save_notice(record)
def warn(self, record: WarningRecord):
    """
    Save a warning record on the order.
    :param record: the warning to save
    :type record: WarningRecord
    """
    self.__save_notice(record)
@xray_recorder.capture()
def __save_notice(self, record: [NoticeRecord, List[NoticeRecord]]):
    """
    Append one or more notice records to the order in the store.
    Each record is appended to the list attribute named by its persist_as value;
    records are grouped so that each attribute is updated once.
    :param record: a single notice record or a list of them; an empty list is a no-op
    :raise: ClientError for any DynamoDB client error other than a failed conditional check
    """
    records = record if isinstance(record, list) else [record]
    if not records:
        return
    # Group by target attribute: a list has no persist_as of its own, and its
    # members may be persisted under different attributes.
    grouped = {}
    for notice in records:
        grouped.setdefault(notice.persist_as, []).append(notice.as_dict())
    dynamodb_resource = resource('dynamodb', region_name='us-east-1')
    table = dynamodb_resource.Table(order_table)
    for attribute_name, notices in grouped.items():
        try:
            table.update_item(
                Key={'id': self.order_id},
                UpdateExpression="set #p = list_append(if_not_exists(#p, :empty_list), :notices)",
                ExpressionAttributeValues={
                    ':empty_list': [],
                    ':notices': notices
                },
                ExpressionAttributeNames={
                    '#p': attribute_name
                },
                ReturnValues="UPDATED_NEW"
            )
        except ClientError as ce:
            if ce.response['Error']['Code'] != 'ConditionalCheckFailedException':
                raise
@xray_recorder.capture()
def record_error(self, error_code: str, error_msg: str, field: str = None):
    """
    Build an ErrorRecord from the given values and save it on the order.
    :param error_code: the error code
    :type error_code: str
    :param error_msg: the error message
    :type error_msg: str
    :param field: the field the error relates to, if any
    :type field: str
    """
    error_record = ErrorRecord(code=error_code, msg=error_msg, field=field)
    self.error(error_record)
@xray_recorder.capture()
def record_warning(self, code: str, msg: str, field: str = None):
    """
    Build a WarningRecord from the given values and save it on the order.
    :param code: the warning code
    :type code: str
    :param msg: the warning message
    :type msg: str
    :param field: the field the warning relates to, if any
    :type field: str
    """
    warning_record = WarningRecord(code=code, msg=msg, field=field)
    self.warn(warning_record)
@xray_recorder.capture()
def __add_shipping(self):
    """
    Adds additional charges based upon pre-agreed rate schedule to
    order lines for customers that do not have free shipping. This should
    be called after the order has been validated to ensure that the product id exists
    and has shipping rates configured.
    Nothing is added when the order already carries a charge whose type contains 'Shipping'.
    One freight charge is added per unit of quantity on every line that has a shipping rate.
    :return:
    :rtype:
    """
    from . import profile as pf
    profile = pf.Profile.profile_for(self.customer_edi_id)
    if profile.fulfillment_config.free_shipping:
        return
    if 'additional_charges' not in self.data:
        self.data['additional_charges'] = []
    already_has_shipping = any('Shipping' in charge.get('type', '')
                               for charge in self.data['additional_charges'])
    if already_has_shipping:
        return
    freight_charges = []
    for line in self.data['order_lines']:
        rate_info = Shipping.get_shipping_rate(customer_edi_id=self.customer_edi_id,
                                               postal_code=self.ship_to.postalCode,
                                               product_id=line['product_id'])
        if not rate_info:
            continue
        units = int(line.get('quantity', 1))
        freight_charges.extend(
            {
                "amount": rate_info.rate,
                "tax_lines": [],
                "title": "FRGT",
                "type": "Shipping"
            }
            for _ in range(0, units)
        )
    charges_before = [charge.as_dict() for charge in self.additional_charges]
    # NOTE(review): this stores only the freight charges, so any existing
    # non-shipping additional charges are replaced - confirm that is intended.
    self.modify(data={'additional_charges': freight_charges})
    self.record(
        ChangeRecord(
            before=charges_before,
            after=[charge.as_dict() for charge in self.additional_charges],
            description="Added freight as additional charges.",
            when=datetime.now().astimezone(UTC)
        )
    )
@xray_recorder.capture()
def __add_line_amounts(self):
    """
    Used for fulfillment customer orders that use pricing from the ERP. This should
    be called after the order has been validated to ensure that the product id exists
    and has pricing configured.
    Only orders with the STANDARD integration type are touched, and only lines without
    an 'amount' get one (customer unit price times quantity).
    :return: None
    :rtype: None
    """
    from .util import as_decimal
    from . import config as cfg
    erp = ERP.for_(self.customer_edi_id)
    if self.data['integration_type'] != cfg.IntegrationType.STANDARD.value:
        return
    order_lines = self.data['order_lines']
    for line in order_lines:
        if 'amount' in line:
            continue
        unit_price = erp.get_customer_price(line['product_id'])
        quantity = as_decimal(line['quantity'], 'quantity')
        line['amount'] = as_decimal(unit_price * quantity, 'line_amount')
    self.modify(data={'order_lines': order_lines})
@xray_recorder.capture()
def __fast_forward(self):
    """
    Mark the order and every one of its lines as shipped.

    Each line is flagged as shipped and given a random tracking number and a
    ship date of the current time expressed in UTC. The order itself is moved
    to the SHIPPED status, flagged as shipped, and has ``acknowledged`` set to
    False. ``prepare`` calls this for orders that are in test mode.
    :return: None
    :rtype: None
    """
    lines = self.data['order_lines']
    for entry in lines:
        entry['shipped'] = True
        entry['tracking'] = get_random_tracking_number()
        # Timestamp is taken per line, so lines may differ by a few microseconds.
        entry['ship_date'] = datetime.now().astimezone(UTC).isoformat()
    shipped_state = {
        'acknowledged': False,
        'order_status': OrderStatus.SHIPPED.value,
        'shipped': True,
        'order_lines': lines,
    }
    self.modify(data=shipped_state)
@xray_recorder.capture()
def __is_test(self):
    """
    Tell whether this order is in test mode.
    :return: True when the order data has a ``customer_mode`` equal to 'TEST',
        False when the key is missing or holds anything else
    :rtype: bool
    """
    return self.data.get('customer_mode') == 'TEST'
def __assign_release_date(self):
    """
    Calculate the embargo release date of the order and store it.

    The date comes from an ``OrderDatesDelegate`` built from the customer, the
    order date and the not-before date. It is assigned to ``self.release_date``
    and a ``ChangeRecord`` with the previous and the new value is recorded.

    A missing release date (before or after the assignment) is recorded as
    None; the "after" snapshot used to call ``isoformat()`` unconditionally
    and would raise AttributeError in that case, although the description
    built right next to it already allowed for a missing date.
    :return: None
    :rtype: None
    """
    def _iso(value):
        # Same guard for the "before" and the "after" snapshot.
        return value.isoformat() if value else None

    _before = {'release_date': _iso(self.release_date)}
    logger.info(
        f"Setting embargo release date for [order_id:{self.order_id}, customer_edi_id:{self.customer_edi_id}, order_date:{self.data['order_date']}]")
    date_delegate = OrderDatesDelegate(customer=self.customer_edi_id,
                                       order_date=self.order_date,
                                       not_before_date=self.not_before_date)
    calculated_date = date_delegate.release_date
    self.release_date = calculated_date.date
    # Read the stored value back once so snapshot and description agree.
    assigned = self.release_date
    self.record(
        ChangeRecord(
            before=_before,
            after={'release_date': _iso(assigned)},
            description=f"Assigned embargo release date of {assigned.strftime('%Y-%m-%d') if assigned else None} using "
                        f"rule '{calculated_date.rule}'",
            when=datetime.now().astimezone(UTC)
        )
    )
def __assign_make_date(self):
    """
    Calculate the ship date of the order and store it.

    The date comes from the ``ship_date`` of an ``OrderDatesDelegate`` built
    from the customer, the order date and the not-before date. It is assigned
    to ``self.ship_date`` and a ``ChangeRecord`` with the previous and the new
    value, the rule used and what the rule was based on is recorded.
    :return: None
    :rtype: None
    """
    # NOTE(review): the before/after snapshots read ``self.make_date`` while the
    # new value is assigned to ``self.ship_date``. Presumably both properties are
    # backed by the same field - confirm, otherwise the snapshots never change.
    previous = self.make_date
    snapshot_before = {'ship_date': previous.isoformat() if previous else None}
    logger.info(f"Setting projected manufacture date for "
                f"[order_id:{self.order_id}, customer_edi_id:{self.customer_edi_id}, order_date:{self.data['order_date']}]")
    calculated = OrderDatesDelegate(customer=self.customer_edi_id,
                                    order_date=self.order_date,
                                    not_before_date=self.not_before_date).ship_date
    self.ship_date = calculated.date
    current = self.make_date
    snapshot_after = {'ship_date': current.isoformat() if current else None}
    ship_on = self.ship_date
    ship_on_text = ship_on.strftime('%Y-%m-%d') if ship_on else None
    change = ChangeRecord(
        before=snapshot_before,
        after=snapshot_after,
        description=f"Assigned ship date of {ship_on_text} "
                    f"using rule '{calculated.rule}' based on {calculated.based_on}",
        when=datetime.now().astimezone(UTC)
    )
    self.record(change)
def recalculate_dates(self):
    """
    Recompute the calculated dates of the order: the embargo release date
    first, then the ship date.
    :return: None
    :rtype: None
    """
    for assign_date in (self.__assign_release_date, self.__assign_make_date):
        assign_date()
@xray_recorder.capture()
def prepare(self):
    """
    Add computed properties to make the order ready for entry.

    In order:
    1. Resolve the integration type. When the order data has none, it is taken
       from the customer profile (STANDARD if the profile has none) and stored
       on the order.
    2. Run the freight/shipping step.
    3. Assign the order dates.
    4. For STANDARD integrations, replace every line by the result of its
       ``add_missing`` call and calculate the totals when the order has none.
       Any exception in this step is logged and does not abort preparation.
    5. Fast-forward the order to shipped when it is in test mode.
    :return: None
    :rtype: None
    """
    from lib_b2b.config import IntegrationType
    if 'integration_type' in self.data:
        integration_type = IntegrationType(self.data.get('integration_type'))
    else:
        from . import profile as pf
        customer_profile = pf.Profile.profile_for(self.customer_edi_id)
        if customer_profile.integration_type:
            integration_type = IntegrationType(customer_profile.integration_type)
        else:
            integration_type = IntegrationType.STANDARD
        self.modify({'integration_type': integration_type.value})

    # Freight/Shipping
    self.__add_shipping()

    # assign order dates
    self.recalculate_dates()

    if integration_type is IntegrationType.STANDARD:
        try:
            # fix up standard orders
            self.lines = [entry.add_missing(self.customer_edi_id) for entry in self.lines]
            logger.info(f"Updated line amounts on {self.order_id}")

            # add totals if they are not on the order
            if not self.totals:
                self.totals = OrderTotals.calculate(order_lines=self.lines,
                                                    discounts=self.discounts,
                                                    additional_charges=self.additional_charges,
                                                    refunds=[])
                logger.info(f"Updated totals on {self.order_id}")
        except Exception as e:
            # Best effort: a failed fix-up is logged, the order is still prepared.
            logger.exception(e)

    if self.__is_test():
        self.__fast_forward()
@xray_recorder.capture()
def validate(self):
    """
    Validate the order and ensure it has all of the necessary data elements. Will raise an
    OrderValidationError with a list of issues if there is an error.

    Two things are checked:

    * the ship-to address, through ``self.ship_to.validate``. An InvalidAddressError
      rejects the order and sends an address notification; a result that should warn or
      is not valid only adds an ADDR-3 warning.
    * every order line, against the ERP customer (item, and pricing when the profile
      requires it) and, unless the customer has free shipping, against the shipping rates.

    Unexpected exceptions raised by the validation machinery itself are logged and never
    reject the order.
    :return: None
    :rtype: None
    :raises OrderValidationError: when at least one rejection reason was collected
    """
    # Clear any existing errors or warnings (for reprocessing)
    self._clear_errors_and_warnings()
    rejection_reasons: List[RejectionReason] = []
    from . import profile as pf
    profile = pf.Profile.profile_for(self.customer_edi_id)

    try:
        validation_result = self.ship_to.validate(customer_edi_id=self.customer_edi_id)
        if validation_result:
            # Save the validation result to the order for use in the user interface
            self.address_validation_result = validation_result
            # Address problems reported through is_valid() deliberately do NOT reject the
            # order (and send no notification): is_valid is not dialed in well enough to
            # reject orders based on its logic, so they only produce a warning.
            if validation_result.should_warn() or not validation_result.is_valid():
                msg = ", ".join(validation_result.messages)
                self.warn(WarningRecord(code="ADDR-3", msg=f"Address warning. {msg}",
                                        extra_data=validation_result.effective_address.as_dict()))
                logger.warning(f"Carrier reports that the address on order {self.order_id} "
                               f"may have an issue. {msg}")
                # TODO - Figure out if there is a way to add notification to Shopify timeline.
    except InvalidAddressError as iae:
        logger.warning(f"Automatic Orders Rejection: {str(iae)}")
        self.error(ErrorRecord(code="ADDR-1", msg=f"Invalid Address. {iae.sanitized_message}"))
        rejection_reasons.append(RejectionReason(
            rejection_type='order',
            message=f"Invalid address on order {self.order_id} for customer "
                    f"{self.customer_edi_id} :: {iae.sanitized_message}."
        ))
        from .notification import Notifier, NotificationType, AddressNotificationData
        Notifier.notify(
            notification_type=NotificationType.ADDRESS,
            subject=f"Address Error with Order {self.order_id}",
            customer=self.customer_edi_id,
            data=AddressNotificationData(
                order_id=self.order_id,
                message=iae.sanitized_message,
                original_address=self.ship_to,
                recommended_address=None
            )
        )
    except Exception as e:
        # The validation itself failed, don't reject for that.
        logger.exception(e)

    lines = self.lines
    if lines:
        try:
            # The pricing requirement and the ERP customer are the same for every line,
            # so they are looked up once instead of once per line.
            requires_pricing = bool(profile.integration_config.requires_pricing)
            erp = ERP.for_(self.customer_edi_id)
            erp_customer = erp.fetch_customer(self.customer_edi_id)
        except Exception as e:
            # The validation itself failed, don't reject for that.
            logger.exception(e)
        else:
            for line in lines:
                try:
                    self._validate_line_item(line, erp_customer, profile,
                                             requires_pricing, rejection_reasons)
                except Exception as e:
                    # The validation itself failed, don't reject for that.
                    logger.exception(e)

    logger.info("rejection_reasons", extra=log_data(reasons=rejection_reasons))
    if rejection_reasons:
        raise OrderValidationError(message=f"Order {self.order_id} had validation errors.",
                                   reasons=rejection_reasons)

def _check_item_and_freight(self, erp_customer, profile, product_id, requires_pricing):
    """
    Check one item id against the ERP customer and, when the customer does not have free
    shipping, look up its shipping rate. Returns None; lets the lookups' errors propagate.
    """
    erp_customer.is_valid(customer_item_name=product_id, requires_pricing=requires_pricing)
    if not profile.fulfillment_config.free_shipping:
        Shipping.get_shipping_rate(customer_edi_id=self.customer_edi_id,
                                   postal_code=self.ship_to.postalCode,
                                   product_id=product_id)

def _reject_with(self, rejection_reasons, error, code, rejection_type, msg):
    """
    Log ``error`` as an automatic rejection, add an ErrorRecord with ``code``/``msg`` to
    the order and append the matching RejectionReason to ``rejection_reasons``.
    """
    logger.warning(f"Automatic Orders Rejection: {str(error)}")
    self.error(ErrorRecord(code=code, msg=msg))
    rejection_reasons.append(
        RejectionReason(
            rejection_type=rejection_type,
            message=msg
        )
    )

def _validate_line_item(self, line, erp_customer, profile, requires_pricing, rejection_reasons):
    """
    Validate a single order line and collect a rejection reason if it fails.

    The line's ``product_id`` is checked first. If the item is unknown or has no pricing
    and the line has a ``product_external_id``, the external id is tried instead; when
    that succeeds the line's ``product_id`` is replaced by the external id. A line that
    fails with its ``product_id`` and has no external id is rejected as well (that error
    used to be dropped silently, letting the order pass validation).

    Rejections: CUSITEM-1 (unknown item, order_line), ITEMPRC-1 (no pricing, order_line),
    SHIP-1 (no shipping rate, order), SHIP-2 (no shipping zone, order).
    """
    try:
        try:
            self._check_item_and_freight(erp_customer, profile, line.product_id, requires_pricing)
        except (InvalidCustomerItemError, ItemPricingNotFound) as primary_error:
            if not line.product_external_id:
                # Nothing else to try: hand the error to the rejection handlers below.
                raise
            try:
                # let's try the product_external_id to see if it will work - IndigoMalase
                self._check_item_and_freight(erp_customer, profile,
                                             line.product_external_id, requires_pricing)
                # if we got here, the external id is in the system. So, let's fix the product_id
                line.product_id = line.product_external_id
                self._modify_line(line)
            except InvalidCustomerItemError as fallback_error:
                # Neither id is known: report the message of the first failure.
                self._reject_with(rejection_reasons, fallback_error, 'CUSITEM-1', 'order_line',
                                  f"{primary_error.sanitized_message}")
    except InvalidCustomerItemError as icie:
        self._reject_with(rejection_reasons, icie, 'CUSITEM-1', 'order_line',
                          f"{icie.sanitized_message}")
    except ItemPricingNotFound as ipnf:
        msg = f"Pricing not found for customer item {line.product_id} " \
              f"from order line {line.order_line_id}. " \
              f"{ipnf.sanitized_message}"
        self._reject_with(rejection_reasons, ipnf, 'ITEMPRC-1', 'order_line', msg)
    except ShipRateNotFound as srnf:
        msg = f"Shipping rate not found for the customer item {line.product_id} " \
              f"with a destination postal code of {self.ship_to.postalCode} " \
              f"for customer {self.customer_edi_id} " \
              f"from order line {line.order_line_id}. " \
              f"{srnf.sanitized_message}"
        self._reject_with(rejection_reasons, srnf, 'SHIP-1', 'order', msg)
    except ShipZoneNotFound as sznf:
        msg = f"Shipping zone not found for postal code {self.ship_to.postalCode}. {sznf.message}"
        self._reject_with(rejection_reasons, sznf, 'SHIP-2', 'order', msg)