# Package: python3-dmapi (version 4.1.142)
# File: /usr/lib/python3/dist-packages/dmapi/tests/functional/integrated_helpers.py
# Copyright 2018 TrilioData Inc.
# All Rights Reserved.
"""
Provides common functionality for integrated unit tests
"""
import random
import string
import time
from oslo_log import log as logging
import dmapi.conf
import nova.image.glance
from nova import test
from nova.tests import fixtures as nova_fixtures
from nova.tests.functional.api import client as api_client
from nova.tests.unit import cast_as_call
import nova.tests.unit.image.fake
from nova.tests import uuidsentinel as uuids
# Configuration object exposed by dmapi.conf; read below for
# CONF.network_manager when the nova-network service is started.
CONF = dmapi.conf.CONF
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
def generate_random_alphanumeric(length):
    """Return a random string of ``length`` uppercase letters and digits.

    Every character is drawn independently through
    ``random.SystemRandom().choice``. A ``length`` of zero (or less)
    yields the empty string.
    """
    alphabet = string.ascii_uppercase + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.SystemRandom().choice(alphabet))
    return ''.join(picked)
def generate_random_numeric(length):
    """Return a random string made of ``length`` decimal digits.

    Every digit is drawn independently through
    ``random.SystemRandom().choice``. A ``length`` of zero (or less)
    yields the empty string.
    """
    digits = [random.SystemRandom().choice(string.digits)
              for _ in range(length)]
    return ''.join(digits)
def generate_new_element(items, prefix, numeric=False):
    """Return ``prefix`` plus 8 random characters that is not in ``items``.

    :param items: container of values already in use; only membership
        (``in``) is tested against it.
    :param prefix: string placed in front of the random suffix.
    :param numeric: when true the suffix consists of digits only,
        otherwise of uppercase letters and digits.
    :returns: the first generated candidate not contained in ``items``.
    """
    make_suffix = (generate_random_numeric if numeric
                   else generate_random_alphanumeric)
    candidate = prefix + make_suffix(8)
    # Keep drawing until the candidate is unused; collisions are only logged.
    while candidate in items:
        LOG.debug("Random collision on %s", candidate)
        candidate = prefix + make_suffix(8)
    return candidate
