Merge "Put ToC after heading"
[functest.git] / testcases / functest_utils.py
1 #!/usr/bin/env python
2 #
3 # jose.lausuch@ericsson.com
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9
10 import re, json, os, urllib2, shutil, subprocess, sys
11
12
13 def check_credentials():
14     """
15     Check if the OpenStack credentials (openrc) are sourced
16     """
17     #TODO: there must be a short way to do this, doing if os.environ["something"] == "" throws an error
18     try:
19        os.environ['OS_AUTH_URL']
20     except KeyError:
21         return False
22     try:
23        os.environ['OS_USERNAME']
24     except KeyError:
25         return False
26     try:
27        os.environ['OS_PASSWORD']
28     except KeyError:
29         return False
30     try:
31        os.environ['OS_TENANT_NAME']
32     except KeyError:
33         return False
34     try:
35        os.environ['OS_REGION_NAME']
36     except KeyError:
37         return False
38     return True
39
40
41 def get_credentials(service):
42     """Returns a creds dictionary filled with the following keys:
43     * username
44     * password/api_key (depending on the service)
45     * tenant_name/project_id (depending on the service)
46     * auth_url
47     :param service: a string indicating the name of the service
48                     requesting the credentials.
49     """
50     #TODO: get credentials from the openrc file
51     creds = {}
52     # Unfortunately, each of the OpenStack client will request slightly
53     # different entries in their credentials dict.
54     if service.lower() in ("nova", "cinder"):
55         password = "api_key"
56         tenant = "project_id"
57     else:
58         password = "password"
59         tenant = "tenant_name"
60
61     # The most common way to pass these info to the script is to do it through
62     # environment variables.
63     creds.update({
64         "username": os.environ.get('OS_USERNAME', "admin"),                                # add your cloud username details
65         password: os.environ.get("OS_PASSWORD", 'admin'),                                # add password
66         "auth_url": os.environ.get("OS_AUTH_URL","http://192.168.20.71:5000/v2.0"),        # Auth URL
67         tenant: os.environ.get("OS_TENANT_NAME", "admin"),
68     })
69
70     return creds
71
72
73
74 def get_instance_status(nova_client,instance):
75     try:
76         instance = nova_client.servers.get(instance.id)
77         return instance.status
78     except:
79         return None
80
81
82 def get_instance_by_name(nova_client, instance_name):
83     try:
84         instance = nova_client.servers.find(name=instance_name)
85         return instance
86     except:
87         return None
88
89
90
91 def create_neutron_net(neutron_client, name):
92     json_body = {'network': {'name': name,
93                     'admin_state_up': True}}
94     try:
95         network = neutron_client.create_network(body=json_body)
96         network_dict = network['network']
97         return network_dict['id']
98     except:
99         print "Error:", sys.exc_info()[0]
100         return False
101
102 def delete_neutron_net(neutron_client, network_id):
103     try:
104         neutron_client.delete_network(network_id)
105         return True
106     except:
107         print "Error:", sys.exc_info()[0]
108         return False
109
110 def create_neutron_subnet(neutron_client, name, cidr, net_id):
111     json_body = {'subnets': [{'name': name, 'cidr': cidr,
112                            'ip_version': 4, 'network_id': net_id}]}
113     try:
114         subnet = neutron_client.create_subnet(body=json_body)
115         return subnet['subnets'][0]['id']
116     except:
117         print "Error:", sys.exc_info()[0]
118         return False
119
120 def delete_neutron_subnet(neutron_client, subnet_id):
121     try:
122         neutron_client.delete_subnet(subnet_id)
123         return True
124     except:
125         print "Error:", sys.exc_info()[0]
126         return False
127
128 def create_neutron_router(neutron_client, name):
129     json_body = {'router': {'name': name, 'admin_state_up': True}}
130     try:
131         router = neutron_client.create_router(json_body)
132         return router['router']['id']
133     except:
134         print "Error:", sys.exc_info()[0]
135         return False
136
137 def delete_neutron_router(neutron_client, router_id):
138     json_body = {'router': {'id': router_id}}
139     try:
140         neutron_client.delete_router(router=router_id)
141         return True
142     except:
143         print "Error:", sys.exc_info()[0]
144         return False
145
146
147 def add_interface_router(neutron_client, router_id, subnet_id):
148     json_body = {"subnet_id": subnet_id}
149     try:
150         neutron_client.add_interface_router(router=router_id, body=json_body)
151         return True
152     except:
153         print "Error:", sys.exc_info()[0]
154         return False
155
156
157 def remove_interface_router(neutron_client, router_id, subnet_id):
158     json_body = {"subnet_id": subnet_id}
159     try:
160         neutron_client.remove_interface_router(router=router_id, body=json_body)
161         return True
162     except:
163         print "Error:", sys.exc_info()[0]
164         return False
165
166 def create_neutron_port(neutron_client, name, network_id, ip):
167     json_body = {'port': {
168     'admin_state_up': True,
169     'name': name,
170     'network_id': network_id,
171     'fixed_ips': [{"ip_address": ip}]
172     }}
173     try:
174         port = neutron_client.create_port(body=json_body)
175         return port['port']['id']
176     except:
177         print "Error:", sys.exc_info()[0]
178         return False
179
180
181 def get_network_id(neutron_client, network_name):
182     networks = neutron_client.list_networks()['networks']
183     id  = ''
184     for n in networks:
185         if n['name'] == network_name:
186             id = n['id']
187             break
188     return id
189
190 def check_neutron_net(neutron_client, net_name):
191     for network in neutron_client.list_networks()['networks']:
192         if network['name'] == net_name :
193             for subnet in network['subnets']:
194                 return True
195     return False
196
197
198
199 def check_internet_connectivity(url='http://www.google.com/'):
200     """
201     Check if there is access to the internet
202     """
203     try:
204         urllib2.urlopen(url, timeout=5)
205         return True
206     except urllib.request.URLError:
207         return False
208
209
210 def download_url(url, dest_path):
211     """
212     Download a file to a destination path given a URL
213     """
214     name = url.rsplit('/')[-1]
215     dest = dest_path + name
216     try:
217         response = urllib2.urlopen(url)
218     except (urllib2.HTTPError, urllib2.URLError):
219         return False
220
221     with open(dest, 'wb') as f:
222         f.write(response.read())
223     return True
224
225
226 def execute_command(cmd, logger=None):
227     """
228     Execute Linux command
229     """
230     if logger:
231         logger.debug('Executing command : {}'.format(cmd))
232     output_file = "output.txt"
233     f = open(output_file, 'w+')
234     p = subprocess.call(cmd,shell=True, stdout=f, stderr=subprocess.STDOUT)
235     f.close()
236     f = open(output_file, 'r')
237     result = f.read()
238     if result != "" and logger:
239         logger.debug(result)
240     if p == 0 :
241         return True
242     else:
243         if logger:
244             logger.error("Error when executing command %s" %cmd)
245         exit(-1)