Remove OS_REGION_NAME from OpenStack credentials check
[functest.git] / testcases / functest_utils.py
1 #!/usr/bin/env python
2 #
3 # jose.lausuch@ericsson.com
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9
10 import re, json, os, urllib2, shutil, subprocess, sys
11
12
def check_credentials():
    """
    Check if the OpenStack credentials (openrc) are sourced.

    Returns True only when OS_AUTH_URL, OS_USERNAME, OS_PASSWORD and
    OS_TENANT_NAME are all present in the environment, False as soon as
    one of them is missing. Only presence is checked: a variable set to
    an empty string still counts as defined.
    """
    required_vars = ('OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD',
                     'OS_TENANT_NAME')
    for var_name in required_vars:
        if var_name not in os.environ:
            return False
    return True
35
36
def get_credentials(service):
    """
    Build the credentials dictionary expected by an OpenStack client.

    The returned dictionary always holds "username" and "auth_url". The
    two remaining keys depend on the service, because the clients do not
    agree on their names:
    * "api_key" and "project_id" for nova and cinder (case-insensitive)
    * "password" and "tenant_name" for any other service

    Values come from the OS_USERNAME, OS_PASSWORD, OS_AUTH_URL and
    OS_TENANT_NAME environment variables, with hard-coded fallbacks when
    a variable is not set.

    :param service: a string indicating the name of the service
                    requesting the credentials.
    """
    #TODO: get credentials from the openrc file
    is_nova_like = service.lower() in ("nova", "cinder")
    password_key = "api_key" if is_nova_like else "password"
    tenant_key = "project_id" if is_nova_like else "tenant_name"

    # The most common way to pass this information to the script is
    # through environment variables.
    env = os.environ
    return {
        "username": env.get('OS_USERNAME', "admin"),
        password_key: env.get("OS_PASSWORD", 'admin'),
        "auth_url": env.get("OS_AUTH_URL", "http://192.168.20.71:5000/v2.0"),
        tenant_key: env.get("OS_TENANT_NAME", "admin"),
    }
