Merge "creation of scripts to generate Dashboard compatible json result files"
[functest.git] / testcases / functest_utils.py
1 #!/usr/bin/env python
2 #
3 # jose.lausuch@ericsson.com
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9
10 import re, json, os, urllib2, shutil, subprocess, sys
11
12
def check_credentials():
    """
    Check if the OpenStack credentials (openrc) are sourced.

    Returns True only when OS_AUTH_URL, OS_USERNAME, OS_PASSWORD and
    OS_TENANT_NAME are all defined in the environment, False otherwise.
    Only presence is tested: a variable set to an empty string still counts.
    """
    required_vars = ('OS_AUTH_URL', 'OS_USERNAME',
                     'OS_PASSWORD', 'OS_TENANT_NAME')
    return all(var in os.environ for var in required_vars)
35
36
def get_credentials(service):
    """Build the credentials dictionary expected by an OpenStack client.

    The returned dictionary always holds four entries:
    * username
    * password or api_key (depending on the service)
    * tenant_name or project_id (depending on the service)
    * auth_url

    Values are read from OS_USERNAME, OS_PASSWORD, OS_AUTH_URL and
    OS_TENANT_NAME; when a variable is not set, the hard-coded default
    below is used instead.

    :param service: a string indicating the name of the service
                    requesting the credentials (case-insensitive).
    """
    # TODO: get credentials from the openrc file
    # The nova and cinder clients name two of the entries differently from
    # the other clients.
    legacy_names = service.lower() in ("nova", "cinder")
    password_key = "api_key" if legacy_names else "password"
    tenant_key = "project_id" if legacy_names else "tenant_name"

    # The most common way to pass this information to the script is through
    # environment variables.
    return {
        "username": os.environ.get('OS_USERNAME', "admin"),
        password_key: os.environ.get("OS_PASSWORD", 'admin'),
        "auth_url": os.environ.get("OS_AUTH_URL",
                                   "http://192.168.20.71:5000/v2.0"),
        tenant_key: os.environ.get("OS_TENANT_NAME", "admin"),
    }
67
68
69
def get_instance_status(nova_client, instance):
    """
    Return the status of an instance as freshly fetched from Nova.

    :param nova_client: client object exposing ``servers.get(id)``
    :param instance: object whose ``id`` attribute identifies the server
    :return: the ``status`` attribute of the re-fetched server, or None
             when the lookup raises an error
    """
    try:
        return nova_client.servers.get(instance.id).status
    except Exception:
        # Deliberately not a bare except: KeyboardInterrupt and SystemExit
        # must still be able to stop the caller.
        return None
76
77
def get_instance_by_name(nova_client, instance_name):
    """
    Look up an instance by its name.

    :param nova_client: client object exposing ``servers.find(name=...)``
    :param instance_name: name passed to ``servers.find``
    :return: whatever ``servers.find`` returns, or None when it raises
             an error
    """
    try:
        return nova_client.servers.find(name=instance_name)
    except Exception:
        # Deliberately not a bare except: KeyboardInterrupt and SystemExit
        # must still be able to stop the caller.
        return None
