Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
django-siwe-auth / siwe_auth / backend.py
Size: Mime:
import datetime
import logging
from typing import Optional

import pytz

from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import BaseBackend
from django.contrib.auth.models import Group, AbstractUser
from web3 import Web3, HTTPProvider
from web3.middleware import geth_poa_middleware
from ens import ENS

from siwe.siwe import (
    SiweMessage,
    ValidationError,
    ExpiredMessage,
    MalformedSession,
    InvalidSignature,
)

from .custom_groups.group_manager import GroupManager
from .models import Wallet
from .utils.nonce import Nonce


class SiweBackend(BaseBackend):
    """
    Authenticate an Ethereum address as per Sign-In with Ethereum (EIP-4361).
    """

    def authenticate(self, request, signature: str = None, siwe_message: SiweMessage = None, **kwargs):
        if signature is None or siwe_message is None:
            logging.info(f"Authentication attempt rejected due to missing data:"
                         f"signature={signature}, siwe_message={siwe_message}")
            return None

        # Validate signature
        provider = HTTPProvider(settings.PROVIDER)
        w3 = Web3(provider)
        w3.middleware_onion.inject(geth_poa_middleware, layer=0)

        try:
            siwe_message.validate(signature=signature, provider=provider)
        except ExpiredMessage:
            logging.info("Authentication attempt rejected due to expired message.")
            return None
        except MalformedSession as e:
            logging.info(
                f"Authentication attempt rejected due to missing fields: {', '.join(e.missing_fields)}"
            )
            return None
        except InvalidSignature:
            logging.info("Authentication attempt rejected due to invalid signature.")
            return None
        except ValidationError:
            logging.info("Authentication attempt rejected due to invalid message.")
            return None

        # Validate nonce
        now = datetime.datetime.now(tz=pytz.UTC)
        nonce_data = request.session.pop("nonce", None)
        nonce = Nonce(nonce_data["value"], datetime.datetime.fromisoformat(nonce_data["expiration"])) if nonce_data is not None else None

        if not nonce or nonce.value != siwe_message.nonce or nonce.expiration < now:
            logging.info(f"Authentication attempt rejected due to invalid nonce: {nonce}")
            return None

        # Pull ENS data
        if getattr(settings, "CREATE_ENS_PROFILE_ON_AUTHN", True):
            ens_profile = ENSProfile(ethereum_address=siwe_message.address, w3=w3)
        else:
            ens_profile = ENSProfile.__new__(ENSProfile) # blank ENSProfile, skipping __init__ constructor

        # Message and nonce has been validated. Authentication complete. Continue with authorization/other.
        ens_name_field = getattr(settings, "USER_ENS_NAME_FIELD", "ens_name")
        ens_avatar_field = getattr(settings, "USER_ENS_AVATAR_FIELD", "ens_avatar")
        populate_ens_data = getattr(settings, "USER_POPULATE_ENS_DATA", True)
        try:
            user = Wallet.objects.get(pk=siwe_message.address).user
            user.last_login = now
            if populate_ens_data:
                setattr(user, ens_name_field, ens_profile.name)
            user.save()
            logging.debug(f"Found wallet for address {siwe_message.address}")
        except Wallet.DoesNotExist:
            extra = {ens_name_field: ens_profile.name,
                     ens_avatar_field: ens_profile.avatar} if populate_ens_data else {}
            user = Wallet.objects.create_user(
                ethereum_address=siwe_message.address,
                last_login=now,
                **extra
            )
            logging.debug(
                f"Could not find wallet for address {siwe_message.address}. Creating new wallet object."
            )
        wallet = user.wallet

        # Group settings
        if getattr(settings, "CREATE_GROUPS_ON_AUTHN", False):
            for custom_group in settings.CUSTOM_GROUPS:
                group, created = Group.objects.get_or_create(name=custom_group[0])
                if created:
                    logging.info(f"Created group '{custom_group[0]}'.")
                group_manager: GroupManager = custom_group[1]
                if group_manager.is_member(
                    wallet=wallet,
                    provider=provider,
                ):
                    logging.info(
                        f"Adding user '{user}' to group '{custom_group[0]}'."
                    )
                    user.groups.add(group)

        return user

    def get_user(self, user_id) -> Optional[AbstractUser]:
        """
        Get Wallet by ethereum address if exists.
        :param ethereum_address: Ethereum address of user.
        :return: Wallet object if exists or None
        """
        User = get_user_model()
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None


class ENSProfile:
    """
    Container for ENS profile information including but not limited to primary name and avatar.
    """

    name: str = None
    avatar: str = None

    def __init__(self, ethereum_address: str, w3: Web3):
        # Temporary until https://github.com/ethereum/web3.py/pull/2286 is merged
        self.name = ENS.fromWeb3(w3).name(address=w3.toChecksumAddress(ethereum_address))
        resolver = ENS.fromWeb3(w3).resolver(normal_name=self.name)
        # if resolver:
        #     self.avatar = resolver.caller.text(normal_name_to_hash(self.name), 'avatar')