67
68
69
def get_instance_status(nova_client, instance):
    """
    Return the current status of an instance.

    The instance is looked up again through nova_client.servers.get()
    using instance.id, and the status of that fresh object is returned.

    :param nova_client: client object exposing servers.get(id)
    :param instance: object with an "id" attribute
    :return: the status attribute of the refreshed instance, or None if
             the lookup raised an exception
    """
    try:
        refreshed = nova_client.servers.get(instance.id)
        return refreshed.status
    except Exception:
        # Best-effort lookup: ordinary errors mean "status unknown", but
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        return None
76
77
def get_instance_by_name(nova_client, instance_name):
    """
    Look up an instance by its name.

    :param nova_client: client object exposing servers.find(name=...)
    :param instance_name: name of the instance to search for
    :return: whatever servers.find() returns, or None if the lookup
             raised an exception
    """
    try:
        return nova_client.servers.find(name=instance_name)
    except Exception:
        # Best-effort lookup: ordinary errors mean "no instance", but
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        return None
84
85
86
def create_neutron_net(neutron_client, name):
    """
    Create a network with the given name and admin_state_up set to True.

    :param neutron_client: client object exposing create_network(body=...)
    :param name: name of the network to create
    :return: the id of the created network, or False if the request or
             the parsing of its response raised an exception (the
             exception type is printed)
    """
    json_body = {'network': {'name': name,
                             'admin_state_up': True}}
    try:
        network = neutron_client.create_network(body=json_body)
        return network['network']['id']
    except Exception:
        # Ordinary errors are reported and turned into False;
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
97
def delete_neutron_net(neutron_client, network_id):
    """
    Delete the network identified by network_id.

    :param neutron_client: client object exposing delete_network(id)
    :param network_id: id of the network to delete
    :return: True if the call completed, False if it raised an
             exception (the exception type is printed)
    """
    try:
        neutron_client.delete_network(network_id)
        return True
    except Exception:
        # Ordinary errors are reported and turned into False;
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
105
def create_neutron_subnet(neutron_client, name, cidr, net_id):
    """
    Create an IPv4 subnet on an existing network.

    The request is sent as a one-element 'subnets' list and the id of
    the first subnet in the response is returned.

    :param neutron_client: client object exposing create_subnet(body=...)
    :param name: name of the subnet to create
    :param cidr: CIDR of the subnet
    :param net_id: id of the network the subnet belongs to
    :return: the id of the created subnet, or False if the request or
             the parsing of its response raised an exception (the
             exception type is printed)
    """
    json_body = {'subnets': [{'name': name, 'cidr': cidr,
                              'ip_version': 4, 'network_id': net_id}]}
    try:
        subnet = neutron_client.create_subnet(body=json_body)
        return subnet['subnets'][0]['id']
    except Exception:
        # Ordinary errors are reported and turned into False;
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
115
def delete_neutron_subnet(neutron_client, subnet_id):
    """
    Delete the subnet identified by subnet_id.

    :param neutron_client: client object exposing delete_subnet(id)
    :param subnet_id: id of the subnet to delete
    :return: True if the call completed, False if it raised an
             exception (the exception type is printed)
    """
    try:
        neutron_client.delete_subnet(subnet_id)
        return True
    except Exception:
        # Ordinary errors are reported and turned into False;
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
123
def create_neutron_router(neutron_client, name):
    """
    Create a router with the given name and admin_state_up set to True.

    :param neutron_client: client object exposing create_router(body)
    :param name: name of the router to create
    :return: the id of the created router, or False if the request or
             the parsing of its response raised an exception (the
             exception type is printed)
    """
    json_body = {'router': {'name': name, 'admin_state_up': True}}
    try:
        router = neutron_client.create_router(json_body)
        return router['router']['id']
    except Exception:
        # Ordinary errors are reported and turned into False;
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
132
def delete_neutron_router(neutron_client, router_id):
    """
    Delete the router identified by router_id.

    :param neutron_client: client object exposing
                           delete_router(router=...)
    :param router_id: id of the router to delete
    :return: True if the call completed, False if it raised an
             exception (the exception type is printed)
    """
    try:
        neutron_client.delete_router(router=router_id)
        return True
    except Exception:
        # Ordinary errors are reported and turned into False;
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
141
142
def add_interface_router(neutron_client, router_id, subnet_id):
    """
    Attach a subnet to a router.

    :param neutron_client: client object exposing
                           add_interface_router(router=..., body=...)
    :param router_id: id of the router
    :param subnet_id: id of the subnet to attach
    :return: True if the call completed, False if it raised an
             exception (the exception type is printed)
    """
    json_body = {"subnet_id": subnet_id}
    try:
        neutron_client.add_interface_router(router=router_id, body=json_body)
        return True
    except Exception:
        # Ordinary errors are reported and turned into False;
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
151
152
def remove_interface_router(neutron_client, router_id, subnet_id):
    """
    Detach a subnet from a router.

    :param neutron_client: client object exposing
                           remove_interface_router(router=..., body=...)
    :param router_id: id of the router
    :param subnet_id: id of the subnet to detach
    :return: True if the call completed, False if it raised an
             exception (the exception type is printed)
    """
    json_body = {"subnet_id": subnet_id}
    try:
        neutron_client.remove_interface_router(router=router_id,
                                               body=json_body)
        return True
    except Exception:
        # Ordinary errors are reported and turned into False;
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
161
def create_neutron_port(neutron_client, name, network_id, ip):
    """
    Create a port with a single fixed IP address on a network.

    :param neutron_client: client object exposing create_port(body=...)
    :param name: name of the port to create
    :param network_id: id of the network the port is created on
    :param ip: IP address requested for the port
    :return: the id of the created port, or False if the request or the
             parsing of its response raised an exception (the exception
             type is printed)
    """
    json_body = {'port': {
        'admin_state_up': True,
        'name': name,
        'network_id': network_id,
        'fixed_ips': [{"ip_address": ip}],
    }}
    try:
        port = neutron_client.create_port(body=json_body)
        return port['port']['id']
    except Exception:
        # Ordinary errors are reported and turned into False;
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        print("Error: %s" % (sys.exc_info()[0],))
        return False
