Repository URL to install this package:
Version:
0.2.10 ▾
|
# -*- coding: utf-8 -*-
"""
VK.com OpenAPI, OAuth2 and Iframe application OAuth2 backends, docs at:
http://psa.matiasaguirre.net/docs/backends/vk.html
"""
from time import time
from hashlib import md5
from social.utils import parse_qs
from social.backends.base import BaseAuth
from social.backends.oauth import BaseOAuth2
from social.exceptions import AuthTokenRevoked, AuthException
class VKontakteOpenAPI(BaseAuth):
    """VK.COM OpenAPI authentication backend"""
    name = 'vk-openapi'
    ID_KEY = 'id'

    def get_user_details(self, response):
        """Build the user details dict from the VK.com request values.

        The username is the ``nickname`` value when it is non-empty and
        ``response['id']`` otherwise; the email is always an empty string.
        """
        nickname = response.get('nickname') or ''
        given = response.get('first_name', [''])[0]
        family = response.get('last_name', [''])[0]
        fullname, first_name, last_name = self.get_user_names(
            first_name=given,
            last_name=family
        )
        if len(nickname) != 0:
            username = nickname
        else:
            username = response['id']
        details = {'username': username, 'email': ''}
        details['fullname'] = fullname
        details['first_name'] = first_name
        details['last_name'] = last_name
        return details

    def user_data(self, access_token, *args, **kwargs):
        """Return the request data held by the backend; arguments are
        ignored."""
        return self.data

    def auth_html(self):
        """Render the local VK authentication page (``LOCAL_HTML`` setting,
        'vkontakte.html' by default) with the application id and the
        completion URL in its context.
        """
        app_id = self.setting('APP_ID')
        complete_url = self.redirect_uri
        template = self.setting('LOCAL_HTML', 'vkontakte.html')
        return self.strategy.render_html(
            tpl=template,
            context={'VK_APP_ID': app_id, 'VK_COMPLETE_URL': complete_url}
        )

    def auth_complete(self, *args, **kwargs):
        """Validate the VK session value and authenticate the user.

        The value stored in the session under ``vk_app_<APP_ID>`` is parsed
        as a query string and its ``sig`` entry is compared with the md5 of
        the ``expire``, ``mid``, ``secret`` and ``sid`` pairs followed by the
        application secret. ``ValueError`` is raised when the request has no
        ``id``, when the session value is missing, when the signature does
        not match or when ``expire`` lies in the past.
        """
        session_key = 'vk_app_' + self.setting('APP_ID')
        session_value = self.strategy.session_get(session_key)
        if not ('id' in self.data and session_value):
            raise ValueError('VK.com authentication is not completed')

        mapping = parse_qs(session_value)
        pieces = []
        for field in ('expire', 'mid', 'secret', 'sid'):
            pieces.append(field + '=' + mapping[field])
        signed_text = ''.join(pieces)

        _, secret = self.get_key_and_secret()
        digest = md5((signed_text + secret).encode('utf-8')).hexdigest()
        # Signature is checked first; expiry only matters when it matches
        if not digest == mapping['sig'] or int(mapping['expire']) < time():
            raise ValueError('VK.com authentication failed: Invalid Hash')

        profile = self.user_data(mapping['mid'])
        kwargs.update(backend=self, response=profile)
        return self.strategy.authenticate(*args, **kwargs)

    def uses_redirect(self):
        """Always False: VK.com does not require visiting a server URL to
        authenticate, so the auth_xxx methods need not be called and their
        implementation here is only an example."""
        return False
class VKOAuth2(BaseOAuth2):
    """VKOAuth2 authentication backend"""
    name = 'vk-oauth2'
    ID_KEY = 'user_id'
    # Use TLS for the authorization redirect, like ACCESS_TOKEN_URL does
    AUTHORIZATION_URL = 'https://oauth.vk.com/authorize'
    ACCESS_TOKEN_URL = 'https://oauth.vk.com/access_token'
    ACCESS_TOKEN_METHOD = 'POST'
    EXTRA_DATA = [
        ('id', 'id'),
        ('expires_in', 'expires')
    ]

    def get_user_id(self, details, response):
        """Return the user id, read from the ``uid`` key of the response."""
        return response['uid']

    def get_user_details(self, response):
        """Return user details from VK.com account"""
        fullname, first_name, last_name = self.get_user_names(
            first_name=response.get('first_name'),
            last_name=response.get('last_name')
        )
        return {'username': response.get('screen_name'),
                'email': response.get('email', ''),
                'fullname': fullname,
                'first_name': first_name,
                'last_name': last_name}

    def user_data(self, access_token, *args, **kwargs):
        """Loads user data from service.

        Calls ``users.get`` with the default profile fields plus any listed
        in the ``EXTRA_DATA`` setting and returns the first profile of the
        reply, with ``user_photo`` mirroring ``photo``.

        Raises ``AuthTokenRevoked`` when the API reports error code 5 and
        ``AuthException`` for any other API error, for a failed request
        (``vk_api`` returned ``None``) or for a reply without profiles.
        """
        request_data = ['first_name', 'last_name', 'screen_name', 'nickname',
                        'photo'] + self.setting('EXTRA_DATA', [])
        fields = ','.join(set(request_data))
        data = vk_api(self, 'users.get', {
            'access_token': access_token,
            'fields': fields,
        })

        # vk_api swallows transport/decoding errors and returns None; report
        # that as an auth failure instead of crashing on ``None.get``
        if data is None:
            raise AuthException(self, 'VK.com API request failed')

        error = data.get('error')
        if error:
            msg = error.get('error_msg', 'Unknown error')
            if error.get('error_code') == 5:
                raise AuthTokenRevoked(self, msg)
            raise AuthException(self, msg)

        if data:
            profiles = data.get('response')
            if not profiles:
                # Missing or empty ``response`` would otherwise fail on [0]
                raise AuthException(self, 'VK.com returned no user data')
            data = profiles[0]
            data['user_photo'] = data.get('photo')  # Backward compatibility
        return data
