# Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages.
#
# Repository URL to install this package:
#
# Details
# python3-dmapi / usr / lib / python3 / dist-packages / dmapi / tests / functional / integrated_helpers.py
# Size: Mime:
# Copyright 2018 TrilioData Inc.
# All Rights Reserved.

"""
Provides common functionality for integrated unit tests
"""

import random
import string
import time

from oslo_log import log as logging

import dmapi.conf
import nova.image.glance
from nova import test
from nova.tests import fixtures as nova_fixtures
from nova.tests.functional.api import client as api_client
from nova.tests.unit import cast_as_call
import nova.tests.unit.image.fake
from nova.tests import uuidsentinel as uuids


# Configuration object exposed by dmapi.conf; read below for settings such
# as CONF.network_manager.
CONF = dmapi.conf.CONF
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


def generate_random_alphanumeric(length):
    """Return ``length`` random characters drawn from A-Z and 0-9.

    Characters are picked with ``random.SystemRandom``. A ``length`` of
    zero (or less) yields an empty string.
    """
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    picked = [rng.choice(alphabet) for _ in range(length)]
    return ''.join(picked)


def generate_random_numeric(length):
    """Return a string of ``length`` random decimal digits.

    Digits are picked with ``random.SystemRandom``. A ``length`` of zero
    (or less) yields an empty string.
    """
    rng = random.SystemRandom()
    digits = [rng.choice(string.digits) for _ in range(length)]
    return ''.join(digits)


def generate_new_element(items, prefix, numeric=False):
    """Return ``prefix`` plus 8 random characters, avoiding ``items``.

    The random suffix is all digits when ``numeric`` is true and uppercase
    alphanumeric otherwise. Candidates already present in ``items`` are
    logged at debug level and regenerated until an unused one is found.
    """
    make_suffix = (generate_random_numeric if numeric
                   else generate_random_alphanumeric)

    candidate = prefix + make_suffix(8)
    while candidate in items:
        LOG.debug("Random collision on %s", candidate)
        candidate = prefix + make_suffix(8)
    return candidate


