# Repository URL to install this package:
# Version: 0.5.4
from zdesk import Zendesk, get_id_from_url, ZendeskError, RateLimitError
import random
import httplib
import re
import json
# Matches the description of the duplicate-email validation error that
# create_user receives from Zendesk; group 1 captures the conflicting address.
duplicate_email_regex = re.compile(
    r'^Email: (.*) is already being used by another user$'
)
def email_to_identity(email):
    """Build the identity dict for *email*: type 'email', marked verified."""
    identity = {'type': 'email'}
    identity['value'] = email
    identity['verified'] = True
    return identity
def emails_to_identities(emails):
    """Return one identity dict (see email_to_identity) per address in *emails*."""
    identities = []
    for address in emails:
        identities.append(email_to_identity(address))
    return identities
def make_memberships(user_ids, org_id):
    """Return a list with one membership dict per user id, all for *org_id*."""
    memberships = []
    for user_id in user_ids:
        membership = {'user_id': user_id}
        membership['organization_id'] = org_id
        memberships.append(membership)
    return memberships
class ZendeskSyncException(Exception):
    """Error raised by the sync code itself rather than by the zdesk client."""
def format_name(username, fullname):
    """Return 'fullname (username)', or just *username* when no full name is set.

    A full name of None or '' counts as not set.
    """
    has_fullname = fullname is not None and fullname != ''
    if has_fullname:
        return u'{0} ({1})'.format(fullname, username)
    return username
