Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import errno
import os
import platform
import sys
import subprocess
import re

from datetime import datetime
from dateutil.tz import tzutc
from dateutil.relativedelta import relativedelta
from botocore.utils import parse_timestamp

from awscli.compat import is_windows, urlparse, RawConfigParser, StringIO
from awscli.customizations import utils as cli_utils
from awscli.customizations.commands import BasicCommand
from awscli.customizations.utils import uni_print


def get_relative_expiration_time(remaining):
    values = []
    prev_non_zero_attr = False
    for attr in ["years", "months", "days", "hours", "minutes"]:
        value = getattr(remaining, attr)
        if value > 0:
            if prev_non_zero_attr:
                values.append("and")
            values.append(str(value))
            values.append(attr[:-1] if value == 1 else attr)
        if prev_non_zero_attr:
            break
        prev_non_zero_attr = value > 0

    message = " ".join(values)
    return message


class BaseLogin(object):
    _TOOL_NOT_FOUND_MESSAGE = '%s was not found. Please verify installation.'

    def __init__(self, auth_token, expiration, repository_endpoint,
                 domain, repository, subprocess_utils, namespace=None):
        self.auth_token = auth_token
        self.expiration = expiration
        self.repository_endpoint = repository_endpoint
        self.domain = domain
        self.repository = repository
        self.subprocess_utils = subprocess_utils
        self.namespace = namespace

    def login(self, dry_run=False):
        raise NotImplementedError('login()')

    def _dry_run_commands(self, tool, commands):
        for command in commands:
            sys.stdout.write(' '.join(command))
            sys.stdout.write(os.linesep)
            sys.stdout.write(os.linesep)

    def _write_success_message(self, tool):
        # add extra 30 seconds make expiration more reasonable
        # for some corner case
        # e.g. 11 hours 59 minutes 31 seconds should output --> 12 hours.
        remaining = relativedelta(
            self.expiration, datetime.now(tzutc())) + relativedelta(seconds=30)
        expiration_message = get_relative_expiration_time(remaining)

        sys.stdout.write('Successfully configured {} to use '
                         'AWS CodeArtifact repository {} '
                         .format(tool, self.repository_endpoint))
        sys.stdout.write(os.linesep)
        sys.stdout.write('Login expires in {} at {}'.format(
            expiration_message, self.expiration))
        sys.stdout.write(os.linesep)

    def _run_commands(self, tool, commands, dry_run=False):
        if dry_run:
            self._dry_run_commands(tool, commands)
            return

        for command in commands:
            try:
                self.subprocess_utils.check_call(
                    command,
                    stdout=self.subprocess_utils.PIPE,
                    stderr=self.subprocess_utils.PIPE,
                )
            except OSError as ex:
                if ex.errno == errno.ENOENT:
                    raise ValueError(
                        self._TOOL_NOT_FOUND_MESSAGE % tool
                    )
                raise ex

        self._write_success_message(tool)

    @classmethod
    def get_commands(cls, endpoint, auth_token, **kwargs):
        raise NotImplementedError('get_commands()')