175
176
def get_network_id(neutron_client, network_name):
    """
    Return the id of the first listed network named network_name.

    :param neutron_client: client object exposing list_networks(), whose
                           result holds a 'networks' list of dicts with
                           'name' and 'id' keys
    :param network_name: name of the network to look for
    :return: the id of the first matching network, or an empty string
             when no network has that name
    """
    for net in neutron_client.list_networks()['networks']:
        if net['name'] == network_name:
            return net['id']
    return ''
185
def check_neutron_net(neutron_client, net_name):
    """
    Check that a network named net_name exists and has a subnet.

    :param neutron_client: client object exposing list_networks(), whose
                           result holds a 'networks' list of dicts with
                           'name' and 'subnets' keys
    :param net_name: name of the network to look for
    :return: True if at least one network with that name has a non-empty
             'subnets' collection, False otherwise
    """
    for candidate in neutron_client.list_networks()['networks']:
        if candidate['name'] != net_name:
            continue
        # A single subnet on a matching network is enough.
        if any(True for _ in candidate['subnets']):
            return True
    return False
192
193
194
def check_internet_connectivity(url='http://www.opnfv.org/'):
    """
    Check if there is access to the internet.

    Opens the given URL with a 5 second timeout.

    :param url: URL to open
    :return: True if the URL could be opened, False if opening it raised
             urllib2.URLError (which also covers urllib2.HTTPError, a
             subclass of it)
    """
    try:
        response = urllib2.urlopen(url, timeout=5)
    except urllib2.URLError:
        # URLError lives in urllib2; the plain urllib module is not
        # imported by this file and does not provide it.
        return False
    # Only reachability matters, so release the connection immediately.
    response.close()
    return True
204
205
def download_url(url, dest_path):
    """
    Download a file to a destination path given a URL.

    The file is saved in dest_path under the last '/'-separated
    component of the URL. dest_path may be given with or without a
    trailing path separator.

    :param url: URL of the file to download
    :param dest_path: directory the file is written to
    :return: True once the file has been written, False if the URL could
             not be opened (urllib2.HTTPError or urllib2.URLError)
    """
    name = url.rsplit('/')[-1]
    # os.path.join only inserts a separator when dest_path lacks one, so
    # callers passing "dir/" get the same path as with plain
    # concatenation, and "dir" no longer produces "dirname".
    dest = os.path.join(dest_path, name)
    try:
        response = urllib2.urlopen(url)
    except (urllib2.HTTPError, urllib2.URLError):
        return False

    try:
        with open(dest, 'wb') as f:
            # Stream in chunks instead of holding the whole payload in
            # memory.
            shutil.copyfileobj(response, f)
    finally:
        response.close()
    return True
220
221
def execute_command(cmd, logger=None):
    """
    Execute Linux command.

    The command is run through the shell with stdout and stderr both
    redirected to "output.txt" in the current working directory; the
    captured output is then read back and logged at debug level.

    NOTE: cmd is interpreted by the shell (shell=True), so it must never
    be built from untrusted input.

    :param cmd: command line to execute
    :param logger: optional logger; when given, the command, its output
                   and any failure are logged through it
    :return: True if the command exits with return code 0
    :raises SystemExit: with code -1 if the command exits with a
                        non-zero return code
    """
    if logger:
        logger.debug('Executing command : {}'.format(cmd))
    output_file = "output.txt"
    with open(output_file, 'w+') as f:
        return_code = subprocess.call(cmd, shell=True, stdout=f,
                                      stderr=subprocess.STDOUT)
    # Re-open for reading inside a context manager so that this handle
    # is closed as well.
    with open(output_file, 'r') as f:
        result = f.read()
    if result != "" and logger:
        logger.debug(result)
    if return_code == 0:
        return True
    if logger:
        logger.error("Error when executing command %s" % cmd)
    # sys.exit instead of the interactive-only exit() helper, which is
    # injected by the site module and not guaranteed to exist.
    sys.exit(-1)