# Functions prefixed with safe_ are idempotent
class ZendeskSync(object):
    """Keeps Zendesk users and organizations in sync through a zdesk client.

    Users and organizations are located by their external id (called ``uuid``
    in this class). The ``safe_*`` methods look the target up first and only
    act when there is something to do.
    """

    def __init__(self, uri, username, token):
        """Store the connection settings and build the zdesk client.

        :param uri: Zendesk URL handed to the zdesk client.
        :param username: email used to authenticate.
        :param token: API token (passed as the password with
            ``zdesk_token=True``).
        """
        self.url = uri
        self.username = username
        self.token = token
        self.client = Zendesk(uri,
                              zdesk_email=username,
                              zdesk_password=token,
                              zdesk_token=True,
                              api_version=2)

    def ping(self):
        """Return True when the ``users_me`` call yields a truthy result.

        Used in integration tests to confirm that the username and token are
        correct.
        """
        return bool(self.client.users_me())

    def get_user(self, uuid):
        """Return the first user whose external id is *uuid*, or None."""
        user_result = self.client.users_search(external_id=uuid)
        if user_result['count'] != 0:
            return user_result['users'][0]
        return None

    def get_user_by_email(self, email):
        """Return the first user found when searching for *email*, or None.

        XXX: no escaping is done here, so this must only be called with valid
        emails. It seems like Zendesk has special logic when the search query
        looks like an email.
        """
        user_result = self.client.users_search(query=email)
        if user_result['count'] != 0:
            return user_result['users'][0]
        return None

    def replace_plan_tags(self, old_tags, plans):
        """Return *old_tags* without ``plan_*`` tags, plus one tag per plan.

        Built with list comprehensions so the result is a real list on both
        Python 2 and Python 3 (``filter``/``map`` return iterators on 3).
        """
        kept = [tag for tag in old_tags if not tag.startswith('plan_')]
        return kept + ['plan_' + plan for plan in plans]

    def update_user_plan_tags(self, uuid, plans):
        """Replace the plan tags of user *uuid*; skip the API call if unchanged.

        :raises ZendeskSyncException: if no user has that external id.
        """
        user = self.get_user(uuid)
        if user is None:
            raise ZendeskSyncException(
                'Tried to sync tags for non-existent user. {}'.format(uuid))
        new_tags = self.replace_plan_tags(user['tags'], plans)
        if user['tags'] != new_tags:
            self.client.user_update(user['id'], {'user': {'tags': new_tags}})

    def update_org_plan_tags(self, uuid, plans):
        """Replace the plan tags of org *uuid*; skip the API call if unchanged.

        :raises ZendeskSyncException: if no organization has that external id.
        """
        org = self.get_org(uuid)
        if org is None:
            # The message used to say "user" here, which was misleading.
            raise ZendeskSyncException(
                'Tried to sync tags for non-existent org. {}'.format(uuid))
        new_tags = self.replace_plan_tags(org['tags'], plans)
        if org['tags'] != new_tags:
            self.client.organization_update(
                org['id'], {'organization': {'tags': new_tags}})

    def list_identities(self, uuid):
        """Return the identities of the user with external id *uuid*.

        :raises ZendeskSyncException: if no user has that external id.
        """
        user = self.get_user(uuid)
        if user is None:
            raise ZendeskSyncException(
                'Tried to list identities for non-existent user. {}'.format(uuid))
        res = self.client.user_identities(user['id'])
        return res.get('identities', [])

    def reliable_list_identities(self, user_id):
        """Return the identities of *user_id* (a Zendesk id, not a uuid).

        The ``user_show`` call is a hack to force Zendesk to write out the
        identities before we fetch them.
        """
        self.client.user_show(user_id)
        res = self.client.user_identities(user_id=user_id)
        return res.get('identities', [])

    def remove_external_id(self, user_id):
        """Blank out the external id of the user with Zendesk id *user_id*."""
        self.client.user_update(user_id, {
            'user': {
                'external_id': ''
            }
        })

    def merge_users(self, user_into_uuid, user_from_uuid):
        """Merge the user *user_from_uuid* into the user *user_into_uuid*.

        Before merging, the "from" user loses its external id and has its
        email addresses replaced by a single generated temporary address.
        A side effect of this workaround for Zendesk's eventual consistency
        is that the "into" user will have a new temporary email address added
        to them until the next re-sync happens.

        :raises ZendeskSyncException: if either user does not exist. Both are
            checked before anything is modified.
        """
        user_into = self.get_user(user_into_uuid)
        user_from = self.get_user(user_from_uuid)
        if user_into is None or user_from is None:
            raise ZendeskSyncException(
                'Tried to merge non-existent user. into={0} from={1}'.format(
                    user_into_uuid, user_from_uuid))

        # if we try to merge a user with an external id zendesk barfs, so we
        # have to remove it first
        if user_from['external_id'] != '':
            self.remove_external_id(user_from['id'])

        # To avoid waiting for an eventually consistent merge to move email
        # addresses, we remove the old email addresses ourselves.
        # Zendesk requires at least one email address to exist, so we
        # generate one.
        tmp_email = 'tmp{0}@null.devnull'.format(random.randint(1, 2000000000))
        self.update_user_emails(user_from['id'], tmp_email, [tmp_email])

        # do the merge
        self.client.user_merge(user_from['id'],
                               {'user': {'id': user_into['id']}})

    def add_email(self, user_id, email):
        """Create a verified email identity on *user_id* and return it.

        :raises ZendeskSyncException: if the identity cannot be read back.
        """
        res = self.client.user_identity_create(
            user_id, {'identity': email_to_identity(email)})
        ident_id = get_id_from_url(res)
        # get the full identity back
        ret = self.client.user_identity_show(user_id, ident_id)
        if 'identity' not in ret:
            msg = "Internal error: identity not found after being created."
            raise ZendeskSyncException(msg)
        return ret['identity']

    def update_user_emails(self, user_id, primary_email, emails):
        """Make the email identities of *user_id* match *emails* exactly.

        Each change is made individually because Zendesk has no API to
        replace all emails at once: missing addresses are added, the primary
        is set, unverified ones are verified, and the rest are deleted.
        Addresses are lower-cased before comparison.

        Warning: if another existing user has one of these email addresses
        this will error out. The application is responsible for not letting
        that happen.

        :raises KeyError: if *primary_email* is neither in *emails* nor
            already an identity of the user.
        """
        primary_email = primary_email.lower()
        emails = [e.lower() for e in emails]
        wanted = set(emails)

        identities = self.reliable_list_identities(user_id)
        email_identities_map = {
            ident['value']: ident for ident in identities
            if ident['type'] == 'email'}

        # add the emails the user does not have yet
        for email in wanted - set(email_identities_map):
            ident = self.add_email(user_id, email)
            # update our local copy
            identities.append(ident)
            email_identities_map[email] = ident

        # set primary email
        primary_ident = email_identities_map[primary_email]
        if not primary_ident['primary']:
            self.client.user_identity_make_primary(
                user_id, primary_ident['id'], {})

        # verify emails (if the user had a non-verified email)
        for email in emails:
            # the identity should always be there because we just added it,
            # but if it's not, just don't try to verify the missing email
            ident = email_identities_map.get(email)
            if ident is not None and not ident['verified']:
                self.client.user_identity_update(
                    user_id, ident['id'], 'true', {})

        # remove old emails
        for ident in identities:
            if ident['type'] == 'email' and ident['value'] not in wanted:
                self.client.user_identity_delete(user_id, ident['id'])

    def safe_create_update_user(self, uuid, primary_email, emails, fullname,
                                username):
        """Create the user *uuid* or update it if it exists; return its id.

        *emails* is a list of strings; all emails will be added verified.
        When *primary_email* is None the first entry of *emails* is used.
        """
        user = self.get_user(uuid)
        if primary_email is None:
            primary_email = emails[0]
        if user is None:
            return self.create_user(
                uuid, primary_email, emails, fullname, username)
        return self.update_user(
            user, primary_email, emails, fullname, username)

    def update_user(self, user, primary_email, emails, fullname, username):
        """Update name, primary email and identities of *user*; return its id.

        *user* is a user dict as returned by ``get_user``.
        """
        user_id = user['id']
        self.client.user_update(user_id, {
            'user': {
                'external_id': user['external_id'],
                'name': format_name(username, fullname),
                'email': primary_email
            }
        })
        self.update_user_emails(user_id, primary_email, emails)
        return user_id

    def _new_user_payload(self, uuid, primary_email, emails, fullname,
                          username):
        """Build the request body shared by both user-creation methods."""
        return {
            'user': {
                'external_id': uuid,
                'name': format_name(username, fullname),
                'email': primary_email,
                'identities': emails_to_identities(emails)
            }
        }

    def create_user_no_merge(self, uuid, primary_email, emails, fullname,
                             username):
        """Create a user and return its id, letting any ZendeskError escape."""
        user_url = self.client.user_create(self._new_user_payload(
            uuid, primary_email, emails, fullname, username))
        return get_id_from_url(user_url)

    def create_user(self, uuid, primary_email, emails, fullname, username):
        """Create a user, taking over an existing one on an email conflict.

        If creation fails with 422 because one of the emails belongs to
        another user, that user gets *uuid* as external id and its emails
        are synced to *emails*. Any other failure is re-raised unchanged.

        :returns: the Zendesk id of the created or taken-over user.
        :raises ZendeskError: when the failure is not a recognised
            duplicate-email conflict, or the conflicting user cannot be found.
        """
        if primary_email is None:
            primary_email = emails[0]
        payload = self._new_user_payload(
            uuid, primary_email, emails, fullname, username)
        try:
            user_url = self.client.user_create(payload)
        except ZendeskError as e:
            # Bare ``raise`` is used below so the original traceback is kept.
            if e.error_code != httplib.UNPROCESSABLE_ENTITY:
                raise
            # find out which email the conflict is on
            info = json.loads(e.msg)
            details = info.get('details') or {}
            # if the error is not in the email, we don't know how to handle it
            if 'email' not in details:
                raise
            description = details['email'][0]['description']
            match = duplicate_email_regex.search(description)
            # if our regex fails, we don't know how to merge the users
            if match is None:
                raise
            # find out which existing user has this email
            existing_user = self.get_user_by_email(match.group(1))
            if existing_user is None:
                # nothing to take over, so surface the original error
                raise
            # hijack the user
            user_id = existing_user['id']
            self.client.user_update(user_id, {
                'user': {
                    'external_id': uuid
                }
            })
            # add any other email addresses that need to be added
            self.update_user_emails(user_id, primary_email, emails)
            return user_id
        return get_id_from_url(user_url)

    def safe_create_user(self, uuid, primary_email, emails, fullname,
                         username):
        """Create the user unless *uuid* already exists.

        :returns: the new user id, or None when the user already existed.
        """
        if self.get_user(uuid):
            return None
        return self.create_user(uuid, primary_email, emails, fullname, username)

    def safe_delete_user(self, uuid):
        """Delete the user *uuid* if present; return whether it existed."""
        user = self.get_user(uuid)
        if user is None:
            return False
        self.client.user_delete(user['id'])
        return True

    def safe_suspend_user(self, uuid):
        """Suspend the user *uuid* if present; return whether it existed."""
        user = self.get_user(uuid)
        if user is None:
            return False
        self.client.user_update(user['id'], {
            'user': {
                'suspended': True
            }
        })
        return True

    def safe_delete_user_by_email(self, email):
        """Delete the user found by *email*; return whether one was found."""
        user = self.get_user_by_email(email)
        if user is None:
            return False
        self.client.user_delete(user['id'])
        return True

    def get_users(self, users_uuids):
        """Return the users for the given uuids ([] if the reply has none).

        The uuids are joined with commas and passed as the first positional
        argument of ``users_show_many``.
        """
        ret = self.client.users_show_many(','.join(users_uuids))
        return ret.get('users', [])

    def get_users_by_id(self, users_ids):
        """Return the users for the given Zendesk ids ([] if none returned)."""
        ids_str = ','.join(str(i) for i in users_ids)
        ret = self.client.users_show_many(ids=ids_str)
        return ret.get('users', [])

    def get_org(self, uuid):
        """Return the first organization found for *uuid*, or None."""
        res = self.client.organizations_search(uuid)
        if res['count'] != 0:
            return res['organizations'][0]
        return None

    def safe_delete_org(self, uuid):
        """Delete the organization *uuid* if present; return whether it existed."""
        org = self.get_org(uuid)
        if org is None:
            return False
        self.client.organization_delete(org['id'])
        return True

    def safe_delete_org_by_name(self, name):
        """Delete the organization named *name*; return whether it existed."""
        org = self.get_org_by_name(name)
        if org is None:
            return False
        self.client.organization_delete(org['id'])
        return True

    def get_org_by_name(self, name):
        """Return the organization named exactly *name* (ignoring case), or None.

        We use the autocomplete api because it's the closest thing to
        searching for an organization by exact name that they have. We have
        to confirm that the first result is the correct one because orgs can
        be prefixes of one another. When there's an exact match, the first
        result should be the one we want.

        The minimum number of characters required for autocomplete is 2 while
        the shortest hub org name allowed is 4 characters, so the length is
        not a problem.
        """
        res = self.client.organizations_autocomplete(name)
        if res['count'] > 0:
            org = res['organizations'][0]
            if org['name'].lower() == name.lower():
                return org
        return None

    def get_org_memberships(self, org_id):
        """Return the membership records of *org_id*.

        This list may be missing entries because of eventual consistency
        during changes of membership.
        """
        res = self.client.organization_organization_memberships(org_id)
        if res['count'] == 0:
            return []
        return res['organization_memberships']

    def get_org_members(self, org_id):
        """Return the user records of the current members of *org_id*."""
        memberships = self.get_org_memberships(org_id)
        user_ids = [m['user_id'] for m in memberships]
        return self.get_users_by_id(user_ids)

    def create_org(self, uuid, name):
        """Create an organization; return the ``organization_create`` result."""
        return self.client.organization_create({
            'organization': {
                'external_id': uuid,
                'name': name
            }
        })

    def rename_org(self, org_id, new_name):
        """Set the name of the organization *org_id* to *new_name*."""
        self.client.organization_update(org_id, {
            'organization': {
                'name': new_name}})

    def add_org_members(self, org_id, user_ids):
        """Create a membership in *org_id* for every id in *user_ids*."""
        data = {'organization_memberships': make_memberships(user_ids, org_id)}
        self.client.organization_memberships_create_many(data)

    def remove_extra_org_members(self, memberships, user_ids):
        """Destroy the *memberships* whose user is not listed in *user_ids*.

        No API call is made when there is nothing to remove.
        """
        wanted = set(user_ids)
        extra_membership_ids = [str(m['id']) for m in memberships
                                if m['user_id'] not in wanted]
        if extra_membership_ids:
            self.client.organization_memberships_destroy_many(
                ','.join(extra_membership_ids))

    def safe_org_ownership_update(self, uuid, name, user_uuids):
        """Ensure the org *uuid* exists and has exactly *user_uuids* as members.

        When the org does not exist yet, any org already using *name* is
        renamed to ``'old <name>'`` first and a new one is created.

        Warning: eventual consistency of user membership! Usually 1 second is
        enough. The worst case failure is that a user has not been removed
        from Zendesk and the next membership change will trigger the removal.

        :returns: True if the organization had to be created.
        :raises ZendeskSyncException: if the org cannot be found right after
            being created.
        """
        created_org = False
        users = self.get_users(user_uuids)
        org = self.get_org(uuid)
        if org is None:
            old_org = self.get_org_by_name(name)
            if old_org is not None:
                self.rename_org(old_org['id'], 'old {0}'.format(name))
            # now that the old org is out of the way, we can create a new org
            self.create_org(uuid, name)
            org = self.get_org(uuid)
            if org is None:
                raise ZendeskSyncException(
                    'Internal error: org not found after being created. '
                    '{}'.format(uuid))
            created_org = True
        memberships = self.get_org_memberships(org['id'])
        user_ids = [user['id'] for user in users]
        self.add_org_members(org['id'], user_ids)
        self.remove_extra_org_members(memberships, user_ids)
        return created_org