3 # Copyright (c) 2018 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 """Ease deploying tenant networks
It offers a simple way to create all tenant network resources required by a
testcase (including all Functest ones):
- TenantNetwork1 selects the user and the project set as env vars
- TenantNetwork2 creates a user and project to isolate the same resources

These classes could be reused by more complex scenarios (Single VM)
25 import os_client_config
27 from xtesting.core import testcase
29 from functest.utils import config
30 from functest.utils import env
31 from functest.utils import functest_utils
class TenantNetwork1(testcase.TestCase):
    # pylint: disable=too-many-instance-attributes
    """Create a tenant network (scenario1)

    It creates and configures all tenant network resources required by
    advanced testcases (network, subnet and router) through the cloud
    connection built from the current OpenStack client configuration.

    Per-testcase settings are looked up in ``config.CONF`` under names
    prefixed by ``case_name`` (``<case_name>_network_type``,
    ``<case_name>_physical_network``, ``<case_name>_segmentation_id`` and
    ``<case_name>_private_subnet_cidr``), so testcases inheriting from
    TenantNetwork1 only need their own ``case_name`` to be configured.
    """

    __logger = logging.getLogger(__name__)
    # Subnet CIDR used when <case_name>_private_subnet_cidr is not set.
    cidr = '192.168.0.0/24'

    def __init__(self, **kwargs):
        """Connect to the cloud and look up the external network.

        Args:
            **kwargs: forwarded to ``testcase.TestCase``; ``case_name``
                defaults to ``'tenantnetwork1'`` when not supplied.

        Connection and lookup failures are logged, not raised: ``cloud``
        and/or ``ext_net`` are then left as ``None``.
        """
        kwargs.setdefault("case_name", 'tenantnetwork1')
        super(TenantNetwork1, self).__init__(**kwargs)
        self.res_dir = os.path.join(
            getattr(config.CONF, 'dir_results'), self.case_name)
        # Define every attribute up front so that run() and clean() can
        # test them even when one of the steps below fails.
        self.cloud = None
        self.ext_net = None
        self.network = None
        self.subnet = None
        self.router = None
        # Unique suffix appended to the name of every created resource.
        self.guid = str(uuid.uuid4())
        try:
            cloud_config = os_client_config.get_config()
            self.cloud = shade.OpenStackCloud(cloud_config=cloud_config)
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("Cannot connect to Cloud")
        try:
            self.ext_net = functest_utils.get_external_network(self.cloud)
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("Cannot get the external network")

    def _create_network_ressources(self):
        """Create the network, its subnet and a router plugged into both.

        The router gets the external network as gateway and an interface
        on the new subnet. The created objects are stored in
        ``self.network``, ``self.subnet`` and ``self.router``.

        Raises:
            AssertionError: if the cloud connection or the external
                network is missing.
            Exception: whatever the cloud calls raise.
        """
        assert self.cloud
        assert self.ext_net
        # Only forward the provider attributes configured for this case.
        provider = {}
        for key in ("network_type", "physical_network", "segmentation_id"):
            option = '{}_{}'.format(self.case_name, key)
            if hasattr(config.CONF, option):
                provider[key] = getattr(config.CONF, option)
        self.network = self.cloud.create_network(
            '{}-net_{}'.format(self.case_name, self.guid),
            provider=provider)
        self.__logger.debug("network: %s", self.network)

        # NOTE(review): DHCP is enabled explicitly (shade leaves it off by
        # default) so instances plugged into the subnet can get an
        # address - confirm.
        self.subnet = self.cloud.create_subnet(
            self.network.id,
            subnet_name='{}-subnet_{}'.format(self.case_name, self.guid),
            cidr=getattr(
                config.CONF, '{}_private_subnet_cidr'.format(self.case_name),
                self.cidr),
            enable_dhcp=True,
            dns_nameservers=[env.get('NAMESERVER')])
        self.__logger.debug("subnet: %s", self.subnet)

        self.router = self.cloud.create_router(
            name='{}-router_{}'.format(self.case_name, self.guid),
            ext_gateway_net_id=self.ext_net.id)
        self.__logger.debug("router: %s", self.router)
        self.cloud.add_router_interface(self.router, subnet_id=self.subnet.id)

    def run(self, **kwargs):
        """Create the tenant network resources.

        Args:
            **kwargs: not used by this implementation.

        Returns:
            ``TestCase.EX_OK`` when every resource was created,
            ``TestCase.EX_RUN_ERROR`` otherwise (the error is logged).
        """
        status = testcase.TestCase.EX_RUN_ERROR
        # Set before any check so that a failed run still gets a
        # meaningful start/stop pair.
        self.start_time = time.time()
        try:
            assert self.cloud
            self._create_network_ressources()
            # NOTE(review): 100 assumes the default xtesting success
            # criteria - confirm.
            self.result = 100
            status = testcase.TestCase.EX_OK
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception('Cannot run %s', self.case_name)
        finally:
            self.stop_time = time.time()
        return status

    def clean(self):
        """Delete the router and the network created by run().

        Resources that were never created are skipped, so a partially
        failed run() does not prevent the remaining ones from being
        removed. Errors are logged, never raised.
        """
        try:
            assert self.cloud
            if self.router:
                if self.subnet:
                    self.cloud.remove_router_interface(
                        self.router, self.subnet.id)
                self.cloud.delete_router(self.router.id)
            if self.network:
                self.cloud.delete_network(self.network.id)
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("cannot clean all ressources")
class TenantNetwork2(TenantNetwork1):
    """Create a tenant network (scenario2)

    It creates a new project and a new user before creating and
    configuring all tenant network resources required by a testcase
    (network, subnet and router), so that these resources are isolated
    from the ones of the original project.

    Testcases inheriting from TenantNetwork2 get the same network
    configuration handling as with TenantNetwork1.
    """

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        """Resolve the domain in which the project and user are created.

        Args:
            **kwargs: forwarded to ``TenantNetwork1``; ``case_name``
                defaults to ``'tenantnetwork2'`` when not supplied.

        A failed lookup is logged and leaves ``domain`` as ``None``.
        """
        kwargs.setdefault("case_name", 'tenantnetwork2')
        super(TenantNetwork2, self).__init__(**kwargs)
        # Define every attribute up front so that run() and clean() can
        # test them whatever failed before.
        self.domain = None
        self.project = None
        self.user = None
        # Original connection, kept once self.cloud is switched to the
        # new user; it is the one used to delete the user and project.
        self.orig_cloud = None
        self.password = str(uuid.uuid4())
        try:
            assert self.cloud
            self.domain = self.cloud.get_domain(
                name_or_id=self.cloud.auth.get(
                    "project_domain_name", "Default"))
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("Cannot connect to Cloud")

    def _connect_as_new_user(self):
        """Return a cloud connection authenticated as the new user.

        The connection is built by ``os_client_config.get_config()``
        while OS_USERNAME, OS_PASSWORD and OS_PROJECT_NAME temporarily
        point to the new user and project. The previous values are
        always restored, even if building the connection fails.
        """
        overrides = {
            "OS_USERNAME": self.user.name,
            # The user was created with self.password: the connection
            # must authenticate with it, not with the original password.
            "OS_PASSWORD": self.password,
            # OS_PROJECT_NAME expects the project name, not its id.
            "OS_PROJECT_NAME": self.project.name,
        }
        saved = {key: os.environ.get(key) for key in overrides}
        os.environ.update(overrides)
        try:
            return shade.OpenStackCloud(
                cloud_config=os_client_config.get_config())
        finally:
            for key, value in saved.items():
                if value is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value

    def run(self, **kwargs):
        """Create the project and user, then the tenant network.

        Args:
            **kwargs: forwarded to ``TenantNetwork1.run()``.

        Returns:
            ``TestCase.EX_RUN_ERROR`` if the project, the user or the new
            connection cannot be created (the error is logged), otherwise
            the status returned by ``TenantNetwork1.run()``.
        """
        try:
            assert self.cloud
            assert self.domain
            self.project = self.cloud.create_project(
                name='{}-project_{}'.format(self.case_name, self.guid),
                description="Created by OPNFV Functest: {}".format(
                    self.case_name),
                domain_id=self.domain.id)
            self.__logger.debug("project: %s", self.project)
            # NOTE(review): no role is granted to the user on the new
            # project here; presumably one is required to get a
            # project-scoped token - confirm.
            self.user = self.cloud.create_user(
                name='{}-user_{}'.format(self.case_name, self.guid),
                password=self.password,
                default_project=self.project.id,
                domain_id=self.domain.id)
            self.__logger.debug("user: %s", self.user)
            self.orig_cloud = self.cloud
            self.cloud = self._connect_as_new_user()
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("Cannot create user or project")
            return testcase.TestCase.EX_RUN_ERROR
        return super(TenantNetwork2, self).run(**kwargs)

    def clean(self):
        """Delete the network resources, then the user and the project.

        The user and the project are removed through the original
        connection; if run() failed before the connection was switched,
        ``self.cloud`` still is that connection and is used instead.
        Errors are logged, never raised.
        """
        super(TenantNetwork2, self).clean()
        admin_cloud = self.orig_cloud or self.cloud
        try:
            assert admin_cloud
            try:
                if self.user:
                    admin_cloud.delete_user(self.user.id)
            finally:
                # Still try to remove the project when the user deletion
                # failed, so that it is not leaked.
                if self.project:
                    admin_cloud.delete_project(self.project.id)
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("cannot clean all ressources")