84
85
86
def create_neutron_net(neutron_client, name):
    """
    Create a Neutron network with admin_state_up set to True.

    :param neutron_client: client object exposing ``create_network(body=...)``
    :param name: name given to the new network
    :return: the ``id`` of the created network, or False when the request
             raises an error (the exception type is printed)
    """
    json_body = {'network': {'name': name,
                             'admin_state_up': True}}
    try:
        network = neutron_client.create_network(body=json_body)
        return network['network']['id']
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
97
def delete_neutron_net(neutron_client, network_id):
    """
    Delete a Neutron network.

    :param neutron_client: client object exposing ``delete_network(id)``
    :param network_id: identifier passed to ``delete_network``
    :return: True on success, False when the request raises an error
             (the exception type is printed)
    """
    try:
        neutron_client.delete_network(network_id)
        return True
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
105
def create_neutron_subnet(neutron_client, name, cidr, net_id):
    """
    Create an IPv4 subnet on an existing Neutron network.

    The request body uses the list form (``'subnets': [...]``) with a single
    entry, and the id is read back from the first entry of the reply.

    :param neutron_client: client object exposing ``create_subnet(body=...)``
    :param name: name given to the new subnet
    :param cidr: CIDR of the subnet
    :param net_id: id of the network the subnet is attached to
    :return: the ``id`` of the created subnet, or False when the request
             raises an error (the exception type is printed)
    """
    json_body = {'subnets': [{'name': name, 'cidr': cidr,
                              'ip_version': 4, 'network_id': net_id}]}
    try:
        subnet = neutron_client.create_subnet(body=json_body)
        return subnet['subnets'][0]['id']
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
115
def delete_neutron_subnet(neutron_client, subnet_id):
    """
    Delete a Neutron subnet.

    :param neutron_client: client object exposing ``delete_subnet(id)``
    :param subnet_id: identifier passed to ``delete_subnet``
    :return: True on success, False when the request raises an error
             (the exception type is printed)
    """
    try:
        neutron_client.delete_subnet(subnet_id)
        return True
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
123
def create_neutron_router(neutron_client, name):
    """
    Create a Neutron router with admin_state_up set to True.

    :param neutron_client: client object exposing ``create_router(body)``
    :param name: name given to the new router
    :return: the ``id`` of the created router, or False when the request
             raises an error (the exception type is printed)
    """
    json_body = {'router': {'name': name, 'admin_state_up': True}}
    try:
        router = neutron_client.create_router(json_body)
        return router['router']['id']
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
132
def delete_neutron_router(neutron_client, router_id):
    """
    Delete a Neutron router.

    :param neutron_client: client object exposing
                           ``delete_router(router=...)``
    :param router_id: identifier passed as the ``router`` keyword
    :return: True on success, False when the request raises an error
             (the exception type is printed)
    """
    try:
        neutron_client.delete_router(router=router_id)
        return True
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
141
142
def add_interface_router(neutron_client, router_id, subnet_id):
    """
    Attach a subnet to a router as an interface.

    :param neutron_client: client object exposing
                           ``add_interface_router(router=..., body=...)``
    :param router_id: id of the router
    :param subnet_id: id of the subnet to attach
    :return: True on success, False when the request raises an error
             (the exception type is printed)
    """
    json_body = {"subnet_id": subnet_id}
    try:
        neutron_client.add_interface_router(router=router_id, body=json_body)
        return True
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
151
152
def remove_interface_router(neutron_client, router_id, subnet_id):
    """
    Detach a subnet interface from a router.

    :param neutron_client: client object exposing
                           ``remove_interface_router(router=..., body=...)``
    :param router_id: id of the router
    :param subnet_id: id of the subnet to detach
    :return: True on success, False when the request raises an error
             (the exception type is printed)
    """
    json_body = {"subnet_id": subnet_id}
    try:
        neutron_client.remove_interface_router(router=router_id,
                                               body=json_body)
        return True
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
161
def create_neutron_port(neutron_client, name, network_id, ip):
    """
    Create a Neutron port with a single fixed IP address.

    :param neutron_client: client object exposing ``create_port(body=...)``
    :param name: name given to the new port
    :param network_id: id of the network the port belongs to
    :param ip: address placed in the port's ``fixed_ips`` list
    :return: the ``id`` of the created port, or False when the request
             raises an error (the exception type is printed)
    """
    json_body = {'port': {
        'admin_state_up': True,
        'name': name,
        'network_id': network_id,
        'fixed_ips': [{"ip_address": ip}],
    }}
    try:
        port = neutron_client.create_port(body=json_body)
        return port['port']['id']
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
175
176
def get_network_id(neutron_client, network_name):
    """
    Return the id of the first network whose name is network_name.

    :param neutron_client: client object exposing ``list_networks()``
    :param network_name: name to look for
    :return: the ``id`` of the first match, or '' when no network matches
    """
    all_networks = neutron_client.list_networks()['networks']
    matching_ids = (net['id'] for net in all_networks
                    if net['name'] == network_name)
    return next(matching_ids, '')