class NuGetBaseLogin(BaseLogin):
    _NUGET_INDEX_URL_FMT = '{endpoint}v3/index.json'

    # When adding new sources we can specify that we added the source to the
    # user level NuGet.Config file. However, when updating an existing source
    # we cannot be specific about which level NuGet.Config file was updated
    # because it is possible that the existing source was not in the user
    # level NuGet.Config. The source listing command returns all configured
    # sources from all NuGet.Config levels. The update command updates the
    # source in whichever NuGet.Config file the source was found.
    _SOURCE_ADDED_MESSAGE = 'Added source %s to the user level NuGet.Config\n'
    _SOURCE_UPDATED_MESSAGE = 'Updated source %s in the NuGet.Config\n'

    def login(self, dry_run=False):
        try:
            source_to_url_dict = self._get_source_to_url_dict()
        except OSError as ex:
            if ex.errno == errno.ENOENT:
                raise ValueError(
                    self._TOOL_NOT_FOUND_MESSAGE % self._get_tool_name()
                )
            raise ex

        nuget_index_url = self._NUGET_INDEX_URL_FMT.format(
            endpoint=self.repository_endpoint
        )
        source_name, already_exists = self._get_source_name(
            nuget_index_url, source_to_url_dict
        )

        if already_exists:
            command = self._get_configure_command(
                'update', nuget_index_url, source_name
            )
            source_configured_message = self._SOURCE_UPDATED_MESSAGE
        else:
            command = self._get_configure_command('add', nuget_index_url, source_name)
            source_configured_message = self._SOURCE_ADDED_MESSAGE

        if dry_run:
            dry_run_command = ' '.join([str(cd) for cd in command])
            uni_print(dry_run_command)
            uni_print('\n')
            return

        try:
            self.subprocess_utils.check_output(
                command,
                stderr=self.subprocess_utils.PIPE
            )
        except subprocess.CalledProcessError as e:
            uni_print('Failed to update the NuGet.Config\n')
            raise e

        uni_print(source_configured_message % source_name)
        self._write_success_message('nuget')

    def _get_source_to_url_dict(self):
        # The response from listing sources takes the following form:
        #
        # Registered Sources:
        #   1.  Source Name 1 [Enabled]
        #       https://source1.com/index.json
        #   2.  Source Name 2 [Disabled]
        #       https://source2.com/index.json
        # ...
        #   100. Source Name 100
        #       https://source100.com/index.json

       # Or it can be (blank line after Registered Sources:)

       # Registered Sources:

       #   1.  Source Name 1 [Enabled]
       #       https://source1.com/index.json
       #   2.  Source Name 2 [Disabled]
       #       https://source2.com/index.json
       # ...
       #   100. Source Name 100
       #       https://source100.com/index.json

        response = self.subprocess_utils.check_output(
            self._get_list_command(),
            stderr=self.subprocess_utils.PIPE
        )

        lines = response.decode("utf-8").splitlines()
        lines = [line for line in lines if line.strip() != '']

        source_to_url_dict = {}
        for i in range(1, len(lines), 2):
            source_to_url_dict[self._parse_source_name(lines[i])] = \
                self._parse_source_url(lines[i + 1])

        return source_to_url_dict

    def _parse_source_name(self, line):
        # A source name line takes the following form:
        #   1.  NuGet Source [Enabled]

        # Remove the Enabled/Disabled tag.
        line_without_tag = line.strip().rsplit(' [', 1)[0]

        # Remove the leading number.
        return line_without_tag.split(None, 1)[1]

    def _parse_source_url(self, line):
        # A source url line takes the following form:
        #       https://source.com/index.json
        return line.strip()

    def _get_source_name(self, codeartifact_url, source_dict):
        default_name = '{}/{}'.format(self.domain, self.repository)

        # Check if the CodeArtifact URL is already present in the
        # NuGet.Config file. If the URL already exists, use the source name
        # already assigned to the CodeArtifact URL.
        for source_name, source_url in source_dict.items():
            if source_url == codeartifact_url:
                return source_name, True

        # If the CodeArtifact URL is not present in the NuGet.Config file,
        # check if the default source name already exists so we can know
        # whether we need to add a new entry or update the existing entry.
        for source_name in source_dict.keys():
            if source_name == default_name:
                return source_name, True

        # If neither the source url nor the source name already exist in the
        # NuGet.Config file, use the default source name.
        return default_name, False

    def _get_tool_name(self):
        raise NotImplementedError('_get_tool_name()')

    def _get_list_command(self):
        raise NotImplementedError('_get_list_command()')

    def _get_configure_command(self, operation, nuget_index_url, source_name):
        raise NotImplementedError('_get_configure_command()')


class NuGetLogin(NuGetBaseLogin):

    def _get_tool_name(self):
        return 'nuget'

    def _get_list_command(self):
        return ['nuget', 'sources', 'list', '-format', 'detailed']

    def _get_configure_command(self, operation, nuget_index_url, source_name):
        return [
            'nuget', 'sources', operation,
            '-name', source_name,
            '-source', nuget_index_url,
            '-username', 'aws',
            '-password', self.auth_token
        ]