class _IntegratedTestBase(test.TestCase):
    """Base class for functional tests that run against in-process services.

    ``setUp`` stubs out the image service, installs the cast-as-call and
    placement fixtures and then starts the conductor, consoleauth,
    network (or Neutron fixture), scheduler and compute services plus an
    API fixture, exposing the resulting client as ``self.api``.

    The following attributes are read but not defined in this class, so
    they must come from a subclass or from the base test case:
    ``api_major_version``, ``_image_ref_parameter``,
    ``_flavor_ref_parameter`` and ``cell_mappings``. If a ``microversion``
    attribute exists it is applied to the API client.
    """

    # NOTE(review): not read in this class; presumably consumed by the base
    # test case - confirm.
    REQUIRES_LOCKING = True
    # When True, ``self.api`` is the admin client of the API fixture.
    ADMIN_API = False
    # Override this in subclasses which use the NeutronFixture. New tests
    # should rely on Neutron since nova-network is deprecated. The default
    # value of False here is only temporary while we update the existing
    # functional tests to use Neutron.
    USE_NEUTRON = False

    def setUp(self):
        """Install the fixtures and start the services used by the tests."""
        super(_IntegratedTestBase, self).setUp()
        # TODO(mriedem): Fix the functional tests to work with Neutron.
        self.flags(use_neutron=self.USE_NEUTRON)

        nova.tests.unit.image.fake.stub_out_image_service(self)

        self.useFixture(cast_as_call.CastAsCall(self))
        placement = self.useFixture(nova_fixtures.PlacementFixture())
        self.placement_api = placement.api

        self._setup_services()

        self.addCleanup(nova.tests.unit.image.fake.FakeImageService_reset)

    def _setup_compute_service(self):
        """Start and return the compute service (hook for subclasses)."""
        return self.start_service('compute')

    def _setup_scheduler_service(self):
        """Start and return the scheduler service (hook for subclasses)."""
        return self.start_service('scheduler')

    def _setup_services(self):
        """Start every service and create the API client.

        Sets ``conductor``, ``consoleauth``, ``neutron`` or ``network``,
        ``scheduler``, ``compute``, ``api_fixture`` and ``api`` on the
        test instance.
        """
        # NOTE(danms): Set the global MQ connection to that of our first cell
        # for any cells-ignorant code. Normally this is defaulted in the tests
        # which will result in us not doing the right thing.
        if 'cell1' in self.cell_mappings:
            self.flags(transport_url=self.cell_mappings['cell1'].transport_url)
        self.conductor = self.start_service('conductor')
        self.consoleauth = self.start_service('consoleauth')

        if self.USE_NEUTRON:
            self.neutron = self.useFixture(nova_fixtures.NeutronFixture(self))
        else:
            self.network = self.start_service('network',
                                              manager=CONF.network_manager)
        self.scheduler = self._setup_scheduler_service()

        self.compute = self._setup_compute_service()
        self.api_fixture = self.useFixture(
            nova_fixtures.OSAPIFixture(self.api_major_version))

        # if the class needs to run as admin, make the api endpoint
        # the admin, otherwise it's safer to run as non admin user.
        if self.ADMIN_API:
            self.api = self.api_fixture.admin_api
        else:
            self.api = self.api_fixture.api

        if hasattr(self, 'microversion'):
            self.api.microversion = self.microversion

    def get_unused_server_name(self):
        """Return a 'server'-prefixed name not used by any listed server."""
        servers = self.api.get_servers()
        server_names = [server['name'] for server in servers]
        return generate_new_element(server_names, 'server')

    def get_unused_flavor_name_id(self):
        """Return a ``(name, id)`` pair not used by any listed flavor.

        The name is a 'flavor'-prefixed string; the id is an ``int`` built
        from a random 8-digit string that is not among the existing ids.
        """
        flavor_names = []
        flavor_ids = []
        for flavor in self.api.get_flavors():
            flavor_names.append(flavor['name'])
            flavor_ids.append(flavor['id'])
        return (generate_new_element(flavor_names, 'flavor'),
                int(generate_new_element(flavor_ids, '', True)))

    def get_invalid_image(self):
        """Return the ``uuids.fake`` sentinel for use as a bad image ref."""
        return uuids.fake

    def _build_minimal_create_server_request(self, image_uuid=None):
        """Build a server-create body with image, flavor and name only.

        :param image_uuid: image id to use; when falsy the first image
            returned by ``self.api.get_images()`` is used instead.
        :returns: dict keyed by ``self._image_ref_parameter``,
            ``self._flavor_ref_parameter`` and ``'name'``.
        """
        server = {}

        # NOTE(takashin): In API version 2.36, image APIs were deprecated.
        # In API version 2.36 or greater, self.api.get_images() returns
        # a 404 error. In that case, 'image_uuid' should be specified.
        server[self._image_ref_parameter] = (image_uuid or
                                             self.api.get_images()[0]['id'])

        # Set a valid flavorId
        flavor = self.api.get_flavors()[0]
        LOG.debug("Using flavor: %s", flavor)
        server[self._flavor_ref_parameter] = ('http://fake.server/%s'
                                              % flavor['id'])

        # Set a valid server name
        server_name = self.get_unused_server_name()
        server['name'] = server_name
        return server

    def _create_flavor_body(self, name, ram, vcpus, disk, ephemeral, id, swap,
                            rxtx_factor, is_public):
        """Return the request body for creating a flavor.

        The arguments are copied unchanged into the ``"flavor"`` dict under
        the key names used by the flavors API.
        """
        return {
            "flavor": {
                "name": name,
                "ram": ram,
                "vcpus": vcpus,
                "disk": disk,
                "OS-FLV-EXT-DATA:ephemeral": ephemeral,
                "id": id,
                "swap": swap,
                "rxtx_factor": rxtx_factor,
                "os-flavor-access:is_public": is_public,
            }
        }

    def _create_flavor(self, memory_mb=2048, vcpu=2, disk=10, ephemeral=10,
                       swap=0, rxtx_factor=1.0, is_public=True,
                       extra_spec=None):
        """Create a flavor through the admin API and return its id.

        The name and id are generated so they do not clash with existing
        flavors. When ``extra_spec`` is not None it is posted as the
        flavor's ``extra_specs`` after creation.
        """
        flv_name, flv_id = self.get_unused_flavor_name_id()
        body = self._create_flavor_body(flv_name, memory_mb, vcpu, disk,
                                        ephemeral, flv_id, swap, rxtx_factor,
                                        is_public)
        self.api_fixture.admin_api.post_flavor(body)
        if extra_spec is not None:
            spec = {"extra_specs": extra_spec}
            self.api_fixture.admin_api.post_extra_spec(flv_id, spec)
        return flv_id

    def _build_server(self, flavor_id, image=None):
        """Build a server-create body for the given flavor.

        :param flavor_id: id of the flavor, looked up via
            ``self.api.get_flavor``.
        :param image: image reference stored as-is; when None the id of
            the first image from ``self.api.get_images()`` is used.
        :returns: dict keyed by ``self._image_ref_parameter``,
            ``self._flavor_ref_parameter`` and ``'name'``.
        """
        server = {}
        if image is None:
            image = self.api.get_images()[0]
            LOG.debug("Image: %s", image)

            # We now have a valid imageId
            server[self._image_ref_parameter] = image['id']
        else:
            server[self._image_ref_parameter] = image

        # Set a valid flavorId
        flavor = self.api.get_flavor(flavor_id)
        LOG.debug("Using flavor: %s", flavor)
        server[self._flavor_ref_parameter] = ('http://fake.server/%s'
                                              % flavor['id'])

        # Set a valid server name
        server_name = self.get_unused_server_name()
        server['name'] = server_name
        return server

    def _check_api_endpoint(self, endpoint, expected_middleware):
        """Assert that every expected middleware wraps the '/v2' app.

        Walks the ``application`` chain of the app registered under
        ``(None, '/v2')`` and fails the test if any class in
        ``expected_middleware`` is not found along the way.

        :param endpoint: currently unused; the '/v2' app is always checked.
        :param expected_middleware: iterable of middleware classes. It is
            copied first, so the caller's object is left untouched.
        """
        app = self.api_fixture.app().get((None, '/v2'))
        # Work on a copy: matched entries are removed as they are found.
        missing = list(expected_middleware)

        while getattr(app, 'application', False):
            for middleware in missing:
                if isinstance(app.application, middleware):
                    # Safe to mutate here because we stop iterating at once.
                    missing.remove(middleware)
                    break
            app = app.application

        self.assertEqual([],
                         missing,
                         ("The expected wsgi middlewares %s are not "
                          "existed") % missing)
