# Repository URL to install this package:
# Version: 0.2.10
from requests import request, ConnectionError
from social.utils import SSLHttpAdapter, module_member, parse_qs, user_agent
from social.exceptions import AuthFailed
class BaseAuth(object):
    """A django.contrib.auth backend that authenticates the user based on
    an authentication provider response.

    Subclasses are expected to implement ``auth_url`` (or ``auth_html``),
    ``auth_complete`` and ``get_user_details``; the versions defined here
    raise ``NotImplementedError``.
    """
    name = ''  # provider name, it's stored in database
    supports_inactive_user = False  # Django auth
    ID_KEY = None  # key read from the provider response by get_user_id()
    EXTRA_DATA = None  # entries consumed by extra_data(), see its docstring
    REQUIRES_EMAIL_VALIDATION = False  # not read anywhere within this class
    SEND_USER_AGENT = False  # when true, request() adds a User-Agent header
    SSL_PROTOCOL = None  # when set, request() goes through SSLHttpAdapter

    def __init__(self, strategy=None, redirect_uri=None):
        """Store the strategy and redirect URI.

        When a strategy is given, the request data is loaded from it and
        the redirect URI is passed through ``strategy.absolute_uri``.
        """
        self.strategy = strategy
        self.redirect_uri = redirect_uri
        self.data = {}
        if strategy:
            self.data = self.strategy.request_data()
            self.redirect_uri = self.strategy.absolute_uri(
                self.redirect_uri
            )

    def setting(self, name, default=None):
        """Return setting value from strategy"""
        return self.strategy.setting(name, default=default, backend=self)

    def start(self):
        """Begin the auth process.

        Returns the strategy's redirect to ``auth_url()`` when
        ``uses_redirect()`` is true, otherwise the strategy's HTML
        response built from ``auth_html()``.
        """
        # Clean any partial pipeline info before starting the process
        self.strategy.clean_partial_pipeline()
        if self.uses_redirect():
            return self.strategy.redirect(self.auth_url())
        else:
            return self.strategy.html(self.auth_html())

    def complete(self, *args, **kwargs):
        """Delegate to ``auth_complete`` with the same arguments."""
        return self.auth_complete(*args, **kwargs)

    def auth_url(self):
        """Must return redirect URL to auth provider"""
        raise NotImplementedError('Implement in subclass')

    def auth_html(self):
        """Must return login HTML content returned by provider"""
        raise NotImplementedError('Implement in subclass')

    def auth_complete(self, *args, **kwargs):
        """Completes login process, must return user instance"""
        raise NotImplementedError('Implement in subclass')

    def process_error(self, data):
        """Process data for errors, raise exception if needed.
        Call this method on any override of auth_complete."""
        pass

    def authenticate(self, *args, **kwargs):
        """Authenticate user using social credentials

        Authentication is made if this is the correct backend, backend
        verification is made by kwargs inspection for current backend
        name presence.

        Returns ``None`` when the keyword arguments do not target this
        backend, otherwise the result of ``self.pipeline``.
        """
        # Validate backend and arguments. Require that the Social Auth
        # response be passed in as a keyword argument, to make sure we
        # don't match the username/password calling conventions of
        # authenticate.
        if 'backend' not in kwargs or kwargs['backend'].name != self.name or \
           'strategy' not in kwargs or 'response' not in kwargs:
            return None

        self.strategy = self.strategy or kwargs.get('strategy')
        self.redirect_uri = self.redirect_uri or kwargs.get('redirect_uri')
        self.data = self.strategy.request_data()
        pipeline = self.strategy.get_pipeline()
        kwargs.setdefault('is_new', False)
        if 'pipeline_index' in kwargs:
            # Resume from the given step: drop the entries before it.
            pipeline = pipeline[kwargs['pipeline_index']:]
        return self.pipeline(pipeline, *args, **kwargs)

    def pipeline(self, pipeline, pipeline_index=0, *args, **kwargs):
        """Run the pipeline and return the resulting user.

        A non-dict pipeline result is returned unchanged. Otherwise the
        ``'user'`` entry is returned (possibly ``None``), annotated with
        ``social_user`` and ``is_new`` when it is truthy.
        """
        out = self.run_pipeline(pipeline, pipeline_index, *args, **kwargs)
        if not isinstance(out, dict):
            return out
        user = out.get('user')
        if user:
            user.social_user = out.get('social')
            user.is_new = out.get('is_new')
        return user

    def disconnect(self, *args, **kwargs):
        """Run the strategy's disconnect pipeline.

        ``name`` and ``user_storage`` are injected into the keyword
        arguments handed to the pipeline functions.
        """
        pipeline = self.strategy.get_disconnect_pipeline()
        if 'pipeline_index' in kwargs:
            pipeline = pipeline[kwargs['pipeline_index']:]
        kwargs['name'] = self.name
        kwargs['user_storage'] = self.strategy.storage.user
        return self.run_pipeline(pipeline, *args, **kwargs)

    def run_pipeline(self, pipeline, pipeline_index=0, *args, **kwargs):
        """Call each pipeline entry in order, accumulating keyword args.

        Each entry is resolved with ``module_member`` and called with
        ``*args`` and the accumulated dict. A falsy result is treated as
        an empty dict; a dict result is merged into the accumulator; any
        other result is returned immediately. When every entry has run,
        the partial pipeline is cleaned and the accumulator is returned.
        """
        out = kwargs.copy()
        out.setdefault('strategy', self.strategy)
        out.setdefault('backend', out.pop(self.name, None) or self)
        out.setdefault('request', self.strategy.request_data())
        out.setdefault('details', {})

        for idx, name in enumerate(pipeline):
            # Expose the absolute position of the running step.
            out['pipeline_index'] = pipeline_index + idx
            func = module_member(name)
            result = func(*args, **out) or {}
            if not isinstance(result, dict):
                return result
            out.update(result)
        self.strategy.clean_partial_pipeline()
        return out

    def extra_data(self, user, uid, response, details=None, *args, **kwargs):
        """Return default extra data to store in extra_data field

        Entries come from the ``EXTRA_DATA`` class attribute followed by
        the ``EXTRA_DATA`` setting. Each entry is either a bare name or a
        list/tuple of the form ``(name,)``, ``(name, alias)`` or
        ``(name, alias, discard)``; entries of any other length are
        skipped. The value is looked up under ``name`` in ``response`` and
        then in ``details`` and stored under ``alias``. When ``discard`` is
        truthy and the value is falsy the entry is left out.
        """
        # ``details`` defaults to None; fall back to an empty mapping so
        # the lookup below cannot fail with AttributeError.
        details = details or {}
        # Normalise both sources to lists so tuples (or a None setting)
        # can be concatenated.
        entries = list(self.EXTRA_DATA or []) + \
            list(self.setting('EXTRA_DATA', []) or [])
        data = {}
        for entry in entries:
            if not isinstance(entry, (list, tuple)):
                entry = (entry,)
            size = len(entry)
            if 1 <= size <= 3:
                if size == 3:
                    name, alias, discard = entry
                elif size == 2:
                    (name, alias), discard = entry, False
                else:
                    name = alias = entry[0]
                    discard = False
                value = response.get(name) or details.get(name)
                if discard and not value:
                    continue
                data[alias] = value
        return data

    def auth_allowed(self, response, details):
        """Return True if the user should be allowed to authenticate, by
        default check if email is whitelisted (if there's a whitelist)"""
        emails = self.setting('WHITELISTED_EMAILS', [])
        domains = self.setting('WHITELISTED_DOMAINS', [])
        email = details.get('email')
        allowed = True
        if email and (emails or domains):
            # partition() yields an empty domain for an address without
            # '@' instead of raising IndexError; an empty domain never
            # matches the domain whitelist.
            domain = email.partition('@')[2]
            allowed = email in emails or (bool(domain) and domain in domains)
        return allowed

    def get_user_id(self, details, response):
        """Return a unique ID for the current user, by default from server
        response."""
        return response.get(self.ID_KEY)

    def get_user_details(self, response):
        """Must return user details in a known internal struct:
            {'username': <username if any>,
             'email': <user email if any>,
             'fullname': <user full name if any>,
             'first_name': <user first name if any>,
             'last_name': <user last name if any>}
        """
        raise NotImplementedError('Implement in subclass')

    def get_user_names(self, fullname='', first_name='', last_name=''):
        """Return a ``(fullname, first_name, last_name)`` tuple of
        stripped strings.

        When only ``fullname`` is given it is split on the first space to
        derive the other two; when ``fullname`` is empty it is built by
        joining ``first_name`` and ``last_name``.
        """
        # Avoid None values
        fullname = fullname or ''
        first_name = first_name or ''
        last_name = last_name or ''
        if fullname and not (first_name or last_name):
            try:
                first_name, last_name = fullname.split(' ', 1)
            except ValueError:
                # No space in fullname: treat it all as the first name.
                first_name = first_name or fullname or ''
                last_name = last_name or ''
        fullname = fullname or ' '.join((first_name, last_name))
        return fullname.strip(), first_name.strip(), last_name.strip()

    def get_user(self, user_id):
        """
        Return user with given ID from the User model used by this backend.
        This is called by django.contrib.auth.middleware.
        """
        from social.strategies.utils import get_current_strategy
        strategy = self.strategy or get_current_strategy()
        return strategy.get_user(user_id)

    def continue_pipeline(self, *args, **kwargs):
        """Continue previous halted pipeline"""
        kwargs.update({'backend': self, 'strategy': self.strategy})
        return self.authenticate(*args, **kwargs)

    def auth_extra_arguments(self):
        """Return extra arguments needed on auth process. The defaults can be
        overridden by GET parameters."""
        extra_arguments = self.setting('AUTH_EXTRA_ARGUMENTS', {}).copy()
        # Only keys already present in the setting may be overridden by
        # the request data.
        extra_arguments.update((key, self.data[key]) for key in extra_arguments
                               if key in self.data)
        return extra_arguments

    def uses_redirect(self):
        """Return True if this provider uses redirect url method,
        otherwise return false."""
        return True

    def request(self, url, method='GET', *args, **kwargs):
        """Perform an HTTP request and return the response object.

        Defaults for ``headers``, ``verify`` (only when the ``VERIFY_SSL``
        setting is not None) and ``timeout`` (``REQUESTS_TIMEOUT`` or
        ``URLOPEN_TIMEOUT`` setting) are filled in unless the caller
        supplied them. Raises ``AuthFailed`` on ``ConnectionError``;
        ``response.raise_for_status()`` is called before returning.
        """
        kwargs.setdefault('headers', {})
        if self.setting('VERIFY_SSL') is not None:
            kwargs.setdefault('verify', self.setting('VERIFY_SSL'))
        kwargs.setdefault('timeout', self.setting('REQUESTS_TIMEOUT') or
                          self.setting('URLOPEN_TIMEOUT'))
        if self.SEND_USER_AGENT and 'User-Agent' not in kwargs['headers']:
            kwargs['headers']['User-Agent'] = user_agent()

        try:
            if self.SSL_PROTOCOL:
                session = SSLHttpAdapter.ssl_adapter_session(self.SSL_PROTOCOL)
                response = session.request(method, url, *args, **kwargs)
            else:
                response = request(method, url, *args, **kwargs)
        except ConnectionError as err:
            raise AuthFailed(self, str(err))
        response.raise_for_status()
        return response

    def get_json(self, url, *args, **kwargs):
        """Request ``url`` and return the response decoded with ``.json()``."""
        return self.request(url, *args, **kwargs).json()

    def get_querystring(self, url, *args, **kwargs):
        """Request ``url`` and return the response text run through
        ``parse_qs``."""
        return parse_qs(self.request(url, *args, **kwargs).text)

    def get_key_and_secret(self):
        """Return tuple with Consumer Key and Consumer Secret for current
        service provider. Must return (key, secret), order *must* be respected.
        """
        return self.setting('KEY'), self.setting('SECRET')