class _IntegratedTestBase(test.TestCase):
    """Base test case that starts the services used by integrated tests.

    ``setUp`` stubs out the image service, installs the cast-as-call and
    placement fixtures, starts the conductor, consoleauth, network (or the
    Neutron fixture), scheduler and compute services, and exposes an API
    client as ``self.api``.

    Several attributes are read here but not defined in this class, so they
    must come from a subclass or the base class: ``api_major_version``,
    ``cell_mappings``, ``_image_ref_parameter`` and
    ``_flavor_ref_parameter``. A ``microversion`` attribute is optional; when
    present it is copied onto the API client.
    """

    REQUIRES_LOCKING = True
    # When True, self.api is the admin client of the API fixture.
    ADMIN_API = False
    # Override this in subclasses which use the NeutronFixture. New tests
    # should rely on Neutron since nova-network is deprecated. The default
    # value of False here is only temporary while we update the existing
    # functional tests to use Neutron.
    USE_NEUTRON = False

    def setUp(self):
        """Install fixtures, start the services and register cleanups."""
        super(_IntegratedTestBase, self).setUp()

        # TODO(mriedem): Fix the functional tests to work with Neutron.
        self.flags(use_neutron=self.USE_NEUTRON)

        nova.tests.unit.image.fake.stub_out_image_service(self)

        self.useFixture(cast_as_call.CastAsCall(self))
        placement = self.useFixture(nova_fixtures.PlacementFixture())
        self.placement_api = placement.api

        self._setup_services()

        self.addCleanup(nova.tests.unit.image.fake.FakeImageService_reset)

    def _setup_compute_service(self):
        """Start and return the compute service (hook for subclasses)."""
        return self.start_service('compute')

    def _setup_scheduler_service(self):
        """Start and return the scheduler service (hook for subclasses)."""
        return self.start_service('scheduler')

    def _setup_services(self):
        """Start all services and select the API client for the test.

        Sets ``self.conductor``, ``self.consoleauth``, ``self.neutron`` or
        ``self.network``, ``self.scheduler``, ``self.compute``,
        ``self.api_fixture`` and ``self.api``.
        """
        # NOTE(danms): Set the global MQ connection to that of our first cell
        # for any cells-ignorant code. Normally this is defaulted in the tests
        # which will result in us not doing the right thing.
        if 'cell1' in self.cell_mappings:
            self.flags(transport_url=self.cell_mappings['cell1'].transport_url)
        self.conductor = self.start_service('conductor')
        self.consoleauth = self.start_service('consoleauth')

        if self.USE_NEUTRON:
            self.neutron = self.useFixture(nova_fixtures.NeutronFixture(self))
        else:
            self.network = self.start_service('network',
                                              manager=CONF.network_manager)
        self.scheduler = self._setup_scheduler_service()

        self.compute = self._setup_compute_service()
        self.api_fixture = self.useFixture(
            nova_fixtures.OSAPIFixture(self.api_major_version))

        # if the class needs to run as admin, make the api endpoint
        # the admin, otherwise it's safer to run as non admin user.
        if self.ADMIN_API:
            self.api = self.api_fixture.admin_api
        else:
            self.api = self.api_fixture.api

        if hasattr(self, 'microversion'):
            self.api.microversion = self.microversion

    def get_unused_server_name(self):
        """Return a 'server'-prefixed name not used by any listed server."""
        servers = self.api.get_servers()
        server_names = [server['name'] for server in servers]
        return generate_new_element(server_names, 'server')

    def get_unused_flavor_name_id(self):
        """Return a ``(name, id)`` pair not used by any listed flavor.

        The name is a 'flavor'-prefixed string; the id is generated as a
        numeric string, checked against the existing flavor ids and returned
        as an ``int``.
        """
        flavor_names = []
        flavor_ids = []
        # Single pass over the flavors, collecting both columns.
        for flavor in self.api.get_flavors():
            flavor_names.append(flavor['name'])
            flavor_ids.append(flavor['id'])
        return (generate_new_element(flavor_names, 'flavor'),
                int(generate_new_element(flavor_ids, '', True)))

    def get_invalid_image(self):
        """Return the ``uuids.fake`` sentinel for use as an invalid image."""
        return uuids.fake

    def _build_minimal_create_server_request(self, image_uuid=None):
        """Build a minimal server-create request body.

        :param image_uuid: image reference to use; when falsy the first
            image returned by the API is used.
        :returns: dict with the image reference, a flavor reference built
            from the first listed flavor, and an unused server name.
        """
        server = {}

        # NOTE(takashin): In API version 2.36, image APIs were deprecated.
        # In API version 2.36 or greater, self.api.get_images() returns
        # a 404 error. In that case, 'image_uuid' should be specified.
        server[self._image_ref_parameter] = (image_uuid or
                                             self.api.get_images()[0]['id'])

        # Set a valid flavorId
        flavor = self.api.get_flavors()[0]
        LOG.debug("Using flavor: %s", flavor)
        server[self._flavor_ref_parameter] = ('http://fake.server/%s'
                                              % flavor['id'])

        # Set a valid server name
        server_name = self.get_unused_server_name()
        server['name'] = server_name
        return server

    def _create_flavor_body(self, name, ram, vcpus, disk, ephemeral, id, swap,
                            rxtx_factor, is_public):
        """Return the request body for creating a flavor.

        The arguments are placed unchanged under the "flavor" key, using the
        extension-prefixed key names for ``ephemeral`` and ``is_public``.
        """
        return {
            "flavor": {
                "name": name,
                "ram": ram,
                "vcpus": vcpus,
                "disk": disk,
                "OS-FLV-EXT-DATA:ephemeral": ephemeral,
                "id": id,
                "swap": swap,
                "rxtx_factor": rxtx_factor,
                "os-flavor-access:is_public": is_public,
            }
        }

    def _create_flavor(self, memory_mb=2048, vcpu=2, disk=10, ephemeral=10,
                       swap=0, rxtx_factor=1.0, is_public=True,
                       extra_spec=None):
        """Create a flavor through the admin API and return its id.

        The flavor name and id are generated so that they do not collide
        with existing flavors.

        :param extra_spec: optional extra specs; when not None they are
            posted for the new flavor after it has been created.
        :returns: the id of the created flavor.
        """
        flv_name, flv_id = self.get_unused_flavor_name_id()
        body = self._create_flavor_body(flv_name, memory_mb, vcpu, disk,
                                        ephemeral, flv_id, swap, rxtx_factor,
                                        is_public)
        self.api_fixture.admin_api.post_flavor(body)
        if extra_spec is not None:
            spec = {"extra_specs": extra_spec}
            self.api_fixture.admin_api.post_extra_spec(flv_id, spec)
        return flv_id

    def _build_server(self, flavor_id, image=None):
        """Build a server-create request body for a specific flavor.

        :param flavor_id: id of the flavor to look up and reference.
        :param image: image reference to use as-is; when None the id of the
            first image returned by the API is used.
        :returns: dict with the image reference, flavor reference and an
            unused server name.
        """
        server = {}
        if image is None:
            image = self.api.get_images()[0]
            LOG.debug("Image: %s", image)

            # We now have a valid imageId
            server[self._image_ref_parameter] = image['id']
        else:
            server[self._image_ref_parameter] = image

        # Set a valid flavorId
        flavor = self.api.get_flavor(flavor_id)
        LOG.debug("Using flavor: %s", flavor)
        server[self._flavor_ref_parameter] = ('http://fake.server/%s'
                                              % flavor['id'])

        # Set a valid server name
        server_name = self.get_unused_server_name()
        server['name'] = server_name
        return server

    def _check_api_endpoint(self, endpoint, expected_middleware):
        """Assert that every expected middleware wraps the API application.

        Walks the chain of ``.application`` attributes starting from the app
        registered for '/v2' and ticks off each expected middleware class
        that is found.

        :param endpoint: currently unused; the '/v2' app is always checked.
        :param expected_middleware: iterable of middleware classes that must
            appear in the chain. It is not modified.
        """
        app = self.api_fixture.app().get((None, '/v2'))

        # Work on a private copy so the caller's collection is left intact.
        missing = list(expected_middleware)
        while getattr(app, 'application', False):
            for middleware in missing:
                if isinstance(app.application, middleware):
                    # Safe to mutate here: we stop iterating right after.
                    missing.remove(middleware)
                    break
            app = app.application

        self.assertEqual([],
                         missing,
                         ("The expected wsgi middlewares %s are not "
                          "existed") % missing)


