Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hub-client / tests / marketo / test_marketo_adapter.py
Size: Mime:
import json
import datetime

import httpretty
import pytz
from django.test import TestCase
from django.test.utils import override_settings
from django.utils import timezone

from dockerhub.marketo.adapter import (
    MarketoMailingListImpl, LeadManagerImpl, MarketoClientConfig,
    CampaignManagerImpl)
from dockerhub.marketo import constants
from dockerhub.marketo.exceptions import (
    UnableToCompleteRequestException, MarketoImproperlyConfigured,
    InvalidActivityDateException, UnknownActivityTypeException)
from dockerhub.test.marketo import test_mixins


class TestMarketoAdapterFixture(TestCase):
    mailing_list_id = "1015"
    email = "fake@email.com"
    lead_id = 1
    cookie = 'test_cookie_id'
    field_name = 'test'
    field_value = 'value'
    extra_values = dict(
        is_awesome=True,
        firstName='John',
        lastName='Costa'
    )
    campaign_id = 1051

    def setUp(self):
        self.mailing_list_adapter = MarketoMailingListImpl()
        self.campaign_adapter = CampaignManagerImpl()


class TestMailingListAdapterIsSubscribedAdapter(
        test_mixins.MarketoRequiresOauthTokenMixin,
        test_mixins.MarketoGetLeadMixin,
        test_mixins.MarketoAddLeadToListMixin,
        test_mixins.MarketoIsSubscribedMixin,
        test_mixins.MarketoMemberOfListMixin,
        test_mixins.MarketoRemoveFromListMixin,
        TestMarketoAdapterFixture):
    @httpretty.activate
    def test(self):

        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
        httpretty.register_uri(
            httpretty.GET,
            self.GET_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.GET_LEAD_RESPONSE)
        )
        httpretty.register_uri(
            httpretty.POST,
            self.POST_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.POST_LEAD_RESPONSE)
        )
        httpretty.register_uri(
            httpretty.POST,
            self.POST_TO_LIST_URL.format(self.mailing_list_id),
            content_type="application/json",
            body=json.dumps(self.POST_TO_LIST_RESPONSE)
        )
        httpretty.register_uri(
            httpretty.GET,
            self.GET_LEADS_ON_LIST.format(self.mailing_list_id),
            content_type="application/json",
            body=json.dumps(self.GET_LEADS_ON_LIST_IN_LIST_RESPONSE)
        )
        httpretty.register_uri(
            httpretty.GET,
            self.MEMBER_OF_LIST_URL.format(self.mailing_list_id),
            content_type="application/json",
            body=json.dumps(self.GET_MEMBER_OF_LIST_RESPONSE)
        )

        self.assertTrue(
            self.mailing_list_adapter.is_subscribed(
                self.mailing_list_id, self.email
            )
        )


class TestMailingListAdapterIsSubscribedAdapterException(
        TestMailingListAdapterIsSubscribedAdapter):
    @httpretty.activate
    def test(self):

        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE),
            status=500
        )

        with self.assertRaises(UnableToCompleteRequestException):
            self.mailing_list_adapter.is_subscribed(
                self.mailing_list_id, self.email
            )


class TestMailingListSubscribeAdapter(test_mixins.MarketoRequiresOauthTokenMixin,
                                      test_mixins.MarketoGetLeadMixin,
                                      test_mixins.MarketoAddLeadToListMixin,
                                      test_mixins.MarketoIsSubscribedMixin,
                                      test_mixins.MarketoMemberOfListMixin,
                                      test_mixins.MarketoRemoveFromListMixin,
                                      TestMarketoAdapterFixture):
    mailing_list_id = 1015

    @httpretty.activate
    def test(self):

        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
        httpretty.register_uri(
            httpretty.GET,
            self.GET_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.GET_LEAD_RESPONSE)
        )
        httpretty.register_uri(
            httpretty.POST,
            self.GET_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.POST_LEAD_RESPONSE)
        )
        httpretty.register_uri(
            httpretty.POST,
            self.POST_TO_LIST_URL.format(self.mailing_list_id),
            content_type="application/json",
            body=json.dumps(self.POST_TO_LIST_RESPONSE)
        )
        httpretty.register_uri(
            httpretty.GET,
            self.GET_LEADS_ON_LIST.format(self.mailing_list_id),
            content_type="application/json",
            body=json.dumps(self.GET_LEADS_ON_LIST_IN_LIST_RESPONSE)
        )
        httpretty.register_uri(
            httpretty.GET,
            self.MEMBER_OF_LIST_URL,
            content_type="application/json",
            body=json.dumps(self.GET_MEMBER_OF_LIST_RESPONSE)
        )

        self.assertTrue(
            self.mailing_list_adapter.subscribe(
                self.mailing_list_id, self.email
            )
        )