class DotNetLogin(NuGetBaseLogin):

    def _get_tool_name(self):
        return 'dotnet'

    def _get_list_command(self):
        return ['dotnet', 'nuget', 'list', 'source', '--format', 'detailed']

    def _get_configure_command(self, operation, nuget_index_url, source_name):
        command = ['dotnet', 'nuget', operation, 'source']

        if operation == 'add':
            command.append(nuget_index_url)
            command += ['--name', source_name]
        else:
            command.append(source_name)
            command += ['--source', nuget_index_url]

        command += [
            '--username', 'aws',
            '--password', self.auth_token
        ]

        # Encryption is not supported on non-Windows platforms.
        if not is_windows:
            command.append('--store-password-in-clear-text')

        return command


class NpmLogin(BaseLogin):

    # On Windows we need to be explicit about the .cmd file to execute
    # (unless we execute through the shell, i.e. with shell=True).
    NPM_CMD = 'npm.cmd' if platform.system().lower() == 'windows' else 'npm'

    def login(self, dry_run=False):
        scope = self.get_scope(
            self.namespace
        )
        commands = self.get_commands(
            self.repository_endpoint, self.auth_token, scope=scope
        )
        self._run_commands('npm', commands, dry_run)

    @classmethod
    def get_scope(cls, namespace):
        # Regex for valid scope name
        valid_scope_name = re.compile('^(@[a-z0-9-~][a-z0-9-._~]*)')

        if namespace is None:
            return namespace

        # Add @ prefix to scope if it doesn't exist
        if namespace.startswith('@'):
            scope = namespace
        else:
            scope = '@{}'.format(namespace)

        if not valid_scope_name.match(scope):
            raise ValueError(
                'Invalid scope name, scope must contain URL-safe '
                'characters, no leading dots or underscores'
            )

        return scope

    @classmethod
    def get_commands(cls, endpoint, auth_token, **kwargs):
        commands = []
        scope = kwargs.get('scope')

        # prepend scope if it exists
        registry = '{}:registry'.format(scope) if scope else 'registry'

        # set up the codeartifact repository as the npm registry.
        commands.append(
            [cls.NPM_CMD, 'config', 'set', registry, endpoint]
        )

        repo_uri = urlparse.urlsplit(endpoint)

        # configure npm to always require auth for the repository.
        always_auth_config = '//{}{}:always-auth'.format(
            repo_uri.netloc, repo_uri.path
        )
        commands.append(
            [cls.NPM_CMD, 'config', 'set', always_auth_config, 'true']
        )

        # set auth info for the repository.
        auth_token_config = '//{}{}:_authToken'.format(
            repo_uri.netloc, repo_uri.path
        )
        commands.append(
            [cls.NPM_CMD, 'config', 'set', auth_token_config, auth_token]
        )

        return commands


class PipLogin(BaseLogin):

    PIP_INDEX_URL_FMT = '{scheme}://aws:{auth_token}@{netloc}{path}simple/'

    def login(self, dry_run=False):
        commands = self.get_commands(
            self.repository_endpoint, self.auth_token
        )
        self._run_commands('pip', commands, dry_run)

    @classmethod
    def get_commands(cls, endpoint, auth_token, **kwargs):
        repo_uri = urlparse.urlsplit(endpoint)
        pip_index_url = cls.PIP_INDEX_URL_FMT.format(
            scheme=repo_uri.scheme,
            auth_token=auth_token,
            netloc=repo_uri.netloc,
            path=repo_uri.path
        )

        return [['pip', 'config', 'set', 'global.index-url', pip_index_url]]


