Repository URL to install this package:
| 
          
        
        Version: 
           
    
          5.1.2  ▾
        
   | 
from systemtests.tests.systemtest import WorkloadMgrSystemTest
import time
Description = 'Test18:                                           \n'\
              '      Create workload type                       \n'\
              '      Workload metadata changes                  \n'\
              '      Delete the workloadtype that is created    \n'\
              '                                               '
class test18(WorkloadMgrSystemTest):
    def __init__(self, testshell):
        super(test18, self).__init__(testshell, Description)
        self._workloadtype = None
    """
    Setup the conditions for test to run
    """
    def prepare(self, *args, **kwargs):
        # Cleanup swift first
        super(test18, self).prepare(args, kwargs)
        # Make sure vm as specified in the argument vm1 exists
        # on the production
        workloads = self._testshell.cs.workloads.list()
        self.serialtype = None
        for type in self._testshell.cs.workload_types.list():
            if type.name == 'Serial':
                self.serialtype = type
                break
        if self.serialtype is None:
            raise Exception("Serial workloadtype not found")
        # We will use VM4
        self.vm = None
        for vm in self._testshell.novaclient.servers.list():
            if str(vm.name).lower() == "vm4":
                self.vm = vm
                break
        # May be I need to create a VM with in the test itself
        if self.vm is None:
            raise Exception("VM4 is not found in the vm inventory")
    """
    run the test
    """
    def run(self, *args, **kwargs):
        # Create serial workload with the VM
        # Make sure that the workload is created
        instances = []
        instances.append({'instance-id': self.vm.id})
        self._metadata = {
            'key1': 'string',
            'key2': 'boolean',
            'key3': 'password',
            'key4': 'boolean'}
        self._name = "Workload1"
        self._description = "Test Workload"
        self._workloadtype = self._testshell.cs.workload_types.create(
            metadata=self._metadata, name=self._name, description=self._description)
        status = self._workloadtype.status
        while True:
            self._workloadtype = self._testshell.cs.workload_types.get(
                self._workloadtype.id)
            status = self._workloadtype.status
            if status == 'available' or status == 'error':
                break
            time.sleep(5)
        self._metadata = {
            'key1': 'key1',
            'key2': 'False',
            'key3': 'key3',
            'key4': 'True'}
        self.workload = self._testshell.cs.workloads.create(
            self._name,
            self._description,
            self._workloadtype.id,
            instances,
            {},
            self._metadata)
        status = self.workload.status
        while True:
            self.workload = self._testshell.cs.workloads.get(self.workload.id)
            status = self.workload.status
            if status == 'available' or status == 'error':
                break
            time.sleep(5)
    def verify(self, *args, **kwargs):
        if self._name != self.workload.name:
            raise Exception("workload type name is not same")
        if self.workload.description != self._description:
            raise Exception("Workload type description did not match")
        if self.workload.metadata['key1'] != 'key1':
            raise Exception("Workload type key1 did not match")
        if self.workload.metadata['key2'] != 'False':
            raise Exception("Workload type key2 did not match")
        if self.workload.metadata['key3'] != 'key3':
            raise Exception("Workload type key3 did not match")
        if self.workload.metadata['key4'] != 'True':
            raise Exception("Workload type key4 did not match")
    """
    cleanup the test
    """
    def cleanup(self, *args, **kwargs):
        # Delete the workload that is created
        if self.workload:
            wid = self.workload.id
            self._testshell.cs.workloads.delete(self.workload.id)
        if self._workloadtype:
            wid = self._workloadtype.id
            self._testshell.cs.workload_types.delete(self._workloadtype.id)
        for wload in self._testshell.cs.workloads.list():
            if wload.id == self.workload.id:
                raise Exception("Workload exists after delete")
        for wload in self._testshell.cs.workload_types.list():
            if wload.id == self._workloadtype.id:
                raise Exception("Workload exists after delete")