Repository URL to install this package:
|
Version:
0.32.0 ▾
|
import json
import datetime
import httpretty
import pytz
from django.test import TestCase
from django.test.utils import override_settings
from django.utils import timezone
from dockerhub.marketo.adapter import (
MarketoMailingListImpl, LeadManagerImpl, MarketoClientConfig,
CampaignManagerImpl)
from dockerhub.marketo import constants
from dockerhub.marketo.exceptions import (
UnableToCompleteRequestException, MarketoImproperlyConfigured,
InvalidActivityDateException, UnknownActivityTypeException)
from dockerhub.test.marketo import test_mixins
class TestMarketoAdapterFixture(TestCase):
mailing_list_id = "1015"
email = "fake@email.com"
lead_id = 1
cookie = 'test_cookie_id'
field_name = 'test'
field_value = 'value'
extra_values = dict(
is_awesome=True,
firstName='John',
lastName='Costa'
)
campaign_id = 1051
def setUp(self):
self.mailing_list_adapter = MarketoMailingListImpl()
self.campaign_adapter = CampaignManagerImpl()
class TestMailingListAdapterIsSubscribedAdapter(
test_mixins.MarketoRequiresOauthTokenMixin,
test_mixins.MarketoGetLeadMixin,
test_mixins.MarketoAddLeadToListMixin,
test_mixins.MarketoIsSubscribedMixin,
test_mixins.MarketoMemberOfListMixin,
test_mixins.MarketoRemoveFromListMixin,
TestMarketoAdapterFixture):
@httpretty.activate
def test(self):
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
httpretty.register_uri(
httpretty.GET,
self.GET_LEAD_URL,
content_type="application/json",
body=json.dumps(self.GET_LEAD_RESPONSE)
)
httpretty.register_uri(
httpretty.POST,
self.POST_LEAD_URL,
content_type="application/json",
body=json.dumps(self.POST_LEAD_RESPONSE)
)
httpretty.register_uri(
httpretty.POST,
self.POST_TO_LIST_URL.format(self.mailing_list_id),
content_type="application/json",
body=json.dumps(self.POST_TO_LIST_RESPONSE)
)
httpretty.register_uri(
httpretty.GET,
self.GET_LEADS_ON_LIST.format(self.mailing_list_id),
content_type="application/json",
body=json.dumps(self.GET_LEADS_ON_LIST_IN_LIST_RESPONSE)
)
httpretty.register_uri(
httpretty.GET,
self.MEMBER_OF_LIST_URL.format(self.mailing_list_id),
content_type="application/json",
body=json.dumps(self.GET_MEMBER_OF_LIST_RESPONSE)
)
self.assertTrue(
self.mailing_list_adapter.is_subscribed(
self.mailing_list_id, self.email
)
)
class TestMailingListAdapterIsSubscribedAdapterException(
TestMailingListAdapterIsSubscribedAdapter):
@httpretty.activate
def test(self):
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE),
status=500
)
with self.assertRaises(UnableToCompleteRequestException):
self.mailing_list_adapter.is_subscribed(
self.mailing_list_id, self.email
)
class TestMailingListSubscribeAdapter(test_mixins.MarketoRequiresOauthTokenMixin,
test_mixins.MarketoGetLeadMixin,
test_mixins.MarketoAddLeadToListMixin,
test_mixins.MarketoIsSubscribedMixin,
test_mixins.MarketoMemberOfListMixin,
test_mixins.MarketoRemoveFromListMixin,
TestMarketoAdapterFixture):
mailing_list_id = 1015
@httpretty.activate
def test(self):
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
httpretty.register_uri(
httpretty.GET,
self.GET_LEAD_URL,
content_type="application/json",
body=json.dumps(self.GET_LEAD_RESPONSE)
)
httpretty.register_uri(
httpretty.POST,
self.GET_LEAD_URL,
content_type="application/json",
body=json.dumps(self.POST_LEAD_RESPONSE)
)
httpretty.register_uri(
httpretty.POST,
self.POST_TO_LIST_URL.format(self.mailing_list_id),
content_type="application/json",
body=json.dumps(self.POST_TO_LIST_RESPONSE)
)
httpretty.register_uri(
httpretty.GET,
self.GET_LEADS_ON_LIST.format(self.mailing_list_id),
content_type="application/json",
body=json.dumps(self.GET_LEADS_ON_LIST_IN_LIST_RESPONSE)
)
httpretty.register_uri(
httpretty.GET,
self.MEMBER_OF_LIST_URL,
content_type="application/json",
body=json.dumps(self.GET_MEMBER_OF_LIST_RESPONSE)
)
self.assertTrue(
self.mailing_list_adapter.subscribe(
self.mailing_list_id, self.email
)
)
class TestMailingListSubscribeAdapterException(
TestMailingListSubscribeAdapter):
@httpretty.activate
def test_subscribe(self):
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE),
status=500
)
with self.assertRaises(UnableToCompleteRequestException):
self.mailing_list_adapter.subscribe(
self.mailing_list_id, self.email
)
class TestMailingListUnsubscribeAdapter(test_mixins.MarketoRequiresOauthTokenMixin,
test_mixins.MarketoGetLeadMixin,
test_mixins.MarketoRemoveFromListMixin,
test_mixins.MarketoIsSubscribedMixin,
test_mixins.MarketoMemberOfListMixin,
TestMarketoAdapterFixture):
mailing_list_id = 1015
@httpretty.activate
def test(self):
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
httpretty.register_uri(
httpretty.POST,
self.GET_LEAD_URL,
content_type="application/json",
body=json.dumps(self.POST_LEAD_RESPONSE)
)
httpretty.register_uri(
httpretty.DELETE,
self.DELETE_FROM_LIST_URL.format(self.mailing_list_id),
content_type="application/json",
body=json.dumps(self.DELETE_FROM_LIST_RESPONSE_NOT_IN_LIST)
)
httpretty.register_uri(
httpretty.GET,
self.GET_LEADS_ON_LIST.format(self.mailing_list_id),
content_type="application/json",
body=json.dumps(self.GET_LEADS_ON_LIST_NOT_IN_LIST_RESPONSE)
)
httpretty.register_uri(
httpretty.GET,
self.MEMBER_OF_LIST_URL.format(self.mailing_list_id),
content_type="application/json",
body=json.dumps(self.GET_MEMBER_OF_LIST_RESPONSE)
)
self.assertTrue(
self.mailing_list_adapter.unsubscribe(
self.mailing_list_id, self.email
)
)
class TestMailingListUnsubscribeAdapterException(
TestMailingListUnsubscribeAdapter):
@httpretty.activate
def test(self):
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE),
status=500)
with self.assertRaises(UnableToCompleteRequestException):
self.mailing_list_adapter.unsubscribe(
self.mailing_list_id, self.email
)
class TestCreateLeadAdapter(test_mixins.MarketoRequiresOauthTokenMixin,
test_mixins.MarketoAddLeadMixin,
test_mixins.MarketoAssociateLeadMixin,
TestMarketoAdapterFixture):
expected_lead_ids = [1097532]
extra_values = dict(firstName='john')
cookie = 'cookie123'
def setUp(self):
self.lead_adapter = LeadManagerImpl()
@httpretty.activate
def test_create_lead(self):
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
httpretty.register_uri(
httpretty.POST,
self.POST_LEAD_URL,
content_type="application/json",
body=json.dumps(self.POST_LEAD_RESPONSE))
httpretty.register_uri(
httpretty.POST,
self.ASSOCIATE_LEAD_URL.format(1097532), # see fixture
content_type="application/json",
body=json.dumps(self.ASSOCIATE_LEAD_RESPONSE))
self.result = self.lead_adapter.create_lead(
self.email, extra_values=self.extra_values,
cookie=self.cookie
)
self.assertEqual(
self.result,
self.expected_lead_ids
)
class TestCreateLeadAdapterException(TestCreateLeadAdapter):
expected_lead_ids = []
@httpretty.activate
def test_create_lead(self):
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE),
status=500)
with self.assertRaises(UnableToCompleteRequestException):
self.lead_adapter.create_lead(self.email)
class TestCreateLeadAdapterCookieException(TestCreateLeadAdapter):
expected_lead_ids = [1097532] # lead still created
@httpretty.activate
def test_create_lead(self):
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
httpretty.register_uri(
httpretty.POST,
self.POST_LEAD_URL,
content_type="application/json",
body=json.dumps(self.POST_LEAD_RESPONSE))
httpretty.register_uri(
httpretty.POST,
self.ASSOCIATE_LEAD_URL.format(1097532), # see fixture
content_type="application/json",
body=json.dumps(self.ASSOCIATE_LEAD_RESPONSE),
status=500)
with self.assertRaises(UnableToCompleteRequestException):
self.lead_adapter.create_lead(
self.email, extra_values=self.extra_values,
cookie=self.cookie
)
class TestMissingDjangoSettings(TestCase):
@override_settings(MARKETO_HOST=None)
def test_missing_host(self):
with self.assertRaises(MarketoImproperlyConfigured):
MarketoClientConfig()
@override_settings(MARKETO_CLIENT_ID=None)
def test_missing_client_id(self):
with self.assertRaises(MarketoImproperlyConfigured):
MarketoClientConfig()
@override_settings(MARKETO_SECRET=None)
def test_missing_client_secret(self):
with self.assertRaises(MarketoImproperlyConfigured):
MarketoClientConfig()
class TestCreateActivityAdapter(test_mixins.MarketoCreateActivityMixin,
TestCreateLeadAdapter):
email = 'fake@fake.email.com'
activity_type = constants.DTR_BINARY_DOWNLOAD
activity_date = timezone.now()
attributes = dict(url='http://some.fake.url/yadda/yadda')
@httpretty.activate
def test(self):
# authenticate
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
# find lead
httpretty.register_uri(
httpretty.POST,
self.POST_LEAD_URL,
content_type="application/json",
body=json.dumps(self.POST_LEAD_RESPONSE))
# create activity
httpretty.register_uri(
httpretty.POST,
self.POST_CREATE_ACTIVITY_URL,
content_type="application/json",
body=json.dumps(self.POST_CREATE_ACTIVITY_RESPONSE))
self.assertTrue(self.lead_adapter.create_activity(
self.email,
self.activity_type,
self.activity_date,
attributes=self.attributes
))
@httpretty.activate
def test_unable_to_find_lead(self):
# authenticate
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
# find lead - 500
httpretty.register_uri(
httpretty.POST,
self.POST_LEAD_URL,
content_type="application/json",
body=json.dumps(self.POST_LEAD_RESPONSE),
status=500)
with self.assertRaises(UnableToCompleteRequestException):
self.lead_adapter.create_activity(
self.email,
self.activity_type,
self.activity_date,
attributes=self.attributes
)
@httpretty.activate
def test_unable_to_create_activity(self):
# authenticate
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
# find lead
httpretty.register_uri(
httpretty.POST,
self.POST_LEAD_URL,
content_type="application/json",
body=json.dumps(self.POST_LEAD_RESPONSE))
# create activity - 400
httpretty.register_uri(
httpretty.POST,
self.POST_CREATE_ACTIVITY_URL,
content_type="application/json",
body=json.dumps(self.POST_CREATE_ACTIVITY_RESPONSE),
status=400)
with self.assertRaises(UnableToCompleteRequestException):
self.lead_adapter.create_activity(
self.email,
self.activity_type,
self.activity_date,
attributes=self.attributes
)
@httpretty.activate
def test_unable_to_encode_date(self):
# authenticate
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
# find lead
httpretty.register_uri(
httpretty.POST,
self.POST_LEAD_URL,
content_type="application/json",
body=json.dumps(self.POST_LEAD_RESPONSE))
# create activity
httpretty.register_uri(
httpretty.POST,
self.POST_CREATE_ACTIVITY_URL,
content_type="application/json",
body=json.dumps(self.POST_CREATE_ACTIVITY_RESPONSE))
with self.assertRaises(InvalidActivityDateException):
self.lead_adapter.create_activity(
self.email,
self.activity_type,
None,
attributes=self.attributes
)
with self.assertRaises(InvalidActivityDateException):
self.lead_adapter.create_activity(
self.email,
self.activity_type,
"Johnny",
attributes=self.attributes
)
with self.assertRaises(InvalidActivityDateException):
self.lead_adapter.create_activity(
self.email,
self.activity_type,
1,
attributes=self.attributes
)
@httpretty.activate
def test_invalid_activity_tyoe(self):
# authenticate
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
# find lead
httpretty.register_uri(
httpretty.POST,
self.POST_LEAD_URL,
content_type="application/json",
body=json.dumps(self.POST_LEAD_RESPONSE))
# create activity
httpretty.register_uri(
httpretty.POST,
self.POST_CREATE_ACTIVITY_URL,
content_type="application/json",
body=json.dumps(self.POST_CREATE_ACTIVITY_RESPONSE))
with self.assertRaises(UnknownActivityTypeException):
self.lead_adapter.create_activity(
self.email,
"bodybuilding",
self.activity_date,
attributes=self.attributes
)
class TestAdapterKeyConversion(TestCase):
def setUp(self):
self.result = (
LeadManagerImpl._convert_keyvals_to_custom_activity_format(
{
"SubType": "Ubuntu",
"UserName": "toli",
"UUID": "123-abc-xyz-456"
}
)
)
def test(self):
self.assertEqual(
# order seems to matter to the unit test
# but the api doesn't really care
self.result,
[
{
"name": "UserName",
"value": "toli",
},
{
"name": "SubType",
"value": "Ubuntu",
},
{
"name": "UUID",
"value": "123-abc-xyz-456",
},
]
)
class TestActivityDateConversion(TestCase):
"""
Simple case is to handle datetime naive objects
"""
datetime_now = datetime.datetime.now()
expected_datetime = datetime_now.isoformat()
def setUp(self):
self.result = LeadManagerImpl._convert_date_to_custom_activity_format(
self.datetime_now
)
def test(self):
self.assertEqual(self.result, self.expected_datetime)
class TestActivityDateConversionTimezoneAware(TestActivityDateConversion):
datetime_now = timezone.now()
# utz date, no tzinfo, iso formatted
expected_datetime = datetime_now.astimezone(pytz.utc).replace(
tzinfo=None
).isoformat()
class TestActivityDateConversionStringNaive(TestActivityDateConversion):
datetime_now = "2015-08-20T17:03:33.238789"
expected_datetime = "2015-08-20T17:03:33.238789"
class TestActivityDateConversionStringTZAware(TestActivityDateConversion):
datetime_now = "2015-08-20T17:03:33.238789+00:00"
expected_datetime = "2015-08-20T17:03:33.238789"
class TestActivityDateInvalidDate(TestCase):
datetime_now = "johnisnotadate" # really :)
expected_datetime = None
def test(self):
with self.assertRaises(InvalidActivityDateException):
self.result = \
LeadManagerImpl._convert_date_to_custom_activity_format(
self.datetime_now
)
class TestActivityDateEmptyDate(TestCase):
datetime_now = None
expected_datetime = None
def test(self):
with self.assertRaises(InvalidActivityDateException):
self.result = \
LeadManagerImpl._convert_date_to_custom_activity_format(
self.datetime_now
)
class TestActivityDateBadMonth(TestCase):
datetime_now = "2015-25-20T17:03:33" # bad month
expected_datetime = None
def test(self):
with self.assertRaises(InvalidActivityDateException):
self.result = \
LeadManagerImpl._convert_date_to_custom_activity_format(
self.datetime_now
)
class TestRequestCampaign(test_mixins.MarketoRequestCampaignMixin,
test_mixins.MarketoRequiresOauthTokenMixin,
TestMarketoAdapterFixture):
@httpretty.activate
def test_valid_campaign_request(self):
# authenticate
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
# trigger request campaign
httpretty.register_uri(
httpretty.POST,
self.REQUEST_CAMPAIGN_URL.format(self.campaign_id),
content_type="application/json",
body=json.dumps(self.POST_VALID_CAMPAIGN_REQUEST_RESPONSE))
self.leads = [{"id": self.lead_id}]
result = self.campaign_adapter.request_campaign(self.campaign_id, self.leads)
self.assertTrue(result)
@httpretty.activate
def test_invalid_campaign_request(self):
# authenticate
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
# trigger request campaign
httpretty.register_uri(
httpretty.POST,
self.REQUEST_CAMPAIGN_URL.format(self.campaign_id),
content_type="application/json",
body=json.dumps(self.POST_INVALID_CAMPAIGN_REQUEST_RESPONSE))
self.leads = [{"id": self.lead_id}]
result = self.campaign_adapter.request_campaign(self.campaign_id, self.leads)
self.assertFalse(result)
@httpretty.activate
def test_campaign_request_without_campaign_id(self):
# authenticate
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
with self.assertRaises(UnableToCompleteRequestException):
self.leads = [{"id": self.lead_id}]
result = self.campaign_adapter.request_campaign(None, self.leads)
@httpretty.activate
def test_campaign_request_without_lead_id(self):
# authenticate
httpretty.register_uri(
httpretty.GET,
self.OAUTH_TOKEN_URL,
content_type="application/json",
body=json.dumps(self.OAUTH_TOKEN_RESPONSE))
with self.assertRaises(UnableToCompleteRequestException):
result = self.campaign_adapter.request_campaign(self.campaign_id, None)