class TestMailingListSubscribeAdapterException(
        TestMailingListSubscribeAdapter):
    @httpretty.activate
    def test_subscribe(self):

        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE),
            status=500
        )

        with self.assertRaises(UnableToCompleteRequestException):
            self.mailing_list_adapter.subscribe(
                self.mailing_list_id, self.email
            )


class TestMailingListUnsubscribeAdapter(test_mixins.MarketoRequiresOauthTokenMixin,
                                        test_mixins.MarketoGetLeadMixin,
                                        test_mixins.MarketoRemoveFromListMixin,
                                        test_mixins.MarketoIsSubscribedMixin,
                                        test_mixins.MarketoMemberOfListMixin,
                                        TestMarketoAdapterFixture):
    mailing_list_id = 1015

    @httpretty.activate
    def test(self):

        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
        httpretty.register_uri(
            httpretty.POST,
            self.GET_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.POST_LEAD_RESPONSE)
        )
        httpretty.register_uri(
            httpretty.DELETE,
            self.DELETE_FROM_LIST_URL.format(self.mailing_list_id),
            content_type="application/json",
            body=json.dumps(self.DELETE_FROM_LIST_RESPONSE_NOT_IN_LIST)
        )
        httpretty.register_uri(
            httpretty.GET,
            self.GET_LEADS_ON_LIST.format(self.mailing_list_id),
            content_type="application/json",
            body=json.dumps(self.GET_LEADS_ON_LIST_NOT_IN_LIST_RESPONSE)
        )
        httpretty.register_uri(
            httpretty.GET,
            self.MEMBER_OF_LIST_URL.format(self.mailing_list_id),
            content_type="application/json",
            body=json.dumps(self.GET_MEMBER_OF_LIST_RESPONSE)
        )

        self.assertTrue(
            self.mailing_list_adapter.unsubscribe(
                self.mailing_list_id, self.email
            )
        )


class TestMailingListUnsubscribeAdapterException(
        TestMailingListUnsubscribeAdapter):
    @httpretty.activate
    def test(self):

        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE),
            status=500)

        with self.assertRaises(UnableToCompleteRequestException):
            self.mailing_list_adapter.unsubscribe(
                self.mailing_list_id, self.email
            )


class TestCreateLeadAdapter(test_mixins.MarketoRequiresOauthTokenMixin,
                            test_mixins.MarketoAddLeadMixin,
                            test_mixins.MarketoAssociateLeadMixin,
                            TestMarketoAdapterFixture):
    expected_lead_ids = [1097532]
    extra_values = dict(firstName='john')
    cookie = 'cookie123'

    def setUp(self):
        self.lead_adapter = LeadManagerImpl()

    @httpretty.activate
    def test_create_lead(self):
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
        httpretty.register_uri(
            httpretty.POST,
            self.POST_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.POST_LEAD_RESPONSE))
        httpretty.register_uri(
            httpretty.POST,
            self.ASSOCIATE_LEAD_URL.format(1097532),  # see fixture
            content_type="application/json",
            body=json.dumps(self.ASSOCIATE_LEAD_RESPONSE))

        self.result = self.lead_adapter.create_lead(
            self.email, extra_values=self.extra_values,
            cookie=self.cookie
        )
        self.assertEqual(
            self.result,
            self.expected_lead_ids
        )


class TestCreateLeadAdapterException(TestCreateLeadAdapter):
    expected_lead_ids = []

    @httpretty.activate
    def test_create_lead(self):
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE),
            status=500)

        with self.assertRaises(UnableToCompleteRequestException):
            self.lead_adapter.create_lead(self.email)


class TestCreateLeadAdapterCookieException(TestCreateLeadAdapter):
    expected_lead_ids = [1097532]  # lead still created

    @httpretty.activate
    def test_create_lead(self):
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
        httpretty.register_uri(
            httpretty.POST,
            self.POST_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.POST_LEAD_RESPONSE))
        httpretty.register_uri(
            httpretty.POST,
            self.ASSOCIATE_LEAD_URL.format(1097532),  # see fixture
            content_type="application/json",
            body=json.dumps(self.ASSOCIATE_LEAD_RESPONSE),
            status=500)

        with self.assertRaises(UnableToCompleteRequestException):
            self.lead_adapter.create_lead(
                self.email, extra_values=self.extra_values,
                cookie=self.cookie
            )


