Repository URL to install this package:
|
Version:
0.32.0 ▾
|
import collections
import json
import pprint
import requests
import time
import logging
from datetime import datetime, timedelta
from dockerhub.marketo.utils import flatten, DatetimeEncoder
from dockerhub.marketo.exceptions import UnableToAuthenticateException
from dockerhub.marketo import decorators
log = logging.getLogger(__name__)
# TODO: move this to marketo.constants
class MarketoConstantMixin(object):
"""
Definition of Marketo Constants
"""
SCHEME = "https://"
# http://developers.marketo.com/documentation/rest/get-multiple-leads-by-list-id/
MULTIPLE_LEADS_BY_ENDPOINT = "/rest/v1/list/{0}/leads.json"
# http://developers.marketo.com/documentation/rest/add-leads-to-list/
LIST_ENDPOINT = "/rest/v1/lists/{0}/leads.json"
# TODO: add documentation links for the other endpoints
LISTS_ENDPOINT = "/rest/v1/lists.json"
IDENTITY_ENDPOINT = "/identity/oauth/token"
LEADS_ENDPOINT = "/rest/v1/leads.json"
MEMBER_OF_LIST_ENDPOINT = "/rest/v1/lists/{0}/leads/ismember.json"
ASSOCIATE_LEAD_ENDPOINT = "/rest/v1/leads/{0}/associate.json"
CREATE_ACTIVITY_ENDPOINT = "/rest/v1/activities/external.json"
GET_ACTIVITY_ENDPOINT = "/rest/v1/activities.json"
GET_PAGING_TOKEN_ENDPOINT = "/rest/v1/activities/pagingtoken.json"
REQUEST_CAMPAIGN_ENDPOINT = "/rest/v1/campaigns/{0}/trigger.json"
UNSUBSCRIBED = 'unsubscribed'
SUBSCRIBED = 'subscribed'
EMAIL = 'email'
MEMBER_OF = 'memberof'
class FilterType(object):
"""
Class used to represent the static filter types.
"""
ID = 'id'
EMAIL = 'email'
DOCKER_UUID = 'dockerUUID'
class FieldType(object):
"""
Class used to represent the static field types
"""
ID = 'id'
EMAIL = 'email'
class ResponseConstant(object):
ERRORS = 'errors'
RESULT = 'result'
STATUS = 'status'
ADDED = 'added'
SUCCESS = 'success'
class MarketoClient(MarketoConstantMixin):
"""
Implementation of low level Marketo interactions
"""
def __init__(self, host, client_id, client_secret):
assert host is not None
assert isinstance(host, basestring)
assert client_id is not None
assert isinstance(client_id, basestring)
assert client_secret is not None
assert isinstance(client_secret, basestring)
self.host = host
self.client_id = client_id
self.client_secret = client_secret
self.token = None
self.expires_in = None
self.valid_until = None
self.token_type = None
self.scope = None
self.headers = {'Content-type': 'application/json'}
# start a session object
self._s = requests.Session()
# set the common session headers
self._s.headers = self.headers
def _construct_url(self, endpoint):
"""
helper method to construct a url in a common way.
"""
return ''.join([self.SCHEME, self.host, endpoint])
@decorators.increment_api_calls
def _get(self, endpoint, args=None):
"""
Helper method used to pose to an endpoint in a common way. We return
the response as a json object
"""
url = self._construct_url(endpoint)
log.debug("GET to: %s\n\tparams: %s", url, args)
response = self._s.get(
url,
params=args
)
response.raise_for_status()
log.debug("GET Response: %s", response.json())
return response.json()
@decorators.increment_api_calls
def _post(self, endpoint, args=None, data=None):
"""
helper method used to post to an endpoint in a common way. We return
the response as a json object.
"""
url = self._construct_url(endpoint)
log.debug("POST to: %s\n\tdata: %s\n\tparams: %s", url, data, args)
response = self._s.post(
url,
data=json.dumps(data, cls=DatetimeEncoder),
params=args
)
log.debug("RAW POST Response: %s", response)
response.raise_for_status()
log.debug("POST Response content: %s", response.json())
return response.json()
@decorators.increment_api_calls
def _delete(self, endpoint, args=None, data=None):
"""
helper method used to post to an endpoint in a common way. We return
the response as a json object.
"""
url = self._construct_url(endpoint)
log.debug("DELETE to: %s\n\tdata: %s\n\tparams: %s", url, data, args)
response = self._s.delete(
url,
data=json.dumps(data),
params=args
)
response.raise_for_status()
log.debug("DELETE Response: %s", response.json())
return response.json()
@decorators.authenticate
def _show_lists(self):
"""
LOCAL DEV FEATURE ONLY
returns the lists available
:return:
"""
return pprint.pprint(self._get(self.LISTS_ENDPOINT))
def handle_authentication(self):
"""
Method used to get an authentication token. Used for subsequent API
calls
:raises UnableToAuthenticateException: when no access_token is present
in a successful response
:raises ValueError: When the json response cannot be decoded
:returns: None
"""
log.debug("Authenticating with marketo...")
if self.valid_until is not None and self.valid_until > time.time():
return # already have a token and its valid
log.debug("Valid until not set, continuing with authentication...")
args = {
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.client_secret
}
response = self._get(self.IDENTITY_ENDPOINT, args=args)
log.debug("Authentication response: %s", response)
self.token = response.get('access_token')
if self.token is None:
log.debug("Unable to authenticate, no token")
raise UnableToAuthenticateException()
self.token_type = response.get('token_type')
self.expires_in = response.get('expires_in')
if self.expires_in is not None:
self.valid_until = time.time() + self.expires_in
self.scope = response.get('scope')
# set the access token at the request.Session level
self._s.params.update({'access_token': self.token})
@decorators.authenticate
def update_field_value(self, lead_id, field_name, field_value):
"""
updates specified lead field
:param lead_id: marketo lead id
:param field_name: field to update
:param field_value:
:return: True if lead is updated, false otherwise
:raises requests.HTTPError:
:raises ValueError: When the json response cannot be decoded
"""
log.debug("Updating field (%s) with (%s) for (%s)",
field_name, field_value, lead_id)
leads = self.get_leads(
MarketoClient.FieldType.ID,
values=[str(lead_id), ],
fields=[self.EMAIL, ]
)
if not leads:
return False
email = leads[0].get(self.EMAIL)
data = {
'action': 'updateOnly',
'input': [
{
MarketoClient.FieldType.EMAIL: email,
field_name: field_value
}
]
}
response = self._post(self.LEADS_ENDPOINT, data=data)
return response.get('success', False)
@decorators.authenticate
def add_leads_to_list(self, list_id, leads):
"""
adds a lead to the specified list
"""
log.debug("Adding leads (%s) to list (%s)", leads, list_id)
args = {
'id': leads
}
response = self._post(self.LIST_ENDPOINT.format(list_id), args, None)
if response.get('success', False):
# Update the custom field if the POST request is successful
for lead in response.get('result'):
if lead.get('status') == 'added':
self.update_field_value(
lead.get('id'),
'list_{0}'.format(list_id),
self.SUBSCRIBED
)
else:
errors = response.get('errors', [])
for error in errors:
# list doesn't exist, what??!
message = error.get('message')
log.warning(
"Unable to add lead %s to list %s. %s",
list_id, leads, message
)
log.debug("Done adding leads (%s) to list (%s)", leads, list_id)
if response.get('success'):
return True
return False
@decorators.authenticate
def request_campaign(self, campaign_id, leads=None, tokens=None):
"""
Sends campaign requests to the specified leads
"""
log.debug("Triggering campaign_id (%s) on the marketo leads (%s)", campaign_id, leads)
data = {'input': {'leads': leads}}
if tokens:
data['input']['tokens'] = tokens
response = self._post(self.REQUEST_CAMPAIGN_ENDPOINT.format(campaign_id), data=data)
if response.get('success'):
return True
return False
@decorators.authenticate
def is_member_of_list(self, list_id, leads):
"""
checks if the leads are members of a specific list
"""
log.debug(
"Checking for membership of (%s) on list (%s)...", list_id, leads
)
args = {
'id': leads
}
response = self._get(
self.MEMBER_OF_LIST_ENDPOINT.format(list_id), args=args
)
log.debug("Membership check response: %s", response)
# if any of the results participate, return true
results = response.get('result', [])
for result in results:
if result.get('status') == MarketoConstantMixin.MEMBER_OF:
return True
return False
@decorators.authenticate
def associate_lead(self, lead_id, cookie_id):
"""
associates a lead with a cookie value
"""
args = {
'cookie': cookie_id
}
return self._post(
self.ASSOCIATE_LEAD_ENDPOINT.format(lead_id), args=args
)
@decorators.authenticate
def remove_leads_from_list(self, list_id, leads):
"""
Removes a lead from the specified list
:param list_id:
:param leads:
:return:
"""
log.debug("Removing leads (%s) from list (%s)", leads, list_id)
args = {
'id': leads
}
response = self._delete(self.LIST_ENDPOINT.format(list_id), args, None)
# Update the custom field if the DELETE request is successful
for lead in response.get('result'):
if lead.get('status') == 'removed':
lead_id = lead.get('id')
field_name = 'list_{0}'.format(list_id)
self.update_field_value(lead_id, field_name, self.UNSUBSCRIBED)
log.debug("Done removing leads (%s) from list (%s)", leads, list_id)
if response.get('success'):
return True
return False
@decorators.authenticate
def get_leads(self, filter_value, values=None, fields=None):
"""
Queries the endpoint for leads (users) using the filter and filter
values, returning the fields specified
:param filter_value:
:param values:
:param fields:
:return:
"""
values = [] if values is None else values
fields = [] if fields is None else fields
args = {
'filterType': str(filter_value),
'filterValues': ','.join(values)
}
if len(fields) > 0:
args['fields'] = ",".join(fields)
response = self._get(self.LEADS_ENDPOINT, args)
return response.get('result') if response else []
@decorators.authenticate
def create_lead(self, email, extra_values=None):
"""
Creates a lead using the email address provided. Marketo will
create the lead if it doesn't exist or update the lead if it
already exists.
:param email: email address
:param extra_values:
:return: list of created leads or [] if not created
"""
log.debug(
"Creating lead with email %s and extra values %s",
email, extra_values
)
lead = dict(email=email)
if extra_values is not None and isinstance(extra_values, dict):
lead.update(extra_values.items())
data = {
'action': 'createOrUpdate',
'lookupField': 'email',
'input': [lead]
}
response = self._post(self.LEADS_ENDPOINT, data=data)
if response.get('success', False):
leads = response.get('result', [])
else:
leads = []
leads = [int(lead) for lead in flatten('id', leads)]
log.debug("Created leads: %s", leads)
return leads
def find_lead(self, *args, **kwargs):
"""
To improve the readability of code calling into the client, this method
wraps the "create_lead" method. Create lead is a create/update
function that returns the lead ids (or an empty list if unsuccessful)
of the leads operated on.
"""
return self.create_lead(*args, **kwargs)
@decorators.authenticate
def delete_leads(self, leads):
"""
Deletes specified leads by list id
:param leads: array of leads
:return: True if leads are successfully removed, False otherwise
"""
log.debug("Deleting leads with ids: %s", leads)
lead_dicts = [dict(id=lead_id) for lead_id in leads]
data = {
'input': lead_dicts
}
response = self._delete(self.LEADS_ENDPOINT, data=data)
if response.get('success', False):
log.debug("Deleted leads: %s", leads)
return True
log.debug("Unable to delete leads: %s", leads)
return False
@decorators.authenticate
def create_custom_activity(self, lead_id, activity_type_id,
primary_attribute_value, activity_date,
attributes=None):
"""
Create activity for leads specified by leads
:param lead: marketo id for the lead
:param activity_type_id: marketo id for the activity type
:param primary_attribute_value: marketo descroption of the activity
:param activity_date: json serializable datetime. must not include the
timezone offset (e.g. "-00:00")
:param attributes: dict of additional data
:return: True if the activity is created, False otherwise
"""
# set default attribute value
attributes = [] if attributes is None else attributes
if not isinstance(attributes, collections.Iterable):
raise TypeError("Attribute {0} is not iterable".format(attributes))
# setup our post data
log.debug("Creating activity for lead: %s", lead_id)
data = {
"input": [
{
"leadId": lead_id,
"activityDate": activity_date,
"activityTypeId": activity_type_id,
"primaryAttributeValue": primary_attribute_value,
"attributes": attributes # list of object with Name/Value keys
}
]
}
# post the response to marketo
response = self._post(self.CREATE_ACTIVITY_ENDPOINT, data=data)
log.debug("Response from creating activity: %s", response)
# grab values out of the response
post_was_successful = response.get(
MarketoClient.ResponseConstant.SUCCESS)
results = response.get(MarketoClient.ResponseConstant.RESULT)
# We're posting only one activity here, so we should have only
# one result in the result set.
result = results[0] if results else {}
status = result.get(MarketoClient.ResponseConstant.STATUS)
errors = response.get(MarketoClient.ResponseConstant.ERRORS)
# verify success
# Due to the flakyness of this endpoint, We're doing some overlapping
# checks here. One would think that a successful post yields no
# errors, but that's not always the case. We're also checking
# that we get results back (results is not empty) and that the activity
# was successfully added (vs skipped)
if post_was_successful and not errors and status == MarketoClient.ResponseConstant.ADDED: # noqa
log.debug("Activity created for lead: %s", lead_id)
return True
# result wasn't added, capture info
log.error("Unable to create activity for lead: %s", lead_id,
extra=dict(
lead_id=lead_id,
activity_date=activity_date,
primary_attribute_value=primary_attribute_value,
activity_type_id=activity_type_id,
attributes=attributes,
status=status,
errors=errors
))
log.debug("Errors: %s", errors)
return False
@decorators.authenticate
def get_paging_token(self, days_look_back=1):
"""
Get paging token for a specific datetime range
:param days_look_back: number of days since today (UTC)
"""
log.debug("Retrieving paging token...")
since_datetime = datetime.utcnow() - timedelta(days_look_back)
args = {
'sinceDatetime': since_datetime.isoformat()
}
response = self._get(self.GET_PAGING_TOKEN_ENDPOINT, args)
next_page_token = response.get('nextPageToken')
log.debug("Page token returned as: %s", next_page_token)
return next_page_token
@decorators.authenticate
def _get_all_page_results(self, endpoint, args=None):
""" loop through all the pages and get the results """
log.debug("Retrieving results...")
response = self._get(endpoint, args)
results, next_page_token = self._process_response(response)
results_list = results or []
log.debug("Results found as: %s", results)
while next_page_token:
log.debug("More results to process...")
args['nextPageToken'] = next_page_token
response = self._get(endpoint, args)
results, next_page_token = self._process_response(response)
results_list.extend(results)
log.debug("More results found as: %s", results)
return results_list
def _process_response(self, response):
""" parse response and set variables """
log.debug("Processing response...")
results = response.get('result') or []
next_page_token = response.get('nextPageToken')
if len(results) == 0 or 'nextPageToken' not in response:
next_page_token = None
log.debug("Results: %s. next_page_token: %s", results, next_page_token)
return results, next_page_token
@decorators.authenticate
def get_lead_activities(self, activity_ids=None,
days_look_back=1, list_id=None):
"""
Get the lead activities for a specify time frame
:param activity_ids: list of activity type ids
1 Visit Web Page, 2 Fill Out Form, 3 Click Link, 6 Send Email,
8 Email Bounced, 9 Unsubscribe Email, 10 Open Email, 11 Click Email,
12 New Lead, 13 Change Data Value, 22 Change Score, 23 Change Owner,
24 Add to List, 25 Remove from List, 27 Email Bounced Soft,
38 Send Alert
:param days_look_back: number of days since today (UTC)
Default: 1 day
:param list_id: this parameter servers as a filter, and will return
leads that belong to a specified list activities for leads
with in this given list
"""
log.debug("Retrieving lead activities for activities: %s, list_id: %s, days_look_back: %s",
activity_ids, list_id, days_look_back)
# TODO: activity_ids is required by the endpoint, so this should be an argument instead of a keyword argument
activity_ids = activity_ids if activity_ids is not None else []
paging_token = self.get_paging_token(days_look_back)
args = {
'activityTypeIds': activity_ids,
'nextPageToken': paging_token
}
if list_id:
args['listId'] = list_id
results = self._get_all_page_results(self.GET_ACTIVITY_ENDPOINT, args)
log.debug("Done retrieving results.")
return results
@decorators.authenticate
def get_leads_by_list_id(self, list_id=None, batch_size=None, fields=None):
"""
Returns a list of lead object for the list specified
:param list_id: Id of the list
:param batch_size: The number of lead records to be
returned in a single call (default and max is 300)
:param fields: Comma separated list of field names to be
returned in the response.
"""
log.debug("Retrieving leads for list_id: %s, batch_size: %s, fields: %s",
list_id, batch_size, fields)
fields = [] if fields is None else fields
args = {
'access_token': self.token
}
if len(fields) > 0:
args['fields'] = ",".join(fields)
if batch_size:
args['batchSize'] = batch_size
results = self._get_all_page_results(self.MULTIPLE_LEADS_BY_ENDPOINT.format(list_id), args)
log.debug("Done retrieving leads.")
return results