185
def check_neutron_net(neutron_client, net_name):
    """
    Tell whether a network named net_name exists and has a subnet.

    :param neutron_client: client object exposing ``list_networks()``
    :param net_name: name to look for
    :return: True as soon as a network with that name holds at least one
             entry in its ``subnets`` collection, False otherwise
    """
    for candidate in neutron_client.list_networks()['networks']:
        if candidate['name'] != net_name:
            continue
        # A matching network without subnets does not end the search: a
        # later network carrying the same name may still qualify.
        if any(True for _ in candidate['subnets']):
            return True
    return False
192
def get_image_id(glance_client, image_name):
    """
    Return the id of the first Glance image named image_name.

    :param glance_client: client object exposing ``images.list()``
    :param image_name: name to look for
    :return: the ``id`` of the first match, or '' when no image matches
    """
    matching_ids = (image.id for image in glance_client.images.list()
                    if image.name == image_name)
    return next(matching_ids, '')
201
def create_glance_image(glance_client, image_name, file_path):
    """
    Upload a local file to Glance as a public qcow2/bare image.

    :param glance_client: client object exposing ``images.create(...)``
    :param image_name: name given to the new image
    :param file_path: path of the image file to upload
    :return: the ``id`` of the created image, or False when the file cannot
             be opened or the upload raises an error (the exception type
             is printed)
    """
    try:
        # Image files are binary data: open in 'rb' so the bytes are passed
        # through untouched (no newline translation or text decoding).
        with open(file_path, 'rb') as fimage:
            image = glance_client.images.create(name=image_name,
                                                is_public=True,
                                                disk_format="qcow2",
                                                container_format="bare",
                                                data=fimage)
        return image.id
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
210
def get_flavor_id(nova_client, flavor_name):
    """
    Return the id of the first Nova flavor named flavor_name.

    :param nova_client: client object exposing
                        ``flavors.list(detailed=True)``
    :param flavor_name: name to look for
    :return: the ``id`` of the first match, or '' when no flavor matches
    """
    available = nova_client.flavors.list(detailed=True)
    matching_ids = (flavor.id for flavor in available
                    if flavor.name == flavor_name)
    return next(matching_ids, '')
219
def get_flavor_id_by_ram_range(nova_client, min_ram, max_ram):
    """
    Return the id of the first flavor whose RAM lies in [min_ram, max_ram].

    Both bounds are inclusive and flavors are examined in the order the
    client returns them.

    :param nova_client: client object exposing
                        ``flavors.list(detailed=True)``
    :param min_ram: lower bound compared against each flavor's ``ram``
    :param max_ram: upper bound compared against each flavor's ``ram``
    :return: the ``id`` of the first match, or '' when no flavor fits
    """
    available = nova_client.flavors.list(detailed=True)
    fitting_ids = (flavor.id for flavor in available
                   if min_ram <= flavor.ram <= max_ram)
    return next(fitting_ids, '')
228
229
def get_tenant_id(keystone_client, tenant_name):
    """
    Return the id of the first Keystone tenant named tenant_name.

    :param keystone_client: client object exposing ``tenants.list()``
    :param tenant_name: name to look for
    :return: the ``id`` of the first match, or '' when no tenant matches
    """
    matching_ids = (tenant.id for tenant in keystone_client.tenants.list()
                    if tenant.name == tenant_name)
    return next(matching_ids, '')
238
def get_role_id(keystone_client, role_name):
    """
    Return the id of the first Keystone role named role_name.

    :param keystone_client: client object exposing ``roles.list()``
    :param role_name: name to look for
    :return: the ``id`` of the first match, or '' when no role matches
    """
    matching_ids = (role.id for role in keystone_client.roles.list()
                    if role.name == role_name)
    return next(matching_ids, '')
247
def get_user_id(keystone_client, user_name):
    """
    Return the id of the first Keystone user named user_name.

    :param keystone_client: client object exposing ``users.list()``
    :param user_name: name to look for
    :return: the ``id`` of the first match, or '' when no user matches
    """
    matching_ids = (user.id for user in keystone_client.users.list()
                    if user.name == user_name)
    return next(matching_ids, '')
