# Repository URL to install this package:
# Version: 0.3.2+sf4
import datetime
import logging
from typing import Optional
import pytz
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import BaseBackend
from django.contrib.auth.models import Group, AbstractUser
from web3 import Web3, HTTPProvider
from web3.middleware import geth_poa_middleware
from ens import ENS
from siwe.siwe import (
SiweMessage,
ValidationError,
ExpiredMessage,
MalformedSession,
InvalidSignature,
)
from .custom_groups.group_manager import GroupManager
from .models import Wallet
from .utils.nonce import Nonce
class SiweBackend(BaseBackend):
    """
    Authenticate an Ethereum address as per Sign-In with Ethereum (EIP-4361).
    """

    def authenticate(
        self,
        request,
        signature: Optional[str] = None,
        siwe_message: Optional[SiweMessage] = None,
        **kwargs
    ):
        """
        Validate a signed SIWE message and return the user owning the wallet.

        Steps, in order:
          1. Reject the attempt if the signature or message is missing.
          2. Validate the message/signature via ``siwe_message.validate``.
          3. Pop the nonce from ``request.session`` and check that it matches
             the message nonce and has not expired.
          4. Optionally resolve the ENS profile for the address.
          5. Update the existing user's ``last_login`` (and ENS name), or
             create a new user through ``Wallet.objects.create_user``.
          6. Optionally add the user to the groups in ``settings.CUSTOM_GROUPS``.

        Settings read: ``PROVIDER``, ``CREATE_ENS_PROFILE_ON_AUTHN``,
        ``USER_ENS_NAME_FIELD``, ``USER_ENS_AVATAR_FIELD``,
        ``USER_POPULATE_ENS_DATA``, ``CREATE_GROUPS_ON_AUTHN``, ``CUSTOM_GROUPS``.

        :param request: Current request; its session must hold the "nonce" entry.
        :param signature: Signature over the SIWE message.
        :param siwe_message: Parsed SIWE message to validate.
        :return: The authenticated user, or None if any check fails.
        """
        if signature is None or siwe_message is None:
            logging.info(
                f"Authentication attempt rejected due to missing data: "
                f"signature={signature}, siwe_message={siwe_message}"
            )
            return None

        # Validate signature
        provider = HTTPProvider(settings.PROVIDER)
        w3 = Web3(provider)
        w3.middleware_onion.inject(geth_poa_middleware, layer=0)
        try:
            siwe_message.validate(signature=signature, provider=provider)
        except ExpiredMessage:
            logging.info("Authentication attempt rejected due to expired message.")
            return None
        except MalformedSession as e:
            logging.info(
                f"Authentication attempt rejected due to missing fields: {', '.join(e.missing_fields)}"
            )
            return None
        except InvalidSignature:
            logging.info("Authentication attempt rejected due to invalid signature.")
            return None
        except ValidationError:
            logging.info("Authentication attempt rejected due to invalid message.")
            return None

        # Validate nonce. The nonce is popped so it cannot be reused by a later attempt.
        now = datetime.datetime.now(tz=pytz.UTC)
        nonce_data = request.session.pop("nonce", None)
        nonce = None
        if nonce_data is not None:
            try:
                nonce_value = nonce_data["value"]
                nonce_expiration = datetime.datetime.fromisoformat(nonce_data["expiration"])
            except (KeyError, TypeError, ValueError):
                # Malformed session data is a failed attempt, not a server error.
                logging.info("Authentication attempt rejected due to malformed nonce data in session.")
                return None
            nonce = Nonce(nonce_value, nonce_expiration)
        # NOTE(review): the comparison below assumes the stored expiration is
        # timezone-aware (``now`` is aware, in UTC) -- confirm against the code
        # that writes the nonce into the session.
        if not nonce or nonce.value != siwe_message.nonce or nonce.expiration < now:
            logging.info(f"Authentication attempt rejected due to invalid nonce: {nonce}")
            return None

        # Pull ENS data
        if getattr(settings, "CREATE_ENS_PROFILE_ON_AUTHN", True):
            ens_profile = ENSProfile(ethereum_address=siwe_message.address, w3=w3)
        else:
            # Blank ENSProfile: skipping __init__ avoids the ENS lookup, leaving
            # name/avatar at their class-level default of None.
            ens_profile = ENSProfile.__new__(ENSProfile)

        # Message and nonce has been validated. Authentication complete. Continue with authorization/other.
        ens_name_field = getattr(settings, "USER_ENS_NAME_FIELD", "ens_name")
        ens_avatar_field = getattr(settings, "USER_ENS_AVATAR_FIELD", "ens_avatar")
        populate_ens_data = getattr(settings, "USER_POPULATE_ENS_DATA", True)
        try:
            user = Wallet.objects.get(pk=siwe_message.address).user
            user.last_login = now
            if populate_ens_data:
                # Only the ENS name is refreshed for an existing user.
                setattr(user, ens_name_field, ens_profile.name)
            user.save()
            logging.debug(f"Found wallet for address {siwe_message.address}")
        except Wallet.DoesNotExist:
            if populate_ens_data:
                extra = {
                    ens_name_field: ens_profile.name,
                    ens_avatar_field: ens_profile.avatar,
                }
            else:
                extra = {}
            user = Wallet.objects.create_user(
                ethereum_address=siwe_message.address,
                last_login=now,
                **extra
            )
            logging.debug(
                f"Could not find wallet for address {siwe_message.address}. Creating new wallet object."
            )
        wallet = user.wallet

        # Group settings
        if getattr(settings, "CREATE_GROUPS_ON_AUTHN", False):
            # Each CUSTOM_GROUPS entry is indexed as (group name, GroupManager).
            for custom_group in settings.CUSTOM_GROUPS:
                group, created = Group.objects.get_or_create(name=custom_group[0])
                if created:
                    logging.info(f"Created group '{custom_group[0]}'.")
                group_manager: GroupManager = custom_group[1]
                if group_manager.is_member(
                    wallet=wallet,
                    provider=provider,
                ):
                    logging.info(
                        f"Adding user '{user}' to group '{custom_group[0]}'."
                    )
                    user.groups.add(group)
        return user

    def get_user(self, user_id) -> Optional[AbstractUser]:
        """
        Get the user with the given primary key, if one exists.

        :param user_id: Primary key of the user in the active user model.
        :return: User object if it exists, otherwise None.
        """
        User = get_user_model()
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None
class ENSProfile:
    """
    Container for ENS profile information including but not limited to primary name and avatar.
    """

    # Primary ENS name for the address; stays None on instances created
    # without running __init__.
    name: Optional[str] = None
    # Avatar is never assigned by this class yet (see the TODO in __init__).
    avatar: Optional[str] = None

    def __init__(self, ethereum_address: str, w3: Web3):
        """
        Look up the ENS name for an Ethereum address.

        :param ethereum_address: Address to look up; it is converted to its
            checksummed form before the lookup.
        :param w3: Web3 instance used to build the ENS client.
        """
        # Build the ENS client once and reuse it.
        ns = ENS.fromWeb3(w3)
        self.name = ns.name(address=w3.toChecksumAddress(ethereum_address))
        # TODO: Temporary until https://github.com/ethereum/web3.py/pull/2286 is merged.
        # Afterwards, fetch the resolver for self.name and read its 'avatar'
        # text record into self.avatar. The resolver lookup is skipped for now
        # because nothing consumes its result.