class InstanceHelperMixin(object):
    """Mixin with helpers for building server requests and polling the API.

    The polling helpers call ``self.fail`` and, where no client is passed
    in, ``self.api`` (or ``self.admin_api`` when present); the class this
    is mixed into has to provide them.
    """

    def _wait_for_server_parameter(self, admin_api, server, expected_params,
                                   max_retries=10):
        """Poll a server until all ``expected_params`` match.

        :param admin_api: client whose ``get_server(id)`` is polled.
        :param server: dict with at least an ``'id'`` key.
        :param expected_params: mapping of server attribute -> wanted value.
        :param max_retries: number of unsuccessful polls after which the
            test is failed via ``self.fail``.
        :returns: the server dict from the poll that matched.
        """
        retry_count = 0
        while True:
            server = admin_api.get_server(server['id'])
            if all(server[attr] == expected_params[attr]
                   for attr in expected_params):
                break
            retry_count += 1
            # ``>=`` so that max_retries <= 0 fails instead of looping forever.
            if retry_count >= max_retries:
                self.fail('Wait for state change failed, '
                          'expected_params=%s, server=%s'
                          % (expected_params, server))

            time.sleep(0.5)

        return server

    def _wait_for_state_change(self, admin_api, server, expected_status,
                               max_retries=10):
        """Poll a server until its ``'status'`` equals ``expected_status``.

        Thin wrapper around ``_wait_for_server_parameter``; returns the
        matching server dict.
        """
        return self._wait_for_server_parameter(
            admin_api, server, {'status': expected_status}, max_retries)

    def _build_minimal_create_server_request(self, api, name, image_uuid=None,
                                             flavor_id=None, networks=None):
        """Build a server-create body with image, flavor and name.

        :param api: client used to look up defaults.
        :param name: value stored under ``'name'``.
        :param image_uuid: image id; when falsy the first image returned
            by ``api.get_images()`` is used.
        :param flavor_id: flavor id; when falsy the id of the second flavor
            (index 1) returned by ``api.get_flavors()`` is used.
        :param networks: stored under ``'networks'`` unless it is None.
        :returns: the request dict.
        """
        server = {}

        # We now have a valid imageId
        server['imageRef'] = image_uuid or api.get_images()[0]['id']

        if not flavor_id:
            # Set a valid flavorId
            flavor_id = api.get_flavors()[1]['id']
        server['flavorRef'] = ('http://fake.server/%s' % flavor_id)
        server['name'] = name
        if networks is not None:
            server['networks'] = networks
        return server

    def _wait_until_deleted(self, server):
        """Poll ``self.api`` until the server can no longer be found.

        Returns once ``get_server`` raises
        ``api_client.OpenStackApiNotFoundException``. Fails the test if the
        server reports status ``'ERROR'`` or is still present after 40
        polls spaced 0.5 seconds apart.
        """
        try:
            for _ in range(40):
                server = self.api.get_server(server['id'])
                if server['status'] == 'ERROR':
                    self.fail('Server went to error state instead of '
                              'disappearing.')
                time.sleep(0.5)

            self.fail('Server failed to delete.')
        except api_client.OpenStackApiNotFoundException:
            return

    def _wait_for_action_fail_completion(
            self, server, expected_action, event_name, api=None):
        """Polls instance action events for the given instance, action and
        action event name until it finds the action event with an error
        result.

        :param server: dict with at least an ``'id'`` key.
        :param expected_action: value of the action's ``'action'`` field.
        :param event_name: value of the event's ``'event'`` field.
        :param api: client to poll; defaults to ``self.api``.
        :returns: the matching event dict, as soon as it is seen. The test
            is failed via ``self.fail`` if it is not seen within 10 polls
            spaced 0.5 seconds apart.
        """
        if api is None:
            api = self.api

        actions = []
        for _ in range(10):
            actions = api.get_instance_actions(server['id'])
            # Look for the expected action.
            for action in actions:
                if action['action'] != expected_action:
                    continue
                events = (
                    api.api_get(
                        '/servers/%s/os-instance-actions/%s' %
                        (server['id'], action['request_id'])
                    ).body['instanceAction']['events'])
                # Look for the action event being in error state.
                for event in events:
                    if (event['event'] == event_name and
                            event['result'] is not None and
                            event['result'].lower() == 'error'):
                        # Found it: stop polling right away.
                        return event
            # We didn't find the completion event yet, so wait a bit.
            time.sleep(0.5)

        self.fail('Timed out waiting for %s failure event. Current '
                  'instance actions: %s' % (event_name, actions))

    def _wait_for_migration_status(self, server, expected_statuses):
        """Waits for a migration record with the given statuses to be found
        for the given server, else the test fails. The migration record, if
        found, is returned.

        Statuses are compared case-insensitively. ``self.admin_api`` is
        used when that attribute exists and is not None, otherwise
        ``self.api``. Up to 10 polls are made, 0.5 seconds apart.
        """
        api = getattr(self, 'admin_api', None)
        if api is None:
            api = self.api

        statuses = [status.lower() for status in expected_statuses]
        for _ in range(10):
            migrations = api.api_get('/os-migrations').body['migrations']
            for migration in migrations:
                if (migration['instance_uuid'] == server['id'] and
                        migration['status'].lower() in statuses):
                    return migration
            time.sleep(0.5)
        self.fail('Timed out waiting for migration with status "%s" for '
                  'instance: %s' % (expected_statuses, server['id']))