Repository URL to install this package:
|
Version:
0.4.200 ▾
|
lib-py-b2b
/
order_request_queue.py
|
|---|
import json
import boto3
from botocore.exceptions import ClientError
from pprint import pformat
import logging
from os import environ
from lib_b2b.order_request import OrderRequest
from lib_b2b.request import Request
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class OrderRequestQueue:
    """
    Static helpers around the SQS queue that carries B2B order requests.

    The queue names are resolved once, when the class body is executed: the
    ORDER_REQUEST_QUEUE / ORDER_REQUEST_DLQ environment variables win, and the
    DEV names below are used when they are not set.
    """

    # Name of the main order request queue.
    ORDER_REQUEST_QUEUE = environ.get('ORDER_REQUEST_QUEUE', 'B2B_ORDER_REQUEST_QUEUE_DEV')
    # Name of the dead-letter queue referenced by the main queue's redrive policy.
    ORDER_REQUEST_DLQ = environ.get('ORDER_REQUEST_DLQ', 'B2B_ORDER_REQUEST_DLQ_DEV')

    @staticmethod
    def setup_queues():
        """
        Create the dead-letter queue and then the order request queue.

        The order request queue is created with no delivery delay, a 60 second
        visibility timeout and a redrive policy that targets the dead-letter
        queue with a maxReceiveCount of 10.

        :return: the order request queue resource returned by ``create_queue``
        """
        sqs = boto3.resource('sqs')
        # The DLQ has to exist first: its ARN is embedded in the redrive policy.
        dlq = sqs.create_queue(QueueName=OrderRequestQueue.ORDER_REQUEST_DLQ)
        redrive_policy = {
            'deadLetterTargetArn': dlq.attributes['QueueArn'],
            'maxReceiveCount': '10',
        }
        return sqs.create_queue(
            QueueName=OrderRequestQueue.ORDER_REQUEST_QUEUE,
            Attributes={
                'DelaySeconds': '0',
                'VisibilityTimeout': '60',
                # SQS expects the policy as a JSON string, not a nested dict.
                'RedrivePolicy': json.dumps(redrive_policy),
            },
        )

    @staticmethod
    def __get_order_queue():
        """
        Look up the order request queue by name, creating it when it is missing.

        :return: the queue resource, or None when the lookup failed with any
                 ClientError other than "queue does not exist" (the error is
                 logged with its traceback in that case)
        """
        try:
            sqs = boto3.resource('sqs')
            return sqs.get_queue_by_name(QueueName=OrderRequestQueue.ORDER_REQUEST_QUEUE)
        except ClientError as ce:
            if ce.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
                # First use in this account/region: create the queue pair on demand.
                return OrderRequestQueue.setup_queues()
            logger.exception("Error received trying to access the order request queue.")
            return None

    @staticmethod
    def as_request(record) -> Request:
        """
        Build a Request from the JSON carried in a queue record.

        :param record: object exposing the JSON payload as ``record.body``
                       (presumably a received SQS message -- confirm with callers)
        :return: the result of ``Request.from_json`` on that payload
        :rtype: Request
        """
        return Request.from_json(record.body)

    @staticmethod
    def add(order_request: OrderRequest):
        """
        Takes an OrderRequest object. This ensures that the request has already been properly
        validated as this is done during object initialization.

        :param order_request: the order request object for the request
        :type order_request: OrderRequest
        :return: the message id for the enqueued message
        :rtype: str
        :raises RuntimeError: if the order request queue could not be obtained
        """
        request: Request = order_request.request
        # Pretty-printing the whole request is only worth doing when the
        # message will actually be emitted.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Adding order request to queue {pformat(request.as_dict())} to {OrderRequestQueue.ORDER_REQUEST_QUEUE}")
        queue = OrderRequestQueue.__get_order_queue()
        if queue is None:
            # The lookup already logged the underlying ClientError; fail with a
            # clear message instead of an AttributeError on NoneType.
            raise RuntimeError(
                f"Unable to access order request queue {OrderRequestQueue.ORDER_REQUEST_QUEUE}"
            )
        response = queue.send_message(MessageBody=request.as_json())
        return response['MessageId']