256
def create_tenant(keystone_client, tenant_name, tenant_description):
    """
    Create an enabled Keystone tenant.

    :param keystone_client: client object exposing
                            ``tenants.create(name, description, enabled=...)``
    :param tenant_name: name given to the new tenant
    :param tenant_description: description given to the new tenant
    :return: the ``id`` of the created tenant, or False when the request
             raises an error (the exception type is printed)
    """
    try:
        tenant = keystone_client.tenants.create(tenant_name,
                                                tenant_description,
                                                enabled=True)
        return tenant.id
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
264
def delete_tenant(keystone_client, tenant_id):
    """
    Delete a Keystone tenant.

    :param keystone_client: client object exposing ``tenants.delete(id)``
    :param tenant_id: identifier passed to ``tenants.delete``
    :return: True on success, False when the request raises an error
             (the exception type is printed)
    """
    try:
        keystone_client.tenants.delete(tenant_id)
        return True
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
272
def add_role_user(keystone_client, user_id, role_id, tenant_id):
    """
    Grant a role to a user on a tenant.

    :param keystone_client: client object exposing
                            ``roles.add_user_role(user, role, tenant)``
    :param user_id: id of the user receiving the role
    :param role_id: id of the role to grant
    :param tenant_id: id of the tenant the grant applies to
    :return: True on success, False when the request raises an error
             (the exception type is printed)
    """
    try:
        keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
        return True
    except Exception:
        # Not a bare except, so Ctrl-C / SystemExit still propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
280
281
def check_internet_connectivity(url='http://www.opnfv.org/'):
    """
    Check if there is access to the internet.

    :param url: URL to open; the attempt is limited to 5 seconds
    :return: True when the URL can be opened, False when urllib2 reports
             a URLError (which also covers HTTPError, its subclass)
    """
    try:
        response = urllib2.urlopen(url, timeout=5)
    except urllib2.URLError:
        # The exception lives in urllib2; this file never imports a module
        # named "urllib", so referring to urllib.URLError would fail with a
        # NameError instead of returning False.
        return False
    # Only reachability matters; release the connection straight away.
    response.close()
    return True
291
292
def download_url(url, dest_path):
    """
    Download a file to a destination path given a URL.

    The file keeps the last path component of the URL as its name and is
    written inside dest_path.

    :param url: URL of the file to fetch
    :param dest_path: destination directory, with or without a trailing
                      path separator
    :return: True once the file has been written, False when the URL
             cannot be opened
    """
    name = url.rsplit('/')[-1]
    # os.path.join yields the same path as plain concatenation when
    # dest_path already ends with a separator, and inserts the separator
    # when it does not.
    dest = os.path.join(dest_path, name)
    try:
        response = urllib2.urlopen(url)
    except (urllib2.HTTPError, urllib2.URLError):
        return False

    try:
        with open(dest, 'wb') as f:
            # Stream in chunks rather than holding the whole payload in
            # memory at once.
            shutil.copyfileobj(response, f)
    finally:
        response.close()
    return True
307
308
def execute_command(cmd, logger=None):
    """
    Execute a Linux command through the shell.

    stdout and stderr of the command are both captured into "output.txt"
    in the current working directory; the captured text is sent to
    logger.debug when a logger is given and the text is not empty.

    :param cmd: command line handed to the shell
    :param logger: optional object providing ``debug`` and ``error``
    :return: True when the command exits with status 0
    :raises SystemExit: with code -1 when the command exits with any other
                        status (an error is logged first if a logger is given)
    """
    if logger:
        logger.debug('Executing command : {}'.format(cmd))
    output_file = "output.txt"
    # NOTE(review): shell=True hands cmd to the shell verbatim, so cmd must
    # come from a trusted source.
    with open(output_file, 'w+') as f:
        returncode = subprocess.call(cmd, shell=True, stdout=f,
                                     stderr=subprocess.STDOUT)
    # Re-open for reading inside a context manager so the handle is always
    # closed, including on the sys.exit path below.
    with open(output_file, 'r') as f:
        result = f.read()
    if result != "" and logger:
        logger.debug(result)
    if returncode == 0:
        return True
    if logger:
        logger.error("Error when executing command %s" % cmd)
    # sys.exit rather than the exit() helper, which the site module only
    # installs for interactive convenience.
    sys.exit(-1)