class TwineLogin(BaseLogin):

    DEFAULT_PYPI_RC_FMT = u'''\
[distutils]
index-servers=
    pypi
    codeartifact

[codeartifact]
repository: {repository_endpoint}
username: aws
password: {auth_token}'''

    def __init__(
        self,
        auth_token,
        expiration,
        repository_endpoint,
        domain,
        repository,
        subprocess_utils,
        pypi_rc_path=None
    ):
        if pypi_rc_path is None:
            pypi_rc_path = self.get_pypi_rc_path()
        self.pypi_rc_path = pypi_rc_path
        super(TwineLogin, self).__init__(
            auth_token, expiration, repository_endpoint,
            domain, repository, subprocess_utils)

    @classmethod
    def get_commands(cls, endpoint, auth_token, **kwargs):
        # TODO(ujjwalpa@): We don't really have a command to execute for Twine
        # as we directly write to the pypirc file (or to stdout for dryrun)
        # with python itself instead. Nevertheless, we're using this method for
        # testing so we'll keep the interface for now but return a string with
        # the expected pypirc content instead of a list of commands to
        # execute. This definitely reeks of code smell and there is probably
        # room for rethinking and refactoring the interfaces of these adapter
        # helper classes in the future.

        assert 'pypi_rc_path' in kwargs, 'pypi_rc_path must be provided.'
        pypi_rc_path = kwargs['pypi_rc_path']

        default_pypi_rc = cls.DEFAULT_PYPI_RC_FMT.format(
            repository_endpoint=endpoint,
            auth_token=auth_token
        )

        pypi_rc = RawConfigParser()
        if os.path.exists(pypi_rc_path):
            try:
                pypi_rc.read(pypi_rc_path)
                index_servers = pypi_rc.get('distutils', 'index-servers')
                servers = [
                    server.strip()
                    for server in index_servers.split('\n')
                    if server.strip() != ''
                ]

                if 'codeartifact' not in servers:
                    servers.append('codeartifact')
                    pypi_rc.set(
                        'distutils', 'index-servers', '\n' + '\n'.join(servers)
                    )

                if 'codeartifact' not in pypi_rc.sections():
                    pypi_rc.add_section('codeartifact')

                pypi_rc.set('codeartifact', 'repository', endpoint)
                pypi_rc.set('codeartifact', 'username', 'aws')
                pypi_rc.set('codeartifact', 'password', auth_token)
            except Exception as e:  # invalid .pypirc file
                sys.stdout.write('%s is in an invalid state.' % pypi_rc_path)
                sys.stdout.write(os.linesep)
                raise e
        else:
            pypi_rc.readfp(StringIO(default_pypi_rc))

        pypi_rc_stream = StringIO()
        pypi_rc.write(pypi_rc_stream)
        pypi_rc_str = pypi_rc_stream.getvalue()
        pypi_rc_stream.close()

        return pypi_rc_str

    def login(self, dry_run=False):
        # No command to execute for Twine, we get the expected pypirc content
        # instead.
        pypi_rc_str = self.get_commands(
            self.repository_endpoint,
            self.auth_token,
            pypi_rc_path=self.pypi_rc_path
        )

        if dry_run:
            sys.stdout.write('Dryrun mode is enabled, not writing to pypirc.')
            sys.stdout.write(os.linesep)
            sys.stdout.write(
                '%s would have been set to the following:' % self.pypi_rc_path
            )
            sys.stdout.write(os.linesep)
            sys.stdout.write(os.linesep)
            sys.stdout.write(pypi_rc_str)
            sys.stdout.write(os.linesep)
        else:
            with open(self.pypi_rc_path, 'w+') as fp:
                fp.write(pypi_rc_str)

            self._write_success_message('twine')

    @classmethod
    def get_pypi_rc_path(cls):
        return os.path.join(os.path.expanduser("~"), ".pypirc")


