Add missing directories env variables in common.sh
[functest.git] / testcases / functest_utils.py
1 #!/usr/bin/env python
2 #
3 # jose.lausuch@ericsson.com
4 # valentin.boucher@orange.com
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10
11
12 import os
13 import os.path
14 import urllib2
15 import subprocess
16 import sys
17 import requests
18 import json
19 import shutil
20 import re
21 import yaml
22 import socket
23 from git import Repo
24
25
26 # ############ CREDENTIALS OPENSTACK #############
27 def check_credentials():
28     """
29     Check if the OpenStack credentials (openrc) are sourced
30     """
31     env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD', 'OS_TENANT_NAME']
32     return all(map(lambda v: v in os.environ and os.environ[v], env_vars))
33
34
35 def get_credentials(service):
36     """Returns a creds dictionary filled with the following keys:
37     * username
38     * password/api_key (depending on the service)
39     * tenant_name/project_id (depending on the service)
40     * auth_url
41     :param service: a string indicating the name of the service
42                     requesting the credentials.
43     """
44     creds = {}
45     # Unfortunately, each of the OpenStack client will request slightly
46     # different entries in their credentials dict.
47     if service.lower() in ("nova", "cinder"):
48         password = "api_key"
49         tenant = "project_id"
50     else:
51         password = "password"
52         tenant = "tenant_name"
53
54     # The most common way to pass these info to the script is to do it through
55     # environment variables.
56     creds.update({
57         "username": os.environ.get('OS_USERNAME', "admin"),
58         password: os.environ.get("OS_PASSWORD", 'admin'),
59         "auth_url": os.environ.get("OS_AUTH_URL",
60                                    "http://192.168.20.71:5000/v2.0"),
61         tenant: os.environ.get("OS_TENANT_NAME", "admin"),
62     })
63
64     return creds
65
66
67 # ################ NOVA #################
68 def get_instances(nova_client):
69     try:
70         instances = nova_client.servers.list(search_opts={'all_tenants': 1})
71         return instances
72     except:
73         return None
74
75
76 def get_instance_status(nova_client, instance):
77     try:
78         instance = nova_client.servers.get(instance.id)
79         return instance.status
80     except:
81         return None
82
83
84 def get_instance_by_name(nova_client, instance_name):
85     try:
86         instance = nova_client.servers.find(name=instance_name)
87         return instance
88     except:
89         return None
90
91 def create_flavor(nova_client, flavor_name, ram, disk, vcpus):
92     try:
93         flavor = nova_client.flavors.create(flavor_name,ram,vcpus,disk)
94     except:
95         return None
96     return flavor.id
97
98 def get_flavor_id(nova_client, flavor_name):
99     flavors = nova_client.flavors.list(detailed=True)
100     id = ''
101     for f in flavors:
102         if f.name == flavor_name:
103             id = f.id
104             break
105     return id
106
107
108 def get_flavor_id_by_ram_range(nova_client, min_ram, max_ram):
109     flavors = nova_client.flavors.list(detailed=True)
110     id = ''
111     for f in flavors:
112         if min_ram <= f.ram and f.ram <= max_ram:
113             id = f.id
114             break
115     return id
116
117
118 def delete_instance(nova_client, instance_id):
119     try:
120         nova_client.servers.force_delete(instance_id)
121         return True
122     except:
123         print "Error:", sys.exc_info()[0]
124         return False
125
126 def create_floating_ip(neutron_client):
127     extnet_id = get_external_net_id(neutron_client)
128     props = {'floating_network_id': extnet_id}
129     try:
130         ip_json = neutron_client.create_floatingip({'floatingip': props})
131         floating_ip = ip_json['floatingip']['floating_ip_address']
132     except:
133         return None
134     return floating_ip
135
136 def get_floating_ips(nova_client):
137     try:
138         floating_ips = nova_client.floating_ips.list()
139         return floating_ips
140     except:
141         return None
142
143 def add_floating_ip(nova_client, server_id, floatingip_id):
144     try:
145         nova_client.servers.add_floating_ip(server_id,floatingip_id)
146         return True
147     except:
148         print "Error:", sys.exc_info()[0]
149         return None
150
151 def delete_floating_ip(nova_client, floatingip_id):
152     try:
153         nova_client.floating_ips.delete(floatingip_id)
154         return True
155     except:
156         print "Error:", sys.exc_info()[0]
157         return None
158
159
160 # ################ NEUTRON #################
161 def create_neutron_net(neutron_client, name):
162     json_body = {'network': {'name': name,
163                              'admin_state_up': True}}
164     try:
165         network = neutron_client.create_network(body=json_body)
166         network_dict = network['network']
167         return network_dict['id']
168     except:
169         print "Error:", sys.exc_info()[0]
170         return False
171
172
173 def update_neutron_net(neutron_client, network_id, shared=False):
174     json_body = {'network': {'shared': shared}}
175     try:
176         neutron_client.update_network(network_id, body=json_body)
177         return True
178     except:
179         print "Error:", sys.exc_info()[0]
180         return False
181
182
183 def delete_neutron_net(neutron_client, network_id):
184     try:
185         neutron_client.delete_network(network_id)
186         return True
187     except:
188         print "Error:", sys.exc_info()[0]
189         return False
190
191
192 def create_neutron_subnet(neutron_client, name, cidr, net_id):
193     json_body = {'subnets': [{'name': name, 'cidr': cidr,
194                               'ip_version': 4, 'network_id': net_id}]}
195     try:
196         subnet = neutron_client.create_subnet(body=json_body)
197         return subnet['subnets'][0]['id']
198     except:
199         print "Error:", sys.exc_info()[0]
200         return False
201
202
203 def delete_neutron_subnet(neutron_client, subnet_id):
204     try:
205         neutron_client.delete_subnet(subnet_id)
206         return True
207     except:
208         print "Error:", sys.exc_info()[0]
209         return False
210
211
212 def create_neutron_router(neutron_client, name):
213     json_body = {'router': {'name': name, 'admin_state_up': True}}
214     try:
215         router = neutron_client.create_router(json_body)
216         return router['router']['id']
217     except:
218         print "Error:", sys.exc_info()[0]
219         return False
220
221
222 def delete_neutron_router(neutron_client, router_id):
223     json_body = {'router': {'id': router_id}}
224     try:
225         neutron_client.delete_router(router=router_id)
226         return True
227     except:
228         print "Error:", sys.exc_info()[0]
229         return False
230
231
232 def add_interface_router(neutron_client, router_id, subnet_id):
233     json_body = {"subnet_id": subnet_id}
234     try:
235         neutron_client.add_interface_router(router=router_id, body=json_body)
236         return True
237     except:
238         print "Error:", sys.exc_info()[0]
239         return False
240
241
242 def remove_interface_router(neutron_client, router_id, subnet_id):
243     json_body = {"subnet_id": subnet_id}
244     try:
245         neutron_client.remove_interface_router(router=router_id,
246                                                body=json_body)
247         return True
248     except:
249         print "Error:", sys.exc_info()[0]
250         return False
251
252 def add_gateway_router(neutron_client, router_id):
253     ext_net_id = get_external_net_id(neutron_client)
254     router_dict = {'network_id': ext_net_id}
255     try:
256         neutron_client.add_gateway_router(router_id,router_dict)
257         return True
258     except:
259         print "Error:", sys.exc_info()[0]
260         return False
261
262 def remove_gateway_router(neutron_client, router_id):
263     try:
264         neutron_client.remove_gateway_router(router_id)
265         return True
266     except:
267         print "Error:", sys.exc_info()[0]
268         return False
269
270
271 def create_neutron_port(neutron_client, name, network_id, ip):
272     json_body = {'port': {
273                  'admin_state_up': True,
274                  'name': name,
275                  'network_id': network_id,
276                  'fixed_ips': [{"ip_address": ip}]
277                  }}
278     try:
279         port = neutron_client.create_port(body=json_body)
280         return port['port']['id']
281     except:
282         print "Error:", sys.exc_info()[0]
283         return False
284
285
286 def update_neutron_port(neutron_client, port_id, device_owner):
287     json_body = {'port': {
288                  'device_owner': device_owner,
289                  }}
290     try:
291         port = neutron_client.update_port(port=port_id,
292                                           body=json_body)
293         return port['port']['id']
294     except:
295         print "Error:", sys.exc_info()[0]
296         return False
297
298
299 def delete_neutron_port(neutron_client, port_id):
300     try:
301         neutron_client.delete_port(port_id)
302         return True
303     except:
304         print "Error:", sys.exc_info()[0]
305         return False
306
307
308 def get_network_id(neutron_client, network_name):
309     networks = neutron_client.list_networks()['networks']
310     id = ''
311     for n in networks:
312         if n['name'] == network_name:
313             id = n['id']
314             break
315     return id
316
317
318 def check_neutron_net(neutron_client, net_name):
319     for network in neutron_client.list_networks()['networks']:
320         if network['name'] == net_name:
321             for subnet in network['subnets']:
322                 return True
323     return False
324
325
326 def get_network_list(neutron_client):
327     network_list = neutron_client.list_networks()['networks']
328     if len(network_list) == 0:
329         return None
330     else:
331         return network_list
332
333
334 def get_router_list(neutron_client):
335     router_list = neutron_client.list_routers()['routers']
336     if len(router_list) == 0:
337         return None
338     else:
339         return router_list
340
341
342 def get_port_list(neutron_client):
343     port_list = neutron_client.list_ports()['ports']
344     if len(port_list) == 0:
345         return None
346     else:
347         return port_list
348
349
350 def get_external_net(neutron_client):
351     for network in neutron_client.list_networks()['networks']:
352         if network['router:external']:
353             return network['name']
354     return False
355
356
357 def get_external_net_id(neutron_client):
358     for network in neutron_client.list_networks()['networks']:
359         if network['router:external']:
360             return network['id']
361     return False
362
363
364 def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
365     json_body = {"quota": {
366         "security_group": sg_quota,
367         "security_group_rule": sg_rule_quota
368     }}
369
370     try:
371         quota = neutron_client.update_quota(tenant_id=tenant_id,
372                                             body=json_body)
373         return True
374     except:
375         print "Error:", sys.exc_info()[0]
376         return False
377
378
379 def update_cinder_quota(cinder_client, tenant_id, vols_quota,
380                         snapshots_quota, gigabytes_quota):
381     quotas_values = {"volumes": vols_quota,
382                      "snapshots": snapshots_quota,
383                      "gigabytes": gigabytes_quota}
384
385     try:
386         quotas_default = cinder_client.quotas.update(tenant_id,
387                                                       **quotas_values)
388         return True
389     except:
390         print "Error:", sys.exc_info()[0]
391         return False
392
393
394 def get_private_net(neutron_client):
395     # Checks if there is an existing shared private network
396     networks = neutron_client.list_networks()['networks']
397     if len(networks) == 0:
398         return None
399     for net in networks:
400         if (net['router:external'] is False) and (net['shared'] is True):
401             return net
402     return None
403
404
405 # ################ GLANCE #################
406 def get_images(nova_client):
407     try:
408         images = nova_client.images.list()
409         return images
410     except:
411         return None
412
413
414 def get_image_id(glance_client, image_name):
415     images = glance_client.images.list()
416     id = ''
417     for i in images:
418         if i.name == image_name:
419             id = i.id
420             break
421     return id
422
423
424 def create_glance_image(glance_client, image_name, file_path, public=True):
425     if not os.path.isfile(file_path):
426         print "Error: file " + file_path + " does not exist."
427         return False
428     try:
429         with open(file_path) as fimage:
430             image = glance_client.images.create(name=image_name,
431                                                 is_public=public,
432                                                 disk_format="qcow2",
433                                                 container_format="bare",
434                                                 data=fimage)
435         return image.id
436     except:
437         print "Error:", sys.exc_info()[0]
438         return False
439
440
441 def delete_glance_image(nova_client, image_id):
442     try:
443         nova_client.images.delete(image_id)
444         return True
445     except:
446         print "Error:", sys.exc_info()[0]
447         return False
448
449
450 # ################ CINDER #################
451 def get_volumes(cinder_client):
452     try:
453         volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
454         return volumes
455     except:
456         return None
457
458
459 def delete_volume(cinder_client, volume_id, forced=False):
460     try:
461         if forced:
462             try:
463                 cinder_client.volumes.detach(volume_id)
464             except:
465                 print "Error:", sys.exc_info()[0]
466             cinder_client.volumes.force_delete(volume_id)
467         else:
468             cinder_client.volumes.delete(volume_id)
469         return True
470     except:
471         print "Error:", sys.exc_info()[0]
472         return False
473
474
475 # ################ CINDER #################
476 def get_security_groups(neutron_client):
477     try:
478         security_groups = neutron_client.list_security_groups()[
479             'security_groups']
480         return security_groups
481     except:
482         return None
483
484
485 def delete_security_group(neutron_client, secgroup_id):
486     try:
487         neutron_client.delete_security_group(secgroup_id)
488         return True
489     except:
490         print "Error:", sys.exc_info()[0]
491         return False
492
493
494 # ################ KEYSTONE #################
495 def get_tenants(keystone_client):
496     try:
497         tenants = keystone_client.tenants.list()
498         return tenants
499     except:
500         return None
501
502
503 def get_tenant_id(keystone_client, tenant_name):
504     tenants = keystone_client.tenants.list()
505     id = ''
506     for t in tenants:
507         if t.name == tenant_name:
508             id = t.id
509             break
510     return id
511
512
513 def get_users(keystone_client):
514     try:
515         users = keystone_client.users.list()
516         return users
517     except:
518         return None
519
520
521 def get_role_id(keystone_client, role_name):
522     roles = keystone_client.roles.list()
523     id = ''
524     for r in roles:
525         if r.name == role_name:
526             id = r.id
527             break
528     return id
529
530
531 def get_user_id(keystone_client, user_name):
532     users = keystone_client.users.list()
533     id = ''
534     for u in users:
535         if u.name == user_name:
536             id = u.id
537             break
538     return id
539
540
541 def create_tenant(keystone_client, tenant_name, tenant_description):
542     try:
543         tenant = keystone_client.tenants.create(tenant_name,
544                                                 tenant_description,
545                                                 enabled=True)
546         return tenant.id
547     except:
548         print "Error:", sys.exc_info()[0]
549         return False
550
551
552 def delete_tenant(keystone_client, tenant_id):
553     try:
554         tenant = keystone_client.tenants.delete(tenant_id)
555         return True
556     except:
557         print "Error:", sys.exc_info()[0]
558         return False
559
560
561 def create_user(keystone_client, user_name, user_password,
562                 user_email, tenant_id):
563     try:
564         user = keystone_client.users.create(user_name, user_password,
565                                             user_email, tenant_id,
566                                             enabled=True)
567         return user.id
568     except:
569         print "Error:", sys.exc_info()[0]
570         return False
571
572
573 def delete_user(keystone_client, user_id):
574     try:
575         tenant = keystone_client.users.delete(user_id)
576         return True
577     except:
578         print "Error:", sys.exc_info()[0]
579         return False
580
581
582 def add_role_user(keystone_client, user_id, role_id, tenant_id):
583     try:
584         keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
585         return True
586     except:
587         print "Error:", sys.exc_info()[0]
588         return False
589
590
591 # ################ UTILS #################
592 def check_internet_connectivity(url='http://www.opnfv.org/'):
593     """
594     Check if there is access to the internet
595     """
596     try:
597         urllib2.urlopen(url, timeout=5)
598         return True
599     except urllib2.URLError:
600         return False
601
602
603 def download_url(url, dest_path):
604     """
605     Download a file to a destination path given a URL
606     """
607     name = url.rsplit('/')[-1]
608     dest = dest_path + "/" + name
609     try:
610         response = urllib2.urlopen(url)
611     except (urllib2.HTTPError, urllib2.URLError):
612         return False
613
614     with open(dest, 'wb') as f:
615         shutil.copyfileobj(response, f)
616     return True
617
618
619 def execute_command(cmd, logger=None, exit_on_error=True):
620     """
621     Execute Linux command
622     """
623     if logger:
624         logger.debug('Executing command : {}'.format(cmd))
625     output_file = "output.txt"
626     f = open(output_file, 'w+')
627     p = subprocess.call(cmd, shell=True, stdout=f, stderr=subprocess.STDOUT)
628     f.close()
629     f = open(output_file, 'r')
630     result = f.read()
631     if result != "" and logger:
632         logger.debug(result)
633     if p == 0:
634         return True
635     else:
636         if logger:
637             logger.error("Error when executing command %s" % cmd)
638         if exit_on_error:
639             exit(-1)
640         return False
641
642
643 def get_git_branch(repo_path):
644     """
645     Get git branch name
646     """
647     repo = Repo(repo_path)
648     branch = repo.active_branch
649     return branch.name
650
651
652 def get_installer_type(logger=None):
653     """
654     Get installer type (fuel, apex, joid, compass)
655     """
656     try:
657         installer = os.environ['INSTALLER_TYPE']
658     except KeyError:
659         if logger:
660             logger.error("Impossible to retrieve the installer type")
661         installer = "Unknown_installer"
662
663     return installer
664
665
666 def get_scenario(logger=None):
667     """
668     Get scenario
669     """
670     try:
671         scenario = os.environ['DEPLOY_SCENARIO']
672     except KeyError:
673         if logger:
674             logger.error("Impossible to retrieve the scenario")
675         scenario = "Unknown_scenario"
676
677     return scenario
678
679
680 def get_pod_name(logger=None):
681     """
682     Get PoD Name from env variable NODE_NAME
683     """
684     try:
685         return os.environ['NODE_NAME']
686     except KeyError:
687         if logger:
688             logger.error(
689                 "Unable to retrieve the POD name from environment.Using pod name 'unknown-pod'")
690         return "unknown-pod"
691
692
693 def push_results_to_db(db_url, case_name, logger, pod_name,
694                        version, payload):
695     url = db_url + "/results"
696     installer = get_installer_type(logger)
697     params = {"project_name": "functest", "case_name": case_name,
698               "pod_name": pod_name, "installer": installer,
699               "version": version, "details": payload}
700
701     headers = {'Content-Type': 'application/json'}
702     try:
703         r = requests.post(url, data=json.dumps(params), headers=headers)
704         if logger:
705             logger.debug(r)
706         return True
707     except:
708         print "Error:", sys.exc_info()[0]
709         return False
710
711
712 def get_resolvconf_ns():
713     nameservers = []
714     rconf = open("/etc/resolv.conf", "r")
715     line = rconf.readline()
716     while line:
717         ip = re.search(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line)
718         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
719         if ip:
720             result = sock.connect_ex((ip.group(),53))
721             if result == 0:
722                 nameservers.append(ip.group())
723         line = rconf.readline()
724     return nameservers
725
726 def getTestEnv(test, functest_yaml):
727     # get the config of the testcase based on functest_config.yaml
728     # 2 options
729     # - test = test project e.g; ovno
730     # - test = testcase e.g. functest/odl
731     # look for the / to see if it is a test project or a testcase
732     try:
733         TEST_ENV = functest_yaml.get("test-dependencies")
734
735         if test.find("/") < 0:
736             config_test = TEST_ENV[test]
737         else:
738             test_split = test.split("/")
739             testproject = test_split[0]
740             testcase = test_split[1]
741             config_test = TEST_ENV[testproject][testcase]
742     except KeyError:
743         # if not defined in dependencies => no dependencies
744         config_test = ""
745     except:
746         print "Error getTestEnv:", sys.exc_info()[0]
747
748     return config_test
749
750
751 def get_ci_envvars():
752     """
753     Get the CI env variables
754     """
755     ci_env_var = {
756         "installer": os.environ.get('INSTALLER_TYPE'),
757         "scenario": os.environ.get('DEPLOY_SCENARIO')}
758     return ci_env_var
759
760
761 def isTestRunnable(test, functest_yaml):
762     # By default we assume that all the tests are always runnable...
763     is_runnable = True
764     # Retrieve CI environment
765     ci_env = get_ci_envvars()
766     # Retrieve test environement from config file
767     test_env = getTestEnv(test, functest_yaml)
768
769     # if test_env not empty => dependencies to be checked
770     if test_env is not None and len(test_env) > 0:
771         # possible criteria = ["installer", "scenario"]
772         # consider test criteria from config file
773         # compare towards CI env through CI en variable
774         for criteria in test_env:
775             if re.search(test_env[criteria], ci_env[criteria]) is None:
776                 # print "Test "+ test + " cannot be run on the environment"
777                 is_runnable = False
778     return is_runnable
779
780
781 def generateTestcaseList(functest_yaml):
782     test_list = ""
783     # get testcases
784     testcase_list = functest_yaml.get("test-dependencies")
785     projects = testcase_list.keys()
786
787     for project in projects:
788         testcases = testcase_list[project]
789         # 1 or 2 levels for testcases project[/case]
790         # if only project name without controller or scenario
791         # => shall be runnable on any controller/scenario
792         if testcases is None:
793             test_list += project + " "
794         else:
795             for testcase in testcases:
796                 if testcase == "installer" or testcase == "scenario":
797                     # project (1 level)
798                     if isTestRunnable(project, functest_yaml):
799                         test_list += project + " "
800                 else:
801                     # project/testcase (2 levels)
802                     thetest = project + "/" + testcase
803                     if isTestRunnable(thetest, functest_yaml):
804                         test_list += testcase + " "
805
806     # sort the list to execute the test in the right order
807     test_order_list = functest_yaml.get("test_exec_priority")
808     test_sorted_list = ""
809     for test in test_order_list:
810         if test_order_list[test] in test_list:
811             test_sorted_list += test_order_list[test] + " "
812
813     # create a file that could be consumed by run-test.sh
814     # this method is used only for CI
815     # so it can be run only in container
816     # reuse default conf directory to store the list of runnable tests
817     file = open("/home/opnfv/functest/conf/testcase-list.txt", 'w')
818     file.write(test_sorted_list)
819     file.close()
820
821     return test_sorted_list
822