class InstanceHelperMixin(object):
    """Polling and request-building helpers for server-related tests.

    The methods rely on the class they are mixed into for ``self.fail``
    and, in several cases, ``self.api`` (and optionally ``self.admin_api``).
    """

    def _wait_for_server_parameter(self, admin_api, server, expected_params,
                                   max_retries=10):
        """Poll a server until all expected attributes match.

        :param admin_api: API client used to fetch the server.
        :param server: dict with at least an 'id' key.
        :param expected_params: dict mapping server attribute names to the
            values to wait for.
        :param max_retries: number of polls before the test is failed.
        :returns: the most recently fetched server.
        """
        retry_count = 0
        while True:
            server = admin_api.get_server(server['id'])
            if all(server[attr] == expected_params[attr]
                   for attr in expected_params):
                break
            retry_count += 1
            # ">=" rather than "==" so that a max_retries of zero or less
            # fails after the first poll instead of looping forever.
            if retry_count >= max_retries:
                self.fail('Wait for state change failed, '
                          'expected_params=%s, server=%s'
                          % (expected_params, server))
            time.sleep(0.5)

        return server

    def _wait_for_state_change(self, admin_api, server, expected_status,
                               max_retries=10):
        """Poll a server until its 'status' equals ``expected_status``.

        :returns: the most recently fetched server.
        """
        return self._wait_for_server_parameter(
            admin_api, server, {'status': expected_status}, max_retries)

    def _build_minimal_create_server_request(self, api, name, image_uuid=None,
                                             flavor_id=None, networks=None):
        """Build a minimal server-create request body.

        :param api: API client, queried only for values not supplied.
        :param name: server name.
        :param image_uuid: image reference; when falsy the id of the first
            image returned by ``api`` is used.
        :param flavor_id: flavor id; when falsy the id of the second flavor
            returned by ``api`` is used.
        :param networks: when not None, stored under the 'networks' key.
        :returns: the request dict.
        """
        server = {}

        # We now have a valid imageId
        server['imageRef'] = image_uuid or api.get_images()[0]['id']

        if not flavor_id:
            # Set a valid flavorId
            flavor_id = api.get_flavors()[1]['id']
        server['flavorRef'] = ('http://fake.server/%s' % flavor_id)
        server['name'] = name
        if networks is not None:
            server['networks'] = networks
        return server

    def _wait_until_deleted(self, server):
        """Poll until fetching the server raises a not-found error.

        Fails the test if the server reaches the 'ERROR' status or is still
        retrievable after 40 polls.
        """
        for _attempt in range(40):
            # Only the lookup can signal "gone"; keep the try body minimal.
            try:
                server = self.api.get_server(server['id'])
            except api_client.OpenStackApiNotFoundException:
                return
            if server['status'] == 'ERROR':
                self.fail('Server went to error state instead of '
                          'disappearing.')
            time.sleep(0.5)

        self.fail('Server failed to delete.')

    def _wait_for_action_fail_completion(
            self, server, expected_action, event_name, api=None):
        """Polls instance action events for the given instance, action and
        action event name until it finds the action event with an error
        result.

        Returns as soon as such an event is found; fails the test if none
        is found within 10 polls.

        :param server: dict with at least an 'id' key.
        :param expected_action: name of the instance action to inspect.
        :param event_name: name of the event expected to have an error
            result.
        :param api: API client to use; defaults to ``self.api``.
        """
        if api is None:
            api = self.api
        actions = []
        for _attempt in range(10):
            actions = api.get_instance_actions(server['id'])
            # Look for the expected action.
            for action in actions:
                if action['action'] != expected_action:
                    continue
                events = (
                    api.api_get(
                        '/servers/%s/os-instance-actions/%s' %
                        (server['id'], action['request_id'])
                    ).body['instanceAction']['events'])
                # Look for the action event being in error state.
                for event in events:
                    if (event['event'] == event_name and
                            event['result'] is not None and
                            event['result'].lower() == 'error'):
                        # Found it: stop polling right away.
                        return
            # We didn't find the completion event yet, so wait a bit.
            time.sleep(0.5)

        self.fail('Timed out waiting for %s failure event. Current '
                  'instance actions: %s' % (event_name, actions))

    def _wait_for_migration_status(self, server, expected_statuses):
        """Waits for a migration record with the given statuses to be found
        for the given server, else the test fails. The migration record, if
        found, is returned.

        Statuses are compared case-insensitively. ``self.admin_api`` is used
        when it is set, otherwise ``self.api``.
        """
        api = getattr(self, 'admin_api', None)
        if api is None:
            api = self.api

        statuses = [status.lower() for status in expected_statuses]
        for _attempt in range(10):
            migrations = api.api_get('/os-migrations').body['migrations']
            for migration in migrations:
                if (migration['instance_uuid'] == server['id'] and
                        migration['status'].lower() in statuses):
                    return migration
            time.sleep(0.5)
        self.fail('Timed out waiting for migration with status "%s" for '
                  'instance: %s' % (expected_statuses, server['id']))