class CodeArtifactLogin(BasicCommand):
    '''Log in to the idiomatic tool for the requested package format.'''

    TOOL_MAP = {
        'nuget': {
            'package_format': 'nuget',
            'login_cls': NuGetLogin,
            'namespace_support': False,
        },
        'dotnet': {
            'package_format': 'nuget',
            'login_cls': DotNetLogin,
            'namespace_support': False,
        },
        'npm': {
            'package_format': 'npm',
            'login_cls': NpmLogin,
            'namespace_support': True,
        },
        'pip': {
            'package_format': 'pypi',
            'login_cls': PipLogin,
            'namespace_support': False,
        },
        'twine': {
            'package_format': 'pypi',
            'login_cls': TwineLogin,
            'namespace_support': False,
        }
    }

    NAME = 'login'

    DESCRIPTION = (
        'Sets up the idiomatic tool for your package format to use your '
        'CodeArtifact repository. Your login information is valid for up '
        'to 12 hours after which you must login again.'
    )

    ARG_TABLE = [
        {
            'name': 'tool',
            'help_text': 'The tool you want to connect with your repository',
            'choices': list(TOOL_MAP.keys()),
            'required': True,
        },
        {
            'name': 'domain',
            'help_text': 'Your CodeArtifact domain name',
            'required': True,
        },
        {
            'name': 'domain-owner',
            'help_text': 'The AWS account ID that owns your CodeArtifact '
                         'domain',
            'required': False,
        },
        {
            'name': 'namespace',
            'help_text': 'Associates a namespace with your repository tool',
            'required': False,
        },
        {
            'name': 'duration-seconds',
            'cli_type_name': 'integer',
            'help_text': 'The time, in seconds, that the login information '
                         'is valid',
            'required': False,
        },
        {
            'name': 'repository',
            'help_text': 'Your CodeArtifact repository name',
            'required': True,
        },
        {
            'name': 'dry-run',
            'action': 'store_true',
            'help_text': 'Only print the commands that would be executed '
                         'to connect your tool with your repository without '
                         'making any changes to your configuration',
            'required': False,
            'default': False
        },
    ]

    def _get_namespace(self, tool, parsed_args):
        namespace_compatible = self.TOOL_MAP[tool]['namespace_support']

        if not namespace_compatible and parsed_args.namespace:
            raise ValueError(
                'Argument --namespace is not supported for {}'.format(tool)
            )
        else:
            return parsed_args.namespace

    def _get_repository_endpoint(
        self, codeartifact_client, parsed_args, package_format
    ):
        kwargs = {
            'domain': parsed_args.domain,
            'repository': parsed_args.repository,
            'format': package_format
        }
        if parsed_args.domain_owner:
            kwargs['domainOwner'] = parsed_args.domain_owner

        get_repository_endpoint_response = \
            codeartifact_client.get_repository_endpoint(**kwargs)

        return get_repository_endpoint_response['repositoryEndpoint']

    def _get_authorization_token(self, codeartifact_client, parsed_args):
        kwargs = {
            'domain': parsed_args.domain
        }
        if parsed_args.domain_owner:
            kwargs['domainOwner'] = parsed_args.domain_owner

        if parsed_args.duration_seconds:
            kwargs['durationSeconds'] = parsed_args.duration_seconds

        get_authorization_token_response = \
            codeartifact_client.get_authorization_token(**kwargs)

        return get_authorization_token_response

    def _run_main(self, parsed_args, parsed_globals):
        tool = parsed_args.tool.lower()

        package_format = self.TOOL_MAP[tool]['package_format']

        codeartifact_client = cli_utils.create_client_from_parsed_globals(
            self._session, 'codeartifact', parsed_globals
        )

        auth_token_res = self._get_authorization_token(
            codeartifact_client, parsed_args
        )

        repository_endpoint = self._get_repository_endpoint(
            codeartifact_client, parsed_args, package_format
        )

        domain = parsed_args.domain
        repository = parsed_args.repository
        namespace = self._get_namespace(tool, parsed_args)

        auth_token = auth_token_res['authorizationToken']
        expiration = parse_timestamp(auth_token_res['expiration'])
        login = self.TOOL_MAP[tool]['login_cls'](
            auth_token, expiration, repository_endpoint,
            domain, repository, subprocess, namespace
        )

        login.login(parsed_args.dry_run)

        return 0