class TestMissingDjangoSettings(TestCase):
    @override_settings(MARKETO_HOST=None)
    def test_missing_host(self):
        with self.assertRaises(MarketoImproperlyConfigured):
            MarketoClientConfig()

    @override_settings(MARKETO_CLIENT_ID=None)
    def test_missing_client_id(self):
        with self.assertRaises(MarketoImproperlyConfigured):
            MarketoClientConfig()

    @override_settings(MARKETO_SECRET=None)
    def test_missing_client_secret(self):
        with self.assertRaises(MarketoImproperlyConfigured):
            MarketoClientConfig()


class TestCreateActivityAdapter(test_mixins.MarketoCreateActivityMixin,
                                TestCreateLeadAdapter):
    email = 'fake@fake.email.com'
    activity_type = constants.DTR_BINARY_DOWNLOAD
    activity_date = timezone.now()
    attributes = dict(url='http://some.fake.url/yadda/yadda')

    @httpretty.activate
    def test(self):
        # authenticate
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))

        # find lead
        httpretty.register_uri(
            httpretty.POST,
            self.POST_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.POST_LEAD_RESPONSE))

        # create activity
        httpretty.register_uri(
            httpretty.POST,
            self.POST_CREATE_ACTIVITY_URL,
            content_type="application/json",
            body=json.dumps(self.POST_CREATE_ACTIVITY_RESPONSE))

        self.assertTrue(self.lead_adapter.create_activity(
            self.email,
            self.activity_type,
            self.activity_date,
            attributes=self.attributes
        ))

    @httpretty.activate
    def test_unable_to_find_lead(self):
        # authenticate
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))

        # find lead - 500
        httpretty.register_uri(
            httpretty.POST,
            self.POST_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.POST_LEAD_RESPONSE),
            status=500)

        with self.assertRaises(UnableToCompleteRequestException):
            self.lead_adapter.create_activity(
                self.email,
                self.activity_type,
                self.activity_date,
                attributes=self.attributes
            )

    @httpretty.activate
    def test_unable_to_create_activity(self):
        # authenticate
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))

        # find lead
        httpretty.register_uri(
            httpretty.POST,
            self.POST_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.POST_LEAD_RESPONSE))

        # create activity - 400
        httpretty.register_uri(
            httpretty.POST,
            self.POST_CREATE_ACTIVITY_URL,
            content_type="application/json",
            body=json.dumps(self.POST_CREATE_ACTIVITY_RESPONSE),
            status=400)

        with self.assertRaises(UnableToCompleteRequestException):
            self.lead_adapter.create_activity(
                self.email,
                self.activity_type,
                self.activity_date,
                attributes=self.attributes
            )

    @httpretty.activate
    def test_unable_to_encode_date(self):
        # authenticate
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))

        # find lead
        httpretty.register_uri(
            httpretty.POST,
            self.POST_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.POST_LEAD_RESPONSE))

        # create activity
        httpretty.register_uri(
            httpretty.POST,
            self.POST_CREATE_ACTIVITY_URL,
            content_type="application/json",
            body=json.dumps(self.POST_CREATE_ACTIVITY_RESPONSE))

        with self.assertRaises(InvalidActivityDateException):
            self.lead_adapter.create_activity(
                self.email,
                self.activity_type,
                None,
                attributes=self.attributes
            )

        with self.assertRaises(InvalidActivityDateException):
            self.lead_adapter.create_activity(
                self.email,
                self.activity_type,
                "Johnny",
                attributes=self.attributes
            )

        with self.assertRaises(InvalidActivityDateException):
            self.lead_adapter.create_activity(
                self.email,
                self.activity_type,
                1,
                attributes=self.attributes
            )

    @httpretty.activate
    def test_invalid_activity_tyoe(self):
        # authenticate
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))

        # find lead
        httpretty.register_uri(
            httpretty.POST,
            self.POST_LEAD_URL,
            content_type="application/json",
            body=json.dumps(self.POST_LEAD_RESPONSE))

        # create activity
        httpretty.register_uri(
            httpretty.POST,
            self.POST_CREATE_ACTIVITY_URL,
            content_type="application/json",
            body=json.dumps(self.POST_CREATE_ACTIVITY_RESPONSE))

        with self.assertRaises(UnknownActivityTypeException):
            self.lead_adapter.create_activity(
                self.email,
                "bodybuilding",
                self.activity_date,
                attributes=self.attributes
            )


