import json
import sys
from copy import deepcopy
-import yaml
-import functest.utils.openstack_utils as os_utils
-import functest.core.vnf as vnf
-import pkg_resources
+from urlparse import urljoin
+import functest.core.vnf as vnf
from functest.opnfv_tests.openstack.snaps import snaps_utils
from functest.utils.constants import CONST
+import functest.utils.openstack_utils as os_utils
-from snaps.openstack.os_credentials import OSCreds
-from snaps.openstack.create_network import (NetworkSettings,
- SubnetSettings, OpenStackNetwork)
-from snaps.openstack.create_router import (RouterSettings, OpenStackRouter)
-from snaps.openstack.create_flavor import (FlavorSettings, OpenStackFlavor)
-from snaps.openstack.create_image import (ImageSettings, OpenStackImage)
-from snaps.openstack.tests import openstack_tests
+import pkg_resources
+from snaps.config.flavor import FlavorConfig
+from snaps.config.image import ImageConfig
+from snaps.config.network import NetworkConfig, SubnetConfig
+from snaps.config.router import RouterConfig
+from snaps.openstack.create_flavor import OpenStackFlavor
+from snaps.openstack.create_image import OpenStackImage
+from snaps.openstack.create_network import OpenStackNetwork
+from snaps.openstack.create_router import OpenStackRouter
from snaps.openstack.utils import keystone_utils
+import yaml
__author__ = "Amarendra Meher <amarendra@rebaca.com>"
__author__ = "Soumaya K Nayek <soumaya.nayek@rebaca.com>"
class JujuEpc(vnf.VnfOnBoarding):
+ # pylint:disable=too-many-instance-attributes
"""Abot EPC deployed with JUJU Orchestrator Case"""
__logger = logging.getLogger(__name__)
+ default_region_name = "RegionOne"
+
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "juju_epc"
)
self.created_object = []
- self.snaps_creds = ''
-
- self.os_creds = openstack_tests.get_credentials(
- os_env_file=CONST.__getattribute__('openstack_creds'))
-
+ self.snaps_creds = snaps_utils.get_credentials()
self.details['orchestrator'] = dict(
name=get_config("orchestrator.name", config_file),
version=get_config("orchestrator.version", config_file),
self.glance_client = os_utils.get_glance_client()
self.neutron_client = os_utils.get_neutron_client()
self.nova_client = os_utils.get_nova_client()
+ self.sec_group_id = None
+ self.public_auth_url = None
+ self.creds = None
+ self.filename = None
def prepare(self):
"""Prepare testcase (Additional pre-configuration steps)."""
self.__logger.info("Additional pre-configuration steps")
self.public_auth_url = keystone_utils.get_endpoint(
self.snaps_creds, 'identity')
+ # it enforces a versioned public identity endpoint as juju simply
+ # adds /auth/tokens wich fails vs an unversioned endpoint.
+ if not self.public_auth_url.endswith(('v3', 'v3/', 'v2.0', 'v2.0/')):
+ self.public_auth_url = urljoin(self.public_auth_url, 'v3')
self.creds = {
"tenant": self.tenant_name,
"auth_url": os_utils.get_credentials()['auth_url']
}
- self.snaps_creds = OSCreds(
- username=self.creds['username'],
- password=self.creds['password'],
- auth_url=self.creds['auth_url'],
- project_name=self.creds['tenant'],
- identity_api_version=int(os_utils.get_keystone_client_version()))
-
cloud_data = {
'url': self.public_auth_url,
'pass': self.tenant_name,
'tenant_n': self.tenant_name,
- 'user_n': self.tenant_name
+ 'user_n': self.tenant_name,
+ 'region': os.environ.get(
+ "OS_REGION_NAME", self.default_region_name)
}
self.__logger.info("Cloud DATA: %s", cloud_data)
self.filename = os.path.join(self.case_dir, 'abot-epc.yaml')
- self.__logger.info("Cretae %s to add cloud info", self.filename)
+ self.__logger.info("Create %s to add cloud info", self.filename)
write_config(self.filename, CLOUD_TEMPLATE, **cloud_data)
if self.snaps_creds.identity_api_version == 3:
- append_config(self.filename, '{}'.format(
- os_utils.get_credentials()['project_domain_name']),
- '{}'.format(os_utils.get_credentials()
- ['user_domain_name']))
+ append_config(
+ self.filename, '{}'.format(
+ os_utils.get_credentials()['project_domain_name']),
+ '{}'.format(os_utils.get_credentials()['user_domain_name']))
self.__logger.info("Upload some OS images if it doesn't exist")
for image_name, image_file in self.images.iteritems():
if image_file and image_name:
image_creator = OpenStackImage(
self.snaps_creds,
- ImageSettings(name=image_name,
- image_user='cloud',
- img_format='qcow2',
- image_file=image_file))
+ ImageConfig(name=image_name,
+ image_user='cloud',
+ img_format='qcow2',
+ image_file=image_file))
image_creator.create()
self.created_object.append(image_creator)
- def deploy_orchestrator(self):
+ def deploy_orchestrator(self): # pylint: disable=too-many-locals
"""
Create network, subnet, router
'vnf_{}_external_network_name'.format(self.case_name))
self.__logger.info("Creating full network ...")
- subnet_settings = SubnetSettings(name=private_subnet_name,
- cidr=private_subnet_cidr,
- dns_nameservers=dns_nameserver)
- network_settings = NetworkSettings(name=private_net_name,
- subnet_settings=[subnet_settings])
+ subnet_settings = SubnetConfig(name=private_subnet_name,
+ cidr=private_subnet_cidr,
+ dns_nameservers=dns_nameserver)
+ network_settings = NetworkConfig(name=private_net_name,
+ subnet_settings=[subnet_settings])
network_creator = OpenStackNetwork(self.snaps_creds, network_settings)
network_creator.create()
self.created_object.append(network_creator)
self.__logger.info("Creating network Router ....")
router_creator = OpenStackRouter(
self.snaps_creds,
- RouterSettings(
+ RouterConfig(
name=abot_router,
external_gateway=ext_net_name,
internal_subnets=[subnet_settings.name]))
router_creator.create()
self.created_object.append(router_creator)
self.__logger.info("Creating Flavor ....")
- flavor_settings = FlavorSettings(
+ flavor_settings = FlavorConfig(
name=self.orchestrator['requirements']['flavor']['name'],
ram=self.orchestrator['requirements']['flavor']['ram_min'],
disk=10,
for image_name in self.images.keys():
self.__logger.info("Generating Metadata for %s", image_name)
image_id = os_utils.get_image_id(self.glance_client, image_name)
- os.system('juju metadata generate-image -d ~ -i {} -s {} -r '
- 'RegionOne -u {}'.format(image_id,
- image_name,
- self.public_auth_url))
+ os.system(
+ 'juju metadata generate-image -d ~ -i {} -s {} -r '
+ '{} -u {}'.format(
+ image_id, image_name,
+ os.environ.get("OS_REGION_NAME", self.default_region_name),
+ self.public_auth_url))
net_id = os_utils.get_network_id(self.neutron_client, private_net_name)
self.__logger.info("Credential information : %s", net_id)
juju_bootstrap_command = ('juju bootstrap abot-epc abot-controller '
'--config network={} --metadata-source ~ '
'--config ssl-hostname-verification=false '
'--constraints mem=2G --bootstrap-series '
- 'trusty '
+ 'xenial '
'--config use-floating-ip=true --debug'.
format(net_id))
os.system(juju_bootstrap_command)
self.__logger.info("Upload VNFD")
descriptor = self.vnf['descriptor']
self.__logger.info("Get or create flavor for all Abot-EPC")
- flavor_settings = FlavorSettings(
+ flavor_settings = FlavorConfig(
name=self.vnf['requirements']['flavor']['name'],
ram=self.vnf['requirements']['flavor']['ram_min'],
disk=10,
count = count + 1
os.system('juju-wait')
return True
- else:
- return False
+ return False
def test_vnf(self):
"""Run test on ABoT."""
for tag in element['tags']:
element[tag['name']] = 1
- except:
- logging.error("Error in updating data, %s" % (sys.exc_info()[0]))
+ except Exception: # pylint: disable=broad-except
+ logging.error("Error in updating data, %s", sys.exc_info()[0])
raise
return obj
try:
instance = nova_client.servers.get(instance.id)
return instance.metadata
- except Exception as e:
- logging.error("Error [get_instance_status(nova_client)]: %s" % e)
+ except Exception as exc: # pylint: disable=broad-except
+ logging.error("Error [get_instance_status(nova_client)]: %s", exc)
return None
auth-types: [userpass]
endpoint: {url}
regions:
- RegionOne:
+ {region}:
endpoint: {url}
credentials:
abot-epc: