Merge "Improvements - Added: functest_utils.py for common util functions - Moved...
[functest.git] / testcases / functest_utils.py
1 #!/usr/bin/env python
2 #
3 # jose.lausuch@ericsson.com
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9
10 import re, json, os, urllib2, shutil, subprocess, sys
11
12
def check_credentials():
    """
    Check if the OpenStack credentials (openrc) are sourced

    :return: True when OS_AUTH_URL, OS_USERNAME, OS_PASSWORD,
             OS_TENANT_NAME and OS_REGION_NAME are all present in the
             environment, False as soon as one of them is missing.
             Only presence is tested: a variable exported with an empty
             value still counts as present.
    """
    required_vars = ('OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD',
                     'OS_TENANT_NAME', 'OS_REGION_NAME')
    # A membership test never raises, so no try/except per variable is needed.
    return all(var in os.environ for var in required_vars)
39
40
def get_credentials(service):
    """Build the credentials dictionary expected by an OpenStack client.

    The returned dictionary holds four entries:
    * username
    * password or api_key (api_key for nova and cinder)
    * tenant_name or project_id (project_id for nova and cinder)
    * auth_url

    Values are read from OS_USERNAME, OS_PASSWORD, OS_AUTH_URL and
    OS_TENANT_NAME; the defaults hard-coded below are used for any variable
    that is not set.

    :param service: a string indicating the name of the service
                    requesting the credentials (matched case-insensitively).
    :return: the credentials dictionary described above.
    """
    # TODO: get credentials from the openrc file
    # Each OpenStack client expects slightly different key names for the
    # password and tenant entries.
    wants_api_key = service.lower() in ("nova", "cinder")
    password_key = "api_key" if wants_api_key else "password"
    tenant_key = "project_id" if wants_api_key else "tenant_name"

    # Environment variables are the usual way to hand these values over.
    env = os.environ
    return {
        "username": env.get('OS_USERNAME', "admin"),
        password_key: env.get("OS_PASSWORD", 'admin'),
        "auth_url": env.get("OS_AUTH_URL", "http://192.168.20.71:5000/v2.0"),
        tenant_key: env.get("OS_TENANT_NAME", "admin"),
    }
71
72
73
def get_instance_status(nova_client, instance):
    """
    Return the current status of an instance

    The instance is fetched again through nova_client.servers.get() using
    instance.id, and the status attribute of that fresh object is returned.

    :param nova_client: client object exposing servers.get(id)
    :param instance: object whose id attribute identifies the instance
    :return: the status of the fetched instance, or None if the lookup
             raised an exception
    """
    try:
        instance = nova_client.servers.get(instance.id)
        return instance.status
    except Exception:
        # Best-effort lookup: any client error means "status unknown".
        # KeyboardInterrupt/SystemExit are deliberately not swallowed.
        return None
80
81
def get_instance_by_name(nova_client, instance_name):
    """
    Look up an instance by its name

    :param nova_client: client object exposing servers.find(name=...)
    :param instance_name: name passed to servers.find()
    :return: whatever servers.find() returns, or None if it raised an
             exception
    """
    try:
        return nova_client.servers.find(name=instance_name)
    except Exception:
        # Best-effort lookup: any client error means "no such instance".
        # KeyboardInterrupt/SystemExit are deliberately not swallowed.
        return None
88
89
90
def create_neutron_net(neutron_client, name):
    """
    Create a network through the given neutron client

    The request body asks for a network called name with
    admin_state_up set to True.

    :param neutron_client: client object exposing create_network(body=...)
    :param name: name of the network to create
    :return: the 'id' found under the 'network' key of the response, or
             False if the call (or reading the response) raised an exception
    """
    json_body = {'network': {'name': name,
                             'admin_state_up': True}}
    try:
        network = neutron_client.create_network(body=json_body)
        return network['network']['id']
    except Exception:
        # Single parenthesised argument: prints the same text whether print
        # is a statement or a function. Only the exception type is reported.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
101
def delete_neutron_net(neutron_client, network_id):
    """
    Delete a network through the given neutron client

    :param neutron_client: client object exposing delete_network(id)
    :param network_id: identifier passed to delete_network()
    :return: True if the call completed, False if it raised an exception
    """
    try:
        neutron_client.delete_network(network_id)
        return True
    except Exception:
        # Single parenthesised argument: prints the same text whether print
        # is a statement or a function. Only the exception type is reported.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
109
def create_neutron_subnet(neutron_client, name, cidr, net_id):
    """
    Create an IPv4 subnet through the given neutron client

    The request body contains a single subnet entry with the given name,
    cidr and network_id, and ip_version fixed to 4.

    :param neutron_client: client object exposing create_subnet(body=...)
    :param name: name of the subnet to create
    :param cidr: CIDR of the subnet
    :param net_id: identifier of the network the subnet belongs to
    :return: the 'id' of the first entry under the 'subnets' key of the
             response, or False if the call (or reading the response)
             raised an exception
    """
    json_body = {'subnets': [{'name': name, 'cidr': cidr,
                              'ip_version': 4, 'network_id': net_id}]}
    try:
        subnet = neutron_client.create_subnet(body=json_body)
        return subnet['subnets'][0]['id']
    except Exception:
        # Single parenthesised argument: prints the same text whether print
        # is a statement or a function. Only the exception type is reported.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
119
def delete_neutron_subnet(neutron_client, subnet_id):
    """
    Delete a subnet through the given neutron client

    :param neutron_client: client object exposing delete_subnet(id)
    :param subnet_id: identifier passed to delete_subnet()
    :return: True if the call completed, False if it raised an exception
    """
    try:
        neutron_client.delete_subnet(subnet_id)
        return True
    except Exception:
        # Single parenthesised argument: prints the same text whether print
        # is a statement or a function. Only the exception type is reported.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
127
def create_neutron_router(neutron_client, name):
    """
    Create a router through the given neutron client

    The request body asks for a router called name with
    admin_state_up set to True.

    :param neutron_client: client object exposing create_router(body)
    :param name: name of the router to create
    :return: the 'id' found under the 'router' key of the response, or
             False if the call (or reading the response) raised an exception
    """
    json_body = {'router': {'name': name, 'admin_state_up': True}}
    try:
        router = neutron_client.create_router(json_body)
        return router['router']['id']
    except Exception:
        # Single parenthesised argument: prints the same text whether print
        # is a statement or a function. Only the exception type is reported.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
136
def delete_neutron_router(neutron_client, router_id):
    """
    Delete a router through the given neutron client

    :param neutron_client: client object exposing delete_router(router=...)
    :param router_id: identifier passed as the router keyword argument
    :return: True if the call completed, False if it raised an exception
    """
    try:
        neutron_client.delete_router(router=router_id)
        return True
    except Exception:
        # Single parenthesised argument: prints the same text whether print
        # is a statement or a function. Only the exception type is reported.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
145
146
def add_interface_router(neutron_client, router_id, subnet_id):
    """
    Attach a subnet to a router through the given neutron client

    :param neutron_client: client object exposing
                           add_interface_router(router=..., body=...)
    :param router_id: identifier passed as the router keyword argument
    :param subnet_id: identifier sent as 'subnet_id' in the request body
    :return: True if the call completed, False if it raised an exception
    """
    json_body = {"subnet_id": subnet_id}
    try:
        neutron_client.add_interface_router(router=router_id, body=json_body)
        return True
    except Exception:
        # Single parenthesised argument: prints the same text whether print
        # is a statement or a function. Only the exception type is reported.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