class TestAdapterKeyConversion(TestCase):
    def setUp(self):
        self.result = (
            LeadManagerImpl._convert_keyvals_to_custom_activity_format(
                {
                    "SubType": "Ubuntu",
                    "UserName": "toli",
                    "UUID": "123-abc-xyz-456"
                }
            )
        )

    def test(self):
        self.assertEqual(
            # order seems to matter to the unit test
            # but the api doesn't really care
            self.result,
            [
                {
                    "name": "UserName",
                    "value": "toli",
                },
                {
                    "name": "SubType",
                    "value": "Ubuntu",
                },
                {
                    "name": "UUID",
                    "value": "123-abc-xyz-456",
                },
            ]
        )


class TestActivityDateConversion(TestCase):
    """
    Simple case is to handle datetime naive objects
    """
    datetime_now = datetime.datetime.now()
    expected_datetime = datetime_now.isoformat()

    def setUp(self):
        self.result = LeadManagerImpl._convert_date_to_custom_activity_format(
            self.datetime_now
        )

    def test(self):
        self.assertEqual(self.result, self.expected_datetime)


class TestActivityDateConversionTimezoneAware(TestActivityDateConversion):
    datetime_now = timezone.now()

    # utz date, no tzinfo, iso formatted
    expected_datetime = datetime_now.astimezone(pytz.utc).replace(
        tzinfo=None
    ).isoformat()


class TestActivityDateConversionStringNaive(TestActivityDateConversion):
    datetime_now = "2015-08-20T17:03:33.238789"
    expected_datetime = "2015-08-20T17:03:33.238789"


class TestActivityDateConversionStringTZAware(TestActivityDateConversion):
    datetime_now = "2015-08-20T17:03:33.238789+00:00"
    expected_datetime = "2015-08-20T17:03:33.238789"


class TestActivityDateInvalidDate(TestCase):
    datetime_now = "johnisnotadate"  # really :)
    expected_datetime = None

    def test(self):
        with self.assertRaises(InvalidActivityDateException):
            self.result = \
                LeadManagerImpl._convert_date_to_custom_activity_format(
                    self.datetime_now
                )


class TestActivityDateEmptyDate(TestCase):
    datetime_now = None
    expected_datetime = None

    def test(self):
        with self.assertRaises(InvalidActivityDateException):
            self.result = \
                LeadManagerImpl._convert_date_to_custom_activity_format(
                    self.datetime_now
                )


class TestActivityDateBadMonth(TestCase):
    datetime_now = "2015-25-20T17:03:33"  # bad month
    expected_datetime = None

    def test(self):
        with self.assertRaises(InvalidActivityDateException):
            self.result = \
                LeadManagerImpl._convert_date_to_custom_activity_format(
                    self.datetime_now
                )


class TestRequestCampaign(test_mixins.MarketoRequestCampaignMixin,
                          test_mixins.MarketoRequiresOauthTokenMixin,
                          TestMarketoAdapterFixture):
    @httpretty.activate
    def test_valid_campaign_request(self):
        # authenticate
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))

        # trigger request campaign
        httpretty.register_uri(
            httpretty.POST,
            self.REQUEST_CAMPAIGN_URL.format(self.campaign_id),
            content_type="application/json",
            body=json.dumps(self.POST_VALID_CAMPAIGN_REQUEST_RESPONSE))

        self.leads = [{"id": self.lead_id}]
        result = self.campaign_adapter.request_campaign(self.campaign_id, self.leads)

        self.assertTrue(result)

    @httpretty.activate
    def test_invalid_campaign_request(self):
        # authenticate
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))

        # trigger request campaign
        httpretty.register_uri(
            httpretty.POST,
            self.REQUEST_CAMPAIGN_URL.format(self.campaign_id),
            content_type="application/json",
            body=json.dumps(self.POST_INVALID_CAMPAIGN_REQUEST_RESPONSE))

        self.leads = [{"id": self.lead_id}]
        result = self.campaign_adapter.request_campaign(self.campaign_id, self.leads)

        self.assertFalse(result)

    @httpretty.activate
    def test_campaign_request_without_campaign_id(self):
        # authenticate
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))

        with self.assertRaises(UnableToCompleteRequestException):
            self.leads = [{"id": self.lead_id}]
            result = self.campaign_adapter.request_campaign(None, self.leads)

    @httpretty.activate
    def test_campaign_request_without_lead_id(self):
        # authenticate
        httpretty.register_uri(
            httpretty.GET,
            self.OAUTH_TOKEN_URL,
            content_type="application/json",
            body=json.dumps(self.OAUTH_TOKEN_RESPONSE))

        with self.assertRaises(UnableToCompleteRequestException):
            result = self.campaign_adapter.request_campaign(self.campaign_id, None)