Use new environment script to setup env to run onos function test
[functest.git] / testcases / functest_utils.py
1 #!/usr/bin/env python
2 #
3 # jose.lausuch@ericsson.com
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9
10 import re, json, os, urllib2, shutil, subprocess, sys
11
12 ############# CREDENTIALS OPENSTACK #############
13 def check_credentials():
14     """
15     Check if the OpenStack credentials (openrc) are sourced
16     """
17     #TODO: there must be a short way to do this, doing if os.environ["something"] == "" throws an error
18     try:
19        os.environ['OS_AUTH_URL']
20     except KeyError:
21         return False
22     try:
23        os.environ['OS_USERNAME']
24     except KeyError:
25         return False
26     try:
27        os.environ['OS_PASSWORD']
28     except KeyError:
29         return False
30     try:
31        os.environ['OS_TENANT_NAME']
32     except KeyError:
33         return False
34     return True
35
36
37 def get_credentials(service):
38     """Returns a creds dictionary filled with the following keys:
39     * username
40     * password/api_key (depending on the service)
41     * tenant_name/project_id (depending on the service)
42     * auth_url
43     :param service: a string indicating the name of the service
44                     requesting the credentials.
45     """
46     #TODO: get credentials from the openrc file
47     creds = {}
48     # Unfortunately, each of the OpenStack client will request slightly
49     # different entries in their credentials dict.
50     if service.lower() in ("nova", "cinder"):
51         password = "api_key"
52         tenant = "project_id"
53     else:
54         password = "password"
55         tenant = "tenant_name"
56
57     # The most common way to pass these info to the script is to do it through
58     # environment variables.
59     creds.update({
60         "username": os.environ.get('OS_USERNAME', "admin"),                                # add your cloud username details
61         password: os.environ.get("OS_PASSWORD", 'admin'),                                # add password
62         "auth_url": os.environ.get("OS_AUTH_URL","http://192.168.20.71:5000/v2.0"),        # Auth URL
63         tenant: os.environ.get("OS_TENANT_NAME", "admin"),
64     })
65
66     return creds
67
68
69 ################# NOVA #################
70 def get_instance_status(nova_client,instance):
71     try:
72         instance = nova_client.servers.get(instance.id)
73         return instance.status
74     except:
75         return None
76
77
78 def get_instance_by_name(nova_client, instance_name):
79     try:
80         instance = nova_client.servers.find(name=instance_name)
81         return instance
82     except:
83         return None
84
85
86 def get_flavor_id(nova_client, flavor_name):
87     flavors = nova_client.flavors.list(detailed=True)
88     id = ''
89     for f in flavors:
90         if f.name == flavor_name:
91             id = f.id
92             break
93     return id
94
95
96 def get_flavor_id_by_ram_range(nova_client, min_ram, max_ram):
97     flavors = nova_client.flavors.list(detailed=True)
98     id = ''
99     for f in flavors:
100         if min_ram <= f.ram and f.ram <= max_ram:
101             id = f.id
102             break
103     return id
104
105
106 ################# NEUTRON #################
107 def create_neutron_net(neutron_client, name):
108     json_body = {'network': {'name': name,
109                     'admin_state_up': True}}
110     try:
111         network = neutron_client.create_network(body=json_body)
112         network_dict = network['network']
113         return network_dict['id']
114     except:
115         print "Error:", sys.exc_info()[0]
116         return False
117
118 def delete_neutron_net(neutron_client, network_id):
119     try:
120         neutron_client.delete_network(network_id)
121         return True
122     except:
123         print "Error:", sys.exc_info()[0]
124         return False
125
126 def create_neutron_subnet(neutron_client, name, cidr, net_id):
127     json_body = {'subnets': [{'name': name, 'cidr': cidr,
128                            'ip_version': 4, 'network_id': net_id}]}
129     try:
130         subnet = neutron_client.create_subnet(body=json_body)
131         return subnet['subnets'][0]['id']
132     except:
133         print "Error:", sys.exc_info()[0]
134         return False
135
136 def delete_neutron_subnet(neutron_client, subnet_id):
137     try:
138         neutron_client.delete_subnet(subnet_id)
139         return True
140     except:
141         print "Error:", sys.exc_info()[0]
142         return False
143
144 def create_neutron_router(neutron_client, name):
145     json_body = {'router': {'name': name, 'admin_state_up': True}}
146     try:
147         router = neutron_client.create_router(json_body)
148         return router['router']['id']
149     except:
150         print "Error:", sys.exc_info()[0]
151         return False
152
153 def delete_neutron_router(neutron_client, router_id):
154     json_body = {'router': {'id': router_id}}
155     try:
156         neutron_client.delete_router(router=router_id)
157         return True
158     except:
159         print "Error:", sys.exc_info()[0]
160         return False
161
162
163 def add_interface_router(neutron_client, router_id, subnet_id):
164     json_body = {"subnet_id": subnet_id}
165     try:
166         neutron_client.add_interface_router(router=router_id, body=json_body)
167         return True
168     except:
169         print "Error:", sys.exc_info()[0]
170         return False
171
172
173 def remove_interface_router(neutron_client, router_id, subnet_id):
174     json_body = {"subnet_id": subnet_id}
175     try:
176         neutron_client.remove_interface_router(router=router_id, body=json_body)
177         return True
178     except:
179         print "Error:", sys.exc_info()[0]
180         return False
181
182 def create_neutron_port(neutron_client, name, network_id, ip):
183     json_body = {'port': {
184     'admin_state_up': True,
185     'name': name,
186     'network_id': network_id,
187     'fixed_ips': [{"ip_address": ip}]
188     }}
189     try:
190         port = neutron_client.create_port(body=json_body)
191         return port['port']['id']
192     except:
193         print "Error:", sys.exc_info()[0]
194         return False
195
196
197 def get_network_id(neutron_client, network_name):
198     networks = neutron_client.list_networks()['networks']
199     id  = ''
200     for n in networks:
201         if n['name'] == network_name:
202             id = n['id']
203             break
204     return id
205
206 def check_neutron_net(neutron_client, net_name):
207     for network in neutron_client.list_networks()['networks']:
208         if network['name'] == net_name :
209             for subnet in network['subnets']:
210                 return True
211     return False
212
213 def get_network_list(neutron_client):
214     network_list = neutron_client.list_networks()['networks']
215     if len(network_list) == 0 :
216         return None
217     else :
218         return network_list
219
220
221 ################# GLANCE #################
222 def get_image_id(glance_client, image_name):
223     images = glance_client.images.list()
224     id = ''
225     for i in images:
226         if i.name == image_name:
227             id = i.id
228             break
229     return id
230
231 def create_glance_image(glance_client, image_name, file_path):
232     try:
233         with open(file_path) as fimage:
234             image = glance_client.images.create(name=image_name, is_public=True, disk_format="qcow2",
235                              container_format="bare", data=fimage)
236         return image.id
237     except:
238         return False
239
240
241 ################# KEYSTONE #################
242 def get_tenant_id(keystone_client, tenant_name):
243     tenants = keystone_client.tenants.list()
244     id = ''
245     for t in tenants:
246         if t.name == tenant_name:
247             id = t.id
248             break
249     return id
250
251 def get_role_id(keystone_client, role_name):
252     roles = keystone_client.roles.list()
253     id = ''
254     for r in roles:
255         if r.name == role_name:
256             id = r.id
257             break
258     return id
259
260 def get_user_id(keystone_client, user_name):
261     users = keystone_client.users.list()
262     id = ''
263     for u in users:
264         if u.name == user_name:
265             id = u.id
266             break
267     return id
268
269 def create_tenant(keystone_client, tenant_name, tenant_description):
270     try:
271         tenant = keystone_client.tenants.create(tenant_name, tenant_description, enabled=True)
272         return tenant.id
273     except:
274         print "Error:", sys.exc_info()[0]
275         return False
276
277 def delete_tenant(keystone_client, tenant_id):
278     try:
279         tenant = keystone_client.tenants.delete(tenant_id)
280         return True
281     except:
282         print "Error:", sys.exc_info()[0]
283         return False
284
285 def add_role_user(keystone_client, user_id, role_id, tenant_id):
286     try:
287         keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
288         return True
289     except:
290         print "Error:", sys.exc_info()[0]
291         return False
292
293
294 ################# UTILS #################
295 def check_internet_connectivity(url='http://www.opnfv.org/'):
296     """
297     Check if there is access to the internet
298     """
299     try:
300         urllib2.urlopen(url, timeout=5)
301         return True
302     except urllib.URLError:
303         return False
304
305
306 def download_url(url, dest_path):
307     """
308     Download a file to a destination path given a URL
309     """
310     name = url.rsplit('/')[-1]
311     dest = dest_path + name
312     try:
313         response = urllib2.urlopen(url)
314     except (urllib2.HTTPError, urllib2.URLError):
315         return False
316
317     with open(dest, 'wb') as f:
318         f.write(response.read())
319     return True
320
321
322 def execute_command(cmd, logger=None):
323     """
324     Execute Linux command
325     """
326     if logger:
327         logger.debug('Executing command : {}'.format(cmd))
328     output_file = "output.txt"
329     f = open(output_file, 'w+')
330     p = subprocess.call(cmd,shell=True, stdout=f, stderr=subprocess.STDOUT)
331     f.close()
332     f = open(output_file, 'r')
333     result = f.read()
334     if result != "" and logger:
335         logger.debug(result)
336     if p == 0 :
337         return True
338     else:
339         if logger:
340             logger.error("Error when executing command %s" %cmd)
341         exit(-1)