class VKAppOAuth2(VKOAuth2):
    """VK.com Application Authentication support"""
    name = 'vk-app'

    def user_profile(self, user_id, access_token=None):
        """Fetch the profile of ``user_id`` through ``getProfiles``.

        The access token is sent only when given. Returns the first profile
        of the reply, or ``None`` when the request failed or the reply holds
        no profiles.
        """
        request_data = ['first_name', 'last_name', 'screen_name', 'nickname',
                        'photo'] + self.setting('EXTRA_DATA', [])
        fields = ','.join(set(request_data))
        data = {'uids': user_id, 'fields': fields}
        if access_token:
            data['access_token'] = access_token
        # vk_api returns None when the request or JSON decoding fails
        reply = vk_api(self, 'getProfiles', data)
        profiles = reply.get('response') if reply else None
        if profiles:
            return profiles[0]
        return None

    def auth_complete(self, *args, **kwargs):
        """Authenticate a user from the parameters VK passes to the app.

        Returns ``None`` when any of ``is_app_user``, ``viewer_id``,
        ``access_token`` or ``api_id`` is missing from the request data, or
        when the ``USERMODE`` check says the viewer is not an application
        user. Raises ``ValueError`` when ``auth_key`` is present but does not
        match the md5 of ``<key>_<viewer_id>_<secret>``.
        """
        required_params = ('is_app_user', 'viewer_id', 'access_token',
                           'api_id')
        if not all(param in self.data for param in required_params):
            return None

        auth_key = self.data.get('auth_key')

        # Verify signature, if present
        key, secret = self.get_key_and_secret()
        if auth_key:
            check_key = md5('_'.join([key,
                                      self.data.get('viewer_id'),
                                      secret]).encode('utf-8')).hexdigest()
            if check_key != auth_key:
                raise ValueError('VK.com authentication failed: invalid '
                                 'auth key')

        user_check = self.setting('USERMODE')
        user_id = self.data.get('viewer_id')
        if user_check is not None:
            user_check = int(user_check)
            if user_check == 1:
                is_user = self.data.get('is_app_user')
            elif user_check == 2:
                reply = vk_api(self, 'isAppUser', {'uid': user_id})
                # A failed API call (None) counts as "not an app user"
                is_user = reply.get('response', 0) if reply else 0
            else:
                # Any other mode requests no check; previously this path
                # failed on an unbound ``is_user``
                is_user = 1
            if not int(is_user):
                return None

        auth_data = {
            'auth': self,
            'backend': self,
            'request': self.strategy.request_data(),
            'response': {
                'user_id': user_id,
            }
        }
        # user_profile returns None when no profile could be loaded
        auth_data['response'].update(self.user_profile(user_id) or {})
        return self.strategy.authenticate(*args, **auth_data)
def vk_api(backend, method, data):
    """
    Calls VK.com OpenAPI method, check:
        https://vk.com/apiclub
        http://goo.gl/yLcaa

    ``data`` holds the request parameters and is not modified. The API
    version comes from the ``API_VERSION`` setting ('3.0' by default).
    When ``data`` has no ``access_token`` the call is made server-side
    against ``api.php`` and signed with the md5 of the sorted ``name=value``
    pairs followed by the application secret; otherwise the
    ``/method/<method>`` endpoint is used.

    Returns the decoded JSON reply, or ``None`` when the request raises
    TypeError, KeyError, IOError, ValueError or IndexError.
    """
    # Work on a copy so the caller's dict is left untouched
    params = dict(data)
    params['v'] = backend.setting('API_VERSION', '3.0')
    if 'access_token' not in params:
        # We need to perform server-side call if no access_token
        key, secret = backend.get_key_and_secret()
        if 'api_id' not in params:
            params['api_id'] = key
        params['method'] = method
        params['format'] = 'json'
        url = 'http://api.vk.com/api.php'
        # %-formatting keeps non-string values (e.g. an int uid) from
        # breaking the signature with a TypeError
        pairs = sorted('%s=%s' % (name, value)
                       for name, value in params.items())
        params['sig'] = md5(
            (''.join(pairs) + secret).encode('utf-8')
        ).hexdigest()
    else:
        url = 'https://api.vk.com/method/' + method

    try:
        return backend.get_json(url, params=params)
    except (TypeError, KeyError, IOError, ValueError, IndexError):
        return None