Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
lib-py-b2b / event.py
Size: Mime:
from enum import Enum
import json
import boto3


class B2BTopic(Enum):
    """Names of the notification topics for B2B order and fulfillment events.

    Each member's value is the topic name string; ``B2BEvent.notify`` passes
    it as the ``Name`` argument when creating/looking up the SNS topic.
    """

    # Order topics (values prefixed "b2b_order_").
    RECEIVED = "b2b_order_received"
    VALID = "b2b_order_validated"
    READY = "b2b_order_ready"
    ACCEPTED = "b2b_order_accepted"
    REJECTED = "b2b_order_rejected"
    ENTERED = "b2b_order_entered"
    CANCELLED = "b2b_order_cancelled"
    SHIPPED = "b2b_order_shipped"
    ERROR = "b2b_order_error"

    # Fulfillment topics (values prefixed "b2b_fulfill_").
    PENDING = "b2b_fulfill_pending"
    NOT_SENT = "b2b_fulfill_not_sent"
    SENT = "b2b_fulfill_sent"
    FAILED = "b2b_fulfill_failed"


class B2BEvent:
    """Publishes B2B order notifications to Amazon SNS."""

    @staticmethod
    def notify(
        topic: B2BTopic,
        order_id: str,
        region_name: str = 'us-east-1',
    ) -> dict:
        """Publish an ``{"order_id": order_id}`` message to the SNS topic for *topic*.

        Args:
            topic: Enum member whose ``value`` is used as the SNS topic name.
            order_id: Identifier placed in the message body under ``order_id``.
            region_name: AWS region for the SNS client. Defaults to
                ``'us-east-1'``, the region that was previously hard-coded.

        Returns:
            The response returned by the SNS client's ``publish`` call
            (previously computed but discarded).

        Raises:
            Any exception raised by the boto3 client calls propagates to the
            caller unchanged; nothing is caught here.
        """
        client = boto3.client('sns', region_name=region_name)

        # create_topic is idempotent: it creates the topic if it doesn't exist
        # and otherwise returns the existing one, so it doubles as an ARN lookup.
        # The result gets its own name instead of rebinding the `topic` parameter.
        created = client.create_topic(Name=topic.value)
        topic_arn = created['TopicArn']  # Amazon Resource Name of the topic

        # With MessageStructure='json' the Message is a JSON object keyed by
        # protocol; 'default' carries the (itself JSON-encoded) payload.
        payload = json.dumps({"order_id": order_id})
        return client.publish(
            TopicArn=topic_arn,
            Message=json.dumps({'default': payload}),
            MessageStructure='json',
        )