Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hub-client / dockerhub / marketo / client.py
Size: Mime:
import collections
import json
import pprint
import requests
import time
import logging
from datetime import datetime, timedelta

from dockerhub.marketo.utils import flatten, DatetimeEncoder
from dockerhub.marketo.exceptions import UnableToAuthenticateException
from dockerhub.marketo import decorators

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)


# TODO: move this to marketo.constants
class MarketoConstantMixin(object):
    """
    Shared Marketo constants: the URL scheme, REST endpoint templates,
    subscription/field markers and the keys/values read from API responses.

    Endpoint templates that contain ``{0}`` are completed with
    ``str.format`` (a list, lead or campaign id) before use.
    """
    SCHEME = 'https://'

    # ---- endpoints without a path parameter --------------------------------
    # TODO: add documentation links for these endpoints
    IDENTITY_ENDPOINT = '/identity/oauth/token'
    LISTS_ENDPOINT = '/rest/v1/lists.json'
    LEADS_ENDPOINT = '/rest/v1/leads.json'
    GET_ACTIVITY_ENDPOINT = '/rest/v1/activities.json'
    CREATE_ACTIVITY_ENDPOINT = '/rest/v1/activities/external.json'
    GET_PAGING_TOKEN_ENDPOINT = '/rest/v1/activities/pagingtoken.json'

    # ---- endpoints formatted with an id -------------------------------------
    #  http://developers.marketo.com/documentation/rest/get-multiple-leads-by-list-id/
    MULTIPLE_LEADS_BY_ENDPOINT = '/rest/v1/list/{0}/leads.json'
    #  http://developers.marketo.com/documentation/rest/add-leads-to-list/
    LIST_ENDPOINT = '/rest/v1/lists/{0}/leads.json'
    MEMBER_OF_LIST_ENDPOINT = '/rest/v1/lists/{0}/leads/ismember.json'
    ASSOCIATE_LEAD_ENDPOINT = '/rest/v1/leads/{0}/associate.json'
    REQUEST_CAMPAIGN_ENDPOINT = '/rest/v1/campaigns/{0}/trigger.json'

    # ---- plain string markers -----------------------------------------------
    SUBSCRIBED = "subscribed"
    UNSUBSCRIBED = "unsubscribed"
    EMAIL = "email"
    MEMBER_OF = "memberof"

    class FilterType(object):
        """
        Static filter types usable when querying leads.
        """
        ID = "id"
        EMAIL = "email"
        DOCKER_UUID = "dockerUUID"

    class FieldType(object):
        """
        Static lead field types.
        """
        ID = "id"
        EMAIL = "email"

    class ResponseConstant(object):
        """
        Keys (and the ``added`` status value) looked up in API responses.
        """
        SUCCESS = "success"
        RESULT = "result"
        ERRORS = "errors"
        STATUS = "status"
        ADDED = "added"


class MarketoClient(MarketoConstantMixin):
    """
    Implementation of low level Marketo interactions
    """
    def __init__(self, host, client_id, client_secret):
        """
        Store the connection settings and prepare the shared HTTP session.

        :param host: host part of the API urls (the scheme is prepended when
            urls are constructed)
        :param client_id: client id sent when requesting a token
        :param client_secret: client secret sent when requesting a token
        :raises AssertionError: when any argument is None or not a string
        """
        # all three settings are mandatory strings
        for setting in (host, client_id, client_secret):
            assert setting is not None
            assert isinstance(setting, basestring)

        self.host, self.client_id, self.client_secret = (
            host, client_id, client_secret
        )

        # authentication state, populated by handle_authentication()
        self.token = self.token_type = self.scope = None
        self.expires_in = self.valid_until = None

        # one session for every request; it carries the common headers
        self.headers = {'Content-type': 'application/json'}
        self._s = requests.Session()
        self._s.headers = self.headers

    def _construct_url(self, endpoint):
        """
        helper method to construct a url in a common way.
        """
        return ''.join([self.SCHEME, self.host, endpoint])

    @decorators.increment_api_calls
    def _get(self, endpoint, args=None):
        """
        Helper method used to GET an endpoint in a common way. We return
        the response as a json object.

        :param endpoint: path appended to the scheme and host
        :param args: optional mapping sent as query-string parameters
        :return: the response body decoded from json
        :raises requests.HTTPError: when the response carries an error status
        :raises ValueError: when the json response cannot be decoded
        """
        url = self._construct_url(endpoint)

        # NOTE(review): ``args`` is logged verbatim; the authentication
        # request passes the client secret through here, so debug output
        # should be treated as sensitive.
        log.debug("GET to: %s\n\tparams: %s", url, args)
        response = self._s.get(url, params=args)
        response.raise_for_status()

        # decode the body once and reuse it for the log line and the return
        # value instead of parsing the same payload twice
        payload = response.json()
        log.debug("GET Response: %s", payload)
        return payload

    @decorators.increment_api_calls
    def _post(self, endpoint, args=None, data=None):
        """
        Helper method used to POST to an endpoint in a common way.  We return
        the response as a json object.

        :param endpoint: path appended to the scheme and host
        :param args: optional mapping sent as query-string parameters
        :param data: object serialized to json (using DatetimeEncoder) and
            sent as the request body
        :return: the response body decoded from json
        :raises requests.HTTPError: when the response carries an error status
        :raises ValueError: when the json response cannot be decoded
        """
        url = self._construct_url(endpoint)
        log.debug("POST to: %s\n\tdata: %s\n\tparams: %s", url, data, args)
        response = self._s.post(
            url,
            data=json.dumps(data, cls=DatetimeEncoder),
            params=args
        )
        # log the raw response before raise_for_status so failing requests
        # still leave a trace
        log.debug("RAW POST Response: %s", response)
        response.raise_for_status()

        # decode the body once and reuse it for the log line and the return
        # value instead of parsing the same payload twice
        payload = response.json()
        log.debug("POST Response content: %s", payload)
        return payload

    @decorators.increment_api_calls
    def _delete(self, endpoint, args=None, data=None):
        """
        Helper method used to send a DELETE to an endpoint in a common way.
        We return the response as a json object.

        :param endpoint: path appended to the scheme and host
        :param args: optional mapping sent as query-string parameters
        :param data: object serialized to json and sent as the request body
        :return: the response body decoded from json
        :raises requests.HTTPError: when the response carries an error status
        :raises ValueError: when the json response cannot be decoded
        """
        url = self._construct_url(endpoint)
        log.debug("DELETE to: %s\n\tdata: %s\n\tparams: %s", url, data, args)
        response = self._s.delete(
            url,
            # same encoder as _post, so both helpers serialize bodies alike
            data=json.dumps(data, cls=DatetimeEncoder),
            params=args
        )
        response.raise_for_status()

        # decode the body once and reuse it for the log line and the return
        # value instead of parsing the same payload twice
        payload = response.json()
        log.debug("DELETE Response: %s", payload)
        return payload

    @decorators.authenticate
    def _show_lists(self):
        """
        LOCAL DEV FEATURE ONLY
        Fetches the lists endpoint and pretty-prints the decoded response.
        :return: the return value of ``pprint.pprint`` (it prints and
            returns None)
        """
        available_lists = self._get(self.LISTS_ENDPOINT)
        return pprint.pprint(available_lists)

    def handle_authentication(self):
        """
        Obtain an access token for subsequent API calls, unless the current
        one has not expired yet. On success the token is also registered as
        a default query parameter of the session.
        :raises UnableToAuthenticateException: when no access_token is present
        in a successful response
        :raises ValueError: When the json response cannot be decoded
        :returns: None
        """
        log.debug("Authenticating with marketo...")
        token_still_valid = (
            self.valid_until is not None and self.valid_until > time.time()
        )
        if token_still_valid:
            return  # nothing to do, the current token can be reused
        log.debug("Valid until not set, continuing with authentication...")

        response = self._get(
            self.IDENTITY_ENDPOINT,
            args=dict(
                grant_type='client_credentials',
                client_id=self.client_id,
                client_secret=self.client_secret
            )
        )
        log.debug("Authentication response: %s", response)

        access_token = response.get('access_token')
        self.token = access_token
        if access_token is None:
            log.debug("Unable to authenticate, no token")
            raise UnableToAuthenticateException()

        self.token_type = response.get('token_type')
        lifetime = response.get('expires_in')
        self.expires_in = lifetime
        if lifetime is not None:
            # expires_in is added to the current epoch time
            self.valid_until = time.time() + lifetime
        self.scope = response.get('scope')

        # every later request on the session sends the token automatically
        self._s.params.update({'access_token': access_token})

    @decorators.authenticate
    def update_field_value(self, lead_id, field_name, field_value):
        """
        Updates one field of a lead. The lead is first looked up by id to
        obtain its email; the update itself is posted with that email.
        :param lead_id: marketo lead id
        :param field_name: field to update
        :param field_value: value to store in the field
        :return: the 'success' value of the update response; False when the
            lead cannot be found or the response has no 'success' key
        :raises requests.HTTPError:
        :raises ValueError: When the json response cannot be decoded
        """
        log.debug("Updating field (%s) with (%s) for (%s)",
                  field_name, field_value, lead_id)
        matches = self.get_leads(
            MarketoClient.FieldType.ID,
            values=[str(lead_id)],
            fields=[self.EMAIL]
        )
        if not matches:
            return False

        # the first match supplies the email sent along with the new value
        record = {MarketoClient.FieldType.EMAIL: matches[0].get(self.EMAIL)}
        record[field_name] = field_value

        payload = {'action': 'updateOnly', 'input': [record]}
        outcome = self._post(self.LEADS_ENDPOINT, data=payload)
        return outcome.get('success', False)

    @decorators.authenticate
    def add_leads_to_list(self, list_id, leads):
        """
        Adds leads to the specified list. For every lead the response
        reports as added, the lead's ``list_<list_id>`` field is set to
        subscribed.
        :param list_id: id of the list to add to
        :param leads: lead id(s), sent as the ``id`` query parameter
        :return: True when the response reports success, False otherwise
        """
        log.debug("Adding leads (%s) to list (%s)", leads, list_id)
        response = self._post(
            self.LIST_ENDPOINT.format(list_id), args={'id': leads}, data=None
        )
        succeeded = bool(response.get('success', False))
        if succeeded:
            # Update the custom field if the POST request is successful.
            # A missing/null result is treated as "nothing was added".
            field_name = 'list_{0}'.format(list_id)
            for lead in response.get('result') or []:
                if lead.get('status') == 'added':
                    self.update_field_value(
                        lead.get('id'), field_name, self.SUBSCRIBED
                    )
        else:
            for error in response.get('errors') or []:
                # list doesn't exist, what??!
                # arguments follow the message order: leads first, list second
                log.warning(
                    "Unable to add lead %s to list %s. %s",
                    leads, list_id, error.get('message')
                )
        log.debug("Done adding leads (%s) to list (%s)", leads, list_id)
        return succeeded

    @decorators.authenticate
    def request_campaign(self, campaign_id, leads=None, tokens=None):
        """
        Triggers a campaign for the given leads.
        :param campaign_id: id of the campaign to trigger
        :param leads: value posted under ``input.leads``
        :param tokens: optional value posted under ``input.tokens``; left
            out of the request when empty
        :return: True when the response reports success, False otherwise
        """
        log.debug("Triggering campaign_id (%s) on the marketo leads (%s)", campaign_id, leads)

        campaign_input = {'leads': leads}
        if tokens:
            campaign_input['tokens'] = tokens

        response = self._post(
            self.REQUEST_CAMPAIGN_ENDPOINT.format(campaign_id),
            data={'input': campaign_input}
        )
        return bool(response.get('success'))

    @decorators.authenticate
    def is_member_of_list(self, list_id, leads):
        """
        Checks if any of the leads is a member of a specific list.
        :param list_id: id of the list to check
        :param leads: lead id(s), sent as the ``id`` query parameter
        :return: True when at least one result has the member-of status,
            False otherwise (including when the response has no results)
        """
        # arguments follow the message order: leads first, list second
        log.debug(
            "Checking for membership of (%s) on list (%s)...", leads, list_id
        )
        response = self._get(
            self.MEMBER_OF_LIST_ENDPOINT.format(list_id), args={'id': leads}
        )
        log.debug("Membership check response: %s", response)
        # if any of the results participate, return true; a missing or null
        # result simply means no membership was reported
        results = response.get('result') or []
        return any(
            result.get('status') == MarketoConstantMixin.MEMBER_OF
            for result in results
        )

    @decorators.authenticate
    def associate_lead(self, lead_id, cookie_id):
        """
        Associates a lead with a cookie value.
        :param lead_id: id used to build the associate endpoint
        :param cookie_id: sent as the ``cookie`` query parameter
        :return: the decoded json response of the request
        """
        endpoint = self.ASSOCIATE_LEAD_ENDPOINT.format(lead_id)
        return self._post(endpoint, args={'cookie': cookie_id})

    @decorators.authenticate
    def remove_leads_from_list(self, list_id, leads):
        """
        Removes leads from the specified list. For every lead the response
        reports as removed, the lead's ``list_<list_id>`` field is set to
        unsubscribed.
        :param list_id: id of the list to remove from
        :param leads: lead id(s), sent as the ``id`` query parameter
        :return: True when the response reports success, False otherwise
        """
        log.debug("Removing leads (%s) from list (%s)", leads, list_id)
        response = self._delete(
            self.LIST_ENDPOINT.format(list_id), args={'id': leads}, data=None
        )
        # Update the custom field for every removed lead. A response may
        # carry no 'result' (or a null one); treat that as "nothing was
        # removed" rather than failing while iterating over None.
        field_name = 'list_{0}'.format(list_id)
        for lead in response.get('result') or []:
            if lead.get('status') == 'removed':
                self.update_field_value(
                    lead.get('id'), field_name, self.UNSUBSCRIBED
                )
        # surface reported errors the same way add_leads_to_list does
        for error in response.get('errors') or []:
            log.warning(
                "Unable to remove leads %s from list %s. %s",
                leads, list_id, error.get('message')
            )
        log.debug("Done removing leads (%s) from list (%s)", leads, list_id)
        return bool(response.get('success'))

    @decorators.authenticate
    def get_leads(self, filter_value, values=None, fields=None):
        """
        Queries the endpoint for leads (users) using the filter and filter
        values, returning the fields specified
        :param filter_value: filter type, sent as ``filterType``
        :param values: strings to match; joined with commas and sent as
            ``filterValues``
        :param fields: names of the fields to return; joined with commas and
            sent as ``fields`` (omitted from the request when empty)
        :return: the ``result`` of the response; always a list, empty when
            the response is empty or carries no result
        """
        values = [] if values is None else values
        fields = [] if fields is None else fields

        args = {
            'filterType': str(filter_value),
            'filterValues': ','.join(values)
        }
        if fields:
            args['fields'] = ",".join(fields)
        response = self._get(self.LEADS_ENDPOINT, args)
        # a response without 'result' used to leak None to the caller;
        # always hand back a list so the result can be iterated safely
        return (response or {}).get('result') or []

    @decorators.authenticate
    def create_lead(self, email, extra_values=None):
        """
        Posts a ``createOrUpdate`` request keyed on the email address, so
        Marketo creates the lead if it doesn't exist or updates it if it
        already exists.
        :param email: email address
        :param extra_values: optional dict of additional lead fields; any
            non-dict value is ignored
        :return: list of lead ids (as ints) taken from the response, or []
            when the response does not report success
        """
        log.debug(
            "Creating lead with email %s and extra values %s",
            email, extra_values
        )
        lead = {'email': email}
        if isinstance(extra_values, dict):
            lead.update(extra_values.items())

        response = self._post(
            self.LEADS_ENDPOINT,
            data={
                'action': 'createOrUpdate',
                'lookupField': 'email',
                'input': [lead]
            }
        )
        # only a successful response contributes results
        if response.get('success', False):
            results = response.get('result', [])
        else:
            results = []
        lead_ids = [int(found_id) for found_id in flatten('id', results)]
        log.debug("Created leads: %s", lead_ids)
        return lead_ids

    def find_lead(self, *args, **kwargs):
        """
        To improve the readability of code calling into the client, this method
        wraps the "create_lead" method.  Create lead is a create/update
        function that returns the lead ids (or an empty list if unsuccessful)
        of the leads operated on.
        """
        return self.create_lead(*args, **kwargs)

    @decorators.authenticate
    def delete_leads(self, leads):
        """
        Deletes the leads with the given lead ids.
        :param leads: array of lead ids
        :return: True if leads are successfully removed, False otherwise
        """
        log.debug("Deleting leads with ids: %s", leads)
        payload = {'input': [{'id': lead_id} for lead_id in leads]}
        response = self._delete(self.LEADS_ENDPOINT, data=payload)

        if not response.get('success', False):
            log.debug("Unable to delete leads: %s", leads)
            return False

        log.debug("Deleted leads: %s", leads)
        return True

    @decorators.authenticate
    def create_custom_activity(self, lead_id, activity_type_id,
                               primary_attribute_value, activity_date,
                               attributes=None):
        """
        Posts a single custom activity for one lead.
        :param lead_id: marketo id for the lead
        :param activity_type_id: marketo id for the activity type
        :param primary_attribute_value: marketo description of the activity
        :param activity_date: json serializable datetime. must not include the
               timezone offset (e.g. "-00:00")
        :param attributes: iterable of additional data, posted unchanged
               under the "attributes" key (defaults to an empty list)
        :return: True if the activity is created, False otherwise
        :raises TypeError: when attributes is not iterable
        """
        if attributes is None:
            attributes = []
        if not isinstance(attributes, collections.Iterable):
            raise TypeError("Attribute {0} is not iterable".format(attributes))

        log.debug("Creating activity for lead: %s", lead_id)
        activity = {
            "leadId": lead_id,
            "activityDate": activity_date,
            "activityTypeId": activity_type_id,
            "primaryAttributeValue": primary_attribute_value,
            "attributes": attributes   # list of object with Name/Value keys
        }
        response = self._post(
            self.CREATE_ACTIVITY_ENDPOINT, data={"input": [activity]}
        )
        log.debug("Response from creating activity: %s", response)

        keys = MarketoClient.ResponseConstant
        post_was_successful = response.get(keys.SUCCESS)
        errors = response.get(keys.ERRORS)
        # exactly one activity is posted, so only the first result matters
        results = response.get(keys.RESULT)
        first_result = results[0] if results else {}
        status = first_result.get(keys.STATUS)

        # The endpoint is flaky, hence the overlapping checks: the post must
        # report success, carry no errors, and the single result must have
        # been added (vs skipped).
        created = post_was_successful and not errors and status == keys.ADDED
        if not created:
            # result wasn't added, capture info
            log.error("Unable to create activity for lead: %s", lead_id,
                      extra=dict(
                          lead_id=lead_id,
                          activity_date=activity_date,
                          primary_attribute_value=primary_attribute_value,
                          activity_type_id=activity_type_id,
                          attributes=attributes,
                          status=status,
                          errors=errors
                      ))
            log.debug("Errors: %s", errors)
            return False

        log.debug("Activity created for lead: %s", lead_id)
        return True

    @decorators.authenticate
    def get_paging_token(self, days_look_back=1):
        """
        Requests the paging token for activities since a point in time.
        :param days_look_back: number of days to go back from the current
            UTC time
        :return: the ``nextPageToken`` of the response (None when absent)
        """
        log.debug("Retrieving paging token...")
        look_back = timedelta(days=days_look_back)
        since = (datetime.utcnow() - look_back).isoformat()
        response = self._get(
            self.GET_PAGING_TOKEN_ENDPOINT, {'sinceDatetime': since}
        )
        token = response.get('nextPageToken')
        log.debug("Page token returned as: %s", token)
        return token

    @decorators.authenticate
    def _get_all_page_results(self, endpoint, args=None):
        """
        Loop through all the pages of an endpoint and collect the results.
        :param endpoint: endpoint to query
        :param args: optional mapping of query parameters; it is copied, so
            the caller's mapping is never modified
        :return: list with the results of every page, in the order received
        """
        log.debug("Retrieving results...")
        # Work on a private copy: the paging token is written into the
        # params below, which must neither fail when no args were given nor
        # leak into the caller's dict.
        params = {} if args is None else dict(args)

        response = self._get(endpoint, params)
        results, next_page_token = self._process_response(response)
        results_list = list(results)
        log.debug("Results found as: %s", results)

        while next_page_token:
            log.debug("More results to process...")
            params['nextPageToken'] = next_page_token
            response = self._get(endpoint, params)
            results, next_page_token = self._process_response(response)
            results_list.extend(results)
            log.debug("More results found as: %s", results)
        return results_list

    def _process_response(self, response):
        """
        Extract the results and the next page token from a response.
        :return: tuple ``(results, next_page_token)``; results is a list
            (empty when the response has none) and the token is None when
            there are no results or the response has no token
        """
        log.debug("Processing response...")
        results = response.get('result') or []
        if len(results) == 0:
            # an empty page carries no usable token
            next_page_token = None
        else:
            next_page_token = response.get('nextPageToken')
        log.debug("Results: %s. next_page_token: %s", results, next_page_token)
        return results, next_page_token

    @decorators.authenticate
    def get_lead_activities(self, activity_ids=None,
                            days_look_back=1, list_id=None):
        """
        Collects the lead activities of a time frame, across all pages.
        :param activity_ids: list of activity type ids
            1 Visit Web Page, 2 Fill Out Form, 3 Click Link, 6 Send Email,
            8 Email Bounced, 9 Unsubscribe Email, 10 Open Email, 11 Click Email,
            12 New Lead, 13 Change Data Value, 22 Change Score, 23 Change Owner,
            24 Add to List, 25 Remove from List, 27 Email Bounced Soft,
            38 Send Alert
        :param days_look_back: number of days since today (UTC)
            Default: 1 day
        :param list_id: optional filter; when given it is sent as ``listId``
            so only activities for leads in that list are returned
        :return: list of the results of every page
        """
        log.debug("Retrieving lead activities for activities: %s, list_id: %s, days_look_back: %s",
                  activity_ids, list_id, days_look_back)
        # TODO: activity_ids is required by the endpoint, so this should be an argument instead of a keyword argument
        if activity_ids is None:
            activity_ids = []

        # paging starts from the token that matches the look-back window
        args = dict(
            activityTypeIds=activity_ids,
            nextPageToken=self.get_paging_token(days_look_back)
        )
        if list_id:
            args['listId'] = list_id

        activities = self._get_all_page_results(
            self.GET_ACTIVITY_ENDPOINT, args
        )
        log.debug("Done retrieving results.")
        return activities

    @decorators.authenticate
    def get_leads_by_list_id(self, list_id=None, batch_size=None, fields=None):
        """
        Returns the lead objects of the specified list, across all pages.
        :param list_id: Id of the list
        :param batch_size: The number of lead records to be
            returned in a single call (default and max is 300)
        :param fields: field names to be returned in the response; joined
            with commas for the request and omitted when empty
        :return: list of the results of every page
        """
        log.debug("Retrieving leads for list_id: %s, batch_size: %s, fields: %s",
                  list_id, batch_size, fields)

        args = {'access_token': self.token}
        if fields is not None and len(fields) > 0:
            args['fields'] = ",".join(fields)
        if batch_size:
            args['batchSize'] = batch_size

        endpoint = self.MULTIPLE_LEADS_BY_ENDPOINT.format(list_id)
        leads = self._get_all_page_results(endpoint, args)
        log.debug("Done retrieving leads.")
        return leads