# Repository URL to install this package: (listing header; URL not present)
# Version: 0.4.139
# Path: lib-py-b2b / event.py
from enum import Enum
import json
import boto3
class B2BTopic(Enum):
    """Names of the notification topics for B2B order and fulfillment events.

    Each member's value is the topic name string; ``B2BEvent.notify`` passes
    it as the ``Name`` when creating/looking up the SNS topic.
    """

    # Order events (values prefixed ``b2b_order_``).
    RECEIVED = 'b2b_order_received'
    VALID = 'b2b_order_validated'
    READY = 'b2b_order_ready'
    ACCEPTED = 'b2b_order_accepted'
    REJECTED = 'b2b_order_rejected'
    ENTERED = 'b2b_order_entered'
    CANCELLED = 'b2b_order_cancelled'
    SHIPPED = 'b2b_order_shipped'
    ERROR = 'b2b_order_error'

    # Fulfillment events (values prefixed ``b2b_fulfill_``).
    PENDING = 'b2b_fulfill_pending'
    NOT_SENT = 'b2b_fulfill_not_sent'
    SENT = 'b2b_fulfill_sent'
    FAILED = 'b2b_fulfill_failed'
class B2BEvent:
    """Publishes B2B order notifications to AWS SNS."""

    @staticmethod
    def notify(topic: B2BTopic, order_id: str, *, region_name: str = 'us-east-1'):
        """Publish a notification carrying ``order_id`` to the topic for *topic*.

        Args:
            topic: Enum member whose ``value`` is used as the SNS topic name.
            order_id: Identifier placed in the message body as
                ``{"order_id": order_id}``.
            region_name: AWS region for the SNS client. Defaults to
                ``'us-east-1'``, the region that was previously hard-coded.

        Returns:
            The response returned by the SNS client's ``publish`` call.

        Raises:
            Whatever the boto3 client raises on failure; nothing is caught
            here.
        """
        payload = {"order_id": order_id}
        client = boto3.client('sns', region_name=region_name)

        # Create the topic if it doesn't exist (this is idempotent).
        # Kept in its own name so the ``topic`` argument is not shadowed.
        created = client.create_topic(Name=topic.value)
        topic_arn = created['TopicArn']  # the topic's Amazon Resource Name

        # With MessageStructure='json' the Message is itself a JSON object
        # keyed by protocol, and each value must be a string -- hence the
        # nested json.dumps for the 'default' entry.
        return client.publish(
            TopicArn=topic_arn,
            Message=json.dumps({'default': json.dumps(payload)}),
            MessageStructure='json',
        )