155
156
def remove_interface_router(neutron_client, router_id, subnet_id):
    """
    Detach a subnet from a router through the given neutron client

    :param neutron_client: client object exposing
                           remove_interface_router(router=..., body=...)
    :param router_id: identifier passed as the router keyword argument
    :param subnet_id: identifier sent as 'subnet_id' in the request body
    :return: True if the call completed, False if it raised an exception
    """
    json_body = {"subnet_id": subnet_id}
    try:
        neutron_client.remove_interface_router(router=router_id,
                                               body=json_body)
        return True
    except Exception:
        # Single parenthesised argument: prints the same text whether print
        # is a statement or a function. Only the exception type is reported.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
165
166
def get_network_id(neutron_client, network_name):
    """
    Return the id of the first network whose name matches

    :param neutron_client: client object exposing list_networks(), whose
                           result holds the networks under the 'networks' key
    :param network_name: name to look for
    :return: the 'id' of the first network named network_name, or an
             empty string if no network carries that name
    """
    for network in neutron_client.list_networks()['networks']:
        if network['name'] == network_name:
            return network['id']
    return ''
175
def check_neutron_net(neutron_client, net_name):
    """
    Check that a network with the given name exists and has a subnet

    :param neutron_client: client object exposing list_networks(), whose
                           result holds the networks under the 'networks' key
    :param net_name: name of the network to look for
    :return: True if some network named net_name has a non-empty 'subnets'
             entry, False otherwise
    """
    for network in neutron_client.list_networks()['networks']:
        # A non-empty 'subnets' entry is all that is required; its content
        # is not inspected.
        if network['name'] == net_name and network['subnets']:
            return True
    return False
182
183
184
def check_internet_connectivity(url='http://www.google.com/'):
    """
    Check if there is access to the internet

    :param url: URL to open; the request uses a 5 second timeout
    :return: True if the URL could be opened, False if urlopen raised
             urllib2.URLError (which also covers urllib2.HTTPError)
    """
    try:
        response = urllib2.urlopen(url, timeout=5)
    except urllib2.URLError:
        # The exception class must come from urllib2, the module this file
        # imports; naming it through any other module fails with NameError
        # exactly when a connection error needs to be handled.
        return False
    # Only reachability matters; release the connection right away.
    response.close()
    return True
194
195
def download_url(url, dest_path):
    """
    Download a file to a destination path given a URL

    The file is stored in dest_path under the last '/'-separated component
    of the URL.

    :param url: URL of the file to download
    :param dest_path: directory in which to store the file; a trailing
                      path separator is accepted but not required
    :return: True once the file has been written, False if the URL could
             not be opened (urllib2.HTTPError or urllib2.URLError)
    """
    name = url.rsplit('/')[-1]
    # os.path.join gives the same result as plain concatenation when
    # dest_path ends with a separator, and inserts one when it does not.
    dest = os.path.join(dest_path, name)
    try:
        response = urllib2.urlopen(url)
    except (urllib2.HTTPError, urllib2.URLError):
        return False

    try:
        with open(dest, 'wb') as f:
            # Stream in chunks rather than holding the whole payload in memory.
            shutil.copyfileobj(response, f)
    finally:
        response.close()
    return True
210
211
def execute_command(cmd, logger=None):
    """
    Execute Linux command

    The command is run through the shell with stderr merged into stdout.
    The combined output is captured through a pipe instead of a scratch
    file, so nothing is written to the current working directory.

    :param cmd: command line, passed to the shell as is
    :param logger: optional object with debug() and error() methods; when
                   given, the command, its non-empty output and any failure
                   are logged through it
    :return: True if the command exited with status 0
    :raises SystemExit: with code -1 if the command exited with any other
                        status
    """
    if logger:
        logger.debug('Executing command : {}'.format(cmd))
    # NOTE(review): shell=True hands cmd to the shell unmodified; it must
    # never be assembled from untrusted input.
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
    # communicate() drains the pipe and waits for the process to finish.
    result, _ = process.communicate()
    if result and logger:
        logger.debug(result)
    if process.returncode == 0:
        return True
    if logger:
        logger.error("Error when executing command %s" % cmd)
    # sys.exit is always available; the bare exit() helper is only injected
    # by the site module.
    sys.exit(-1)