e76ef991b2bac05adcd95c59a41108ab742dca7f
[functest.git] / testcases / functest_utils.py
1 #!/usr/bin/env python
2 #
3 # jose.lausuch@ericsson.com
4 # valentin.boucher@orange.com
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10
11
12 import os
13 import os.path
14 import urllib2
15 import subprocess
16 import sys
17 import requests
18 import json
19 import shutil
20 import re
21 import yaml
22 import socket
23 from git import Repo
24
25
26 # ############ CREDENTIALS OPENSTACK #############
27 def check_credentials():
28     """
29     Check if the OpenStack credentials (openrc) are sourced
30     """
31     env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD', 'OS_TENANT_NAME']
32     return all(map(lambda v: v in os.environ and os.environ[v], env_vars))
33
34
35 def get_credentials(service):
36     """Returns a creds dictionary filled with the following keys:
37     * username
38     * password/api_key (depending on the service)
39     * tenant_name/project_id (depending on the service)
40     * auth_url
41     :param service: a string indicating the name of the service
42                     requesting the credentials.
43     """
44     creds = {}
45     # Unfortunately, each of the OpenStack client will request slightly
46     # different entries in their credentials dict.
47     if service.lower() in ("nova", "cinder"):
48         password = "api_key"
49         tenant = "project_id"
50     else:
51         password = "password"
52         tenant = "tenant_name"
53
54     # The most common way to pass these info to the script is to do it through
55     # environment variables.
56     creds.update({
57         "username": os.environ.get('OS_USERNAME', "admin"),
58         password: os.environ.get("OS_PASSWORD", 'admin'),
59         "auth_url": os.environ.get("OS_AUTH_URL",
60                                    "http://192.168.20.71:5000/v2.0"),
61         tenant: os.environ.get("OS_TENANT_NAME", "admin"),
62     })
63
64     return creds
65
66
67 # ################ NOVA #################
68 def get_instances(nova_client):
69     try:
70         instances = nova_client.servers.list(search_opts={'all_tenants': 1})
71         return instances
72     except:
73         return None
74
75
76 def get_instance_status(nova_client, instance):
77     try:
78         instance = nova_client.servers.get(instance.id)
79         return instance.status
80     except:
81         return None
82
83
84 def get_instance_by_name(nova_client, instance_name):
85     try:
86         instance = nova_client.servers.find(name=instance_name)
87         return instance
88     except:
89         return None
90
91
92 def get_flavor_id(nova_client, flavor_name):
93     flavors = nova_client.flavors.list(detailed=True)
94     id = ''
95     for f in flavors:
96         if f.name == flavor_name:
97             id = f.id
98             break
99     return id
100
101
102 def get_flavor_id_by_ram_range(nova_client, min_ram, max_ram):
103     flavors = nova_client.flavors.list(detailed=True)
104     id = ''
105     for f in flavors:
106         if min_ram <= f.ram and f.ram <= max_ram:
107             id = f.id
108             break
109     return id
110
111
112 def delete_instance(nova_client, instance_id):
113     try:
114         nova_client.servers.force_delete(instance_id)
115         return True
116     except:
117         print "Error:", sys.exc_info()[0]
118         return False
119
120 def create_floating_ip(neutron_client):
121     extnet_id = get_external_net_id(neutron_client)
122     props = {'floating_network_id': extnet_id}
123     try:
124         ip_json = neutron_client.create_floatingip({'floatingip': props})
125         floating_ip = ip_json['floatingip']['floating_ip_address']
126     except:
127         return None
128     return floating_ip
129
130 def get_floating_ips(nova_client):
131     try:
132         floating_ips = nova_client.floating_ips.list()
133         return floating_ips
134     except:
135         return None
136
137 def add_floating_ip(nova_client, server_id, floatingip_id):
138     try:
139         nova_client.servers.add_floating_ip(server_id,floatingip_id)
140         return True
141     except:
142         print "Error:", sys.exc_info()[0]
143         return None
144
145 def delete_floating_ip(nova_client, floatingip_id):
146     try:
147         nova_client.floating_ips.delete(floatingip_id)
148         return True
149     except:
150         print "Error:", sys.exc_info()[0]
151         return None
152
153
154 # ################ NEUTRON #################
155 def create_neutron_net(neutron_client, name):
156     json_body = {'network': {'name': name,
157                              'admin_state_up': True}}
158     try:
159         network = neutron_client.create_network(body=json_body)
160         network_dict = network['network']
161         return network_dict['id']
162     except:
163         print "Error:", sys.exc_info()[0]
164         return False
165
166
167 def update_neutron_net(neutron_client, network_id, shared=False):
168     json_body = {'network': {'shared': shared}}
169     try:
170         neutron_client.update_network(network_id, body=json_body)
171         return True
172     except:
173         print "Error:", sys.exc_info()[0]
174         return False
175
176
177 def delete_neutron_net(neutron_client, network_id):
178     try:
179         neutron_client.delete_network(network_id)
180         return True
181     except:
182         print "Error:", sys.exc_info()[0]
183         return False
184
185
186 def create_neutron_subnet(neutron_client, name, cidr, net_id):
187     json_body = {'subnets': [{'name': name, 'cidr': cidr,
188                               'ip_version': 4, 'network_id': net_id}]}
189     try:
190         subnet = neutron_client.create_subnet(body=json_body)
191         return subnet['subnets'][0]['id']
192     except:
193         print "Error:", sys.exc_info()[0]
194         return False
195
196
197 def delete_neutron_subnet(neutron_client, subnet_id):
198     try:
199         neutron_client.delete_subnet(subnet_id)
200         return True
201     except:
202         print "Error:", sys.exc_info()[0]
203         return False
204
205
206 def create_neutron_router(neutron_client, name):
207     json_body = {'router': {'name': name, 'admin_state_up': True}}
208     try:
209         router = neutron_client.create_router(json_body)
210         return router['router']['id']
211     except:
212         print "Error:", sys.exc_info()[0]
213         return False
214
215
216 def delete_neutron_router(neutron_client, router_id):
217     json_body = {'router': {'id': router_id}}
218     try:
219         neutron_client.delete_router(router=router_id)
220         return True
221     except:
222         print "Error:", sys.exc_info()[0]
223         return False
224
225
226 def add_interface_router(neutron_client, router_id, subnet_id):
227     json_body = {"subnet_id": subnet_id}
228     try:
229         neutron_client.add_interface_router(router=router_id, body=json_body)
230         return True
231     except:
232         print "Error:", sys.exc_info()[0]
233         return False
234
235
236 def remove_interface_router(neutron_client, router_id, subnet_id):
237     json_body = {"subnet_id": subnet_id}
238     try:
239         neutron_client.remove_interface_router(router=router_id,
240                                                body=json_body)
241         return True
242     except:
243         print "Error:", sys.exc_info()[0]
244         return False
245
246 def add_gateway_router(neutron_client, router_id):
247     ext_net_id = get_external_net_id(neutron_client)
248     router_dict = {'network_id': ext_net_id}
249     try:
250         neutron_client.add_gateway_router(router_id,router_dict)
251         return True
252     except:
253         print "Error:", sys.exc_info()[0]
254         return False
255
256 def remove_gateway_router(neutron_client, router_id):
257     try:
258         neutron_client.remove_gateway_router(router_id)
259         return True
260     except:
261         print "Error:", sys.exc_info()[0]
262         return False
263
264
265 def create_neutron_port(neutron_client, name, network_id, ip):
266     json_body = {'port': {
267                  'admin_state_up': True,
268                  'name': name,
269                  'network_id': network_id,
270                  'fixed_ips': [{"ip_address": ip}]
271                  }}
272     try:
273         port = neutron_client.create_port(body=json_body)
274         return port['port']['id']
275     except:
276         print "Error:", sys.exc_info()[0]
277         return False
278
279
280 def update_neutron_port(neutron_client, port_id, device_owner):
281     json_body = {'port': {
282                  'device_owner': device_owner,
283                  }}
284     try:
285         port = neutron_client.update_port(port=port_id,
286                                           body=json_body)
287         return port['port']['id']
288     except:
289         print "Error:", sys.exc_info()[0]
290         return False
291
292
293 def delete_neutron_port(neutron_client, port_id):
294     try:
295         neutron_client.delete_port(port_id)
296         return True
297     except:
298         print "Error:", sys.exc_info()[0]
299         return False
300
301
302 def get_network_id(neutron_client, network_name):
303     networks = neutron_client.list_networks()['networks']
304     id = ''
305     for n in networks:
306         if n['name'] == network_name:
307             id = n['id']
308             break
309     return id
310
311
312 def check_neutron_net(neutron_client, net_name):
313     for network in neutron_client.list_networks()['networks']:
314         if network['name'] == net_name:
315             for subnet in network['subnets']:
316                 return True
317     return False
318
319
320 def get_network_list(neutron_client):
321     network_list = neutron_client.list_networks()['networks']
322     if len(network_list) == 0:
323         return None
324     else:
325         return network_list
326
327
328 def get_router_list(neutron_client):
329     router_list = neutron_client.list_routers()['routers']
330     if len(router_list) == 0:
331         return None
332     else:
333         return router_list
334
335
336 def get_port_list(neutron_client):
337     port_list = neutron_client.list_ports()['ports']
338     if len(port_list) == 0:
339         return None
340     else:
341         return port_list
342
343
344 def get_external_net(neutron_client):
345     for network in neutron_client.list_networks()['networks']:
346         if network['router:external']:
347             return network['name']
348     return False
349
350
351 def get_external_net_id(neutron_client):
352     for network in neutron_client.list_networks()['networks']:
353         if network['router:external']:
354             return network['id']
355     return False
356
357
358 def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
359     json_body = {"quota": {
360         "security_group": sg_quota,
361         "security_group_rule": sg_rule_quota
362     }}
363
364     try:
365         quota = neutron_client.update_quota(tenant_id=tenant_id,
366                                             body=json_body)
367         return True
368     except:
369         print "Error:", sys.exc_info()[0]
370         return False
371
372
373 def update_cinder_quota(cinder_client, tenant_id, vols_quota,
374                         snapshots_quota, gigabytes_quota):
375     quotas_values = {"volumes": vols_quota,
376                      "snapshots": snapshots_quota,
377                      "gigabytes": gigabytes_quota}
378
379     try:
380         quotas_default = cinder_client.quotas.update(tenant_id,
381                                                       **quotas_values)
382         return True
383     except:
384         print "Error:", sys.exc_info()[0]
385         return False
386
387
388 def get_private_net(neutron_client):
389     # Checks if there is an existing shared private network
390     networks = neutron_client.list_networks()['networks']
391     if len(networks) == 0:
392         return None
393     for net in networks:
394         if (net['router:external'] is False) and (net['shared'] is True):
395             return net
396     return None
397
398
399 # ################ GLANCE #################
400 def get_images(nova_client):
401     try:
402         images = nova_client.images.list()
403         return images
404     except:
405         return None
406
407
408 def get_image_id(glance_client, image_name):
409     images = glance_client.images.list()
410     id = ''
411     for i in images:
412         if i.name == image_name:
413             id = i.id
414             break
415     return id
416
417
418 def create_glance_image(glance_client, image_name, file_path, public=True):
419     if not os.path.isfile(file_path):
420         print "Error: file " + file_path + " does not exist."
421         return False
422     try:
423         with open(file_path) as fimage:
424             image = glance_client.images.create(name=image_name,
425                                                 is_public=public,
426                                                 disk_format="qcow2",
427                                                 container_format="bare",
428                                                 data=fimage)
429         return image.id
430     except:
431         print "Error:", sys.exc_info()[0]
432         return False
433
434
435 def delete_glance_image(nova_client, image_id):
436     try:
437         nova_client.images.delete(image_id)
438         return True
439     except:
440         print "Error:", sys.exc_info()[0]
441         return False
442
443
444 # ################ CINDER #################
445 def get_volumes(cinder_client):
446     try:
447         volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
448         return volumes
449     except:
450         return None
451
452
453 def delete_volume(cinder_client, volume_id, forced=False):
454     try:
455         if forced:
456             try:
457                 cinder_client.volumes.detach(volume_id)
458             except:
459                 print "Error:", sys.exc_info()[0]
460             cinder_client.volumes.force_delete(volume_id)
461         else:
462             cinder_client.volumes.delete(volume_id)
463         return True
464     except:
465         print "Error:", sys.exc_info()[0]
466         return False
467
468
469 # ################ CINDER #################
470 def get_security_groups(neutron_client):
471     try:
472         security_groups = neutron_client.list_security_groups()[
473             'security_groups']
474         return security_groups
475     except:
476         return None
477
478
479 def delete_security_group(neutron_client, secgroup_id):
480     try:
481         neutron_client.delete_security_group(secgroup_id)
482         return True
483     except:
484         print "Error:", sys.exc_info()[0]
485         return False
486
487
488 # ################ KEYSTONE #################
489 def get_tenants(keystone_client):
490     try:
491         tenants = keystone_client.tenants.list()
492         return tenants
493     except:
494         return None
495
496
497 def get_tenant_id(keystone_client, tenant_name):
498     tenants = keystone_client.tenants.list()
499     id = ''
500     for t in tenants:
501         if t.name == tenant_name:
502             id = t.id
503             break
504     return id
505
506
507 def get_users(keystone_client):
508     try:
509         users = keystone_client.users.list()
510         return users
511     except:
512         return None
513
514
515 def get_role_id(keystone_client, role_name):
516     roles = keystone_client.roles.list()
517     id = ''
518     for r in roles:
519         if r.name == role_name:
520             id = r.id
521             break
522     return id
523
524
525 def get_user_id(keystone_client, user_name):
526     users = keystone_client.users.list()
527     id = ''
528     for u in users:
529         if u.name == user_name:
530             id = u.id
531             break
532     return id
533
534
535 def create_tenant(keystone_client, tenant_name, tenant_description):
536     try:
537         tenant = keystone_client.tenants.create(tenant_name,
538                                                 tenant_description,
539                                                 enabled=True)
540         return tenant.id
541     except:
542         print "Error:", sys.exc_info()[0]
543         return False
544
545
546 def delete_tenant(keystone_client, tenant_id):
547     try:
548         tenant = keystone_client.tenants.delete(tenant_id)
549         return True
550     except:
551         print "Error:", sys.exc_info()[0]
552         return False
553
554
555 def create_user(keystone_client, user_name, user_password,
556                 user_email, tenant_id):
557     try:
558         user = keystone_client.users.create(user_name, user_password,
559                                             user_email, tenant_id,
560                                             enabled=True)
561         return user.id
562     except:
563         print "Error:", sys.exc_info()[0]
564         return False
565
566
567 def delete_user(keystone_client, user_id):
568     try:
569         tenant = keystone_client.users.delete(user_id)
570         return True
571     except:
572         print "Error:", sys.exc_info()[0]
573         return False
574
575
576 def add_role_user(keystone_client, user_id, role_id, tenant_id):
577     try:
578         keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
579         return True
580     except:
581         print "Error:", sys.exc_info()[0]
582         return False
583
584
585 # ################ UTILS #################
586 def check_internet_connectivity(url='http://www.opnfv.org/'):
587     """
588     Check if there is access to the internet
589     """
590     try:
591         urllib2.urlopen(url, timeout=5)
592         return True
593     except urllib2.URLError:
594         return False
595
596
597 def download_url(url, dest_path):
598     """
599     Download a file to a destination path given a URL
600     """
601     name = url.rsplit('/')[-1]
602     dest = dest_path + "/" + name
603     try:
604         response = urllib2.urlopen(url)
605     except (urllib2.HTTPError, urllib2.URLError):
606         return False
607
608     with open(dest, 'wb') as f:
609         shutil.copyfileobj(response, f)
610     return True
611
612
613 def execute_command(cmd, logger=None, exit_on_error=True):
614     """
615     Execute Linux command
616     """
617     if logger:
618         logger.debug('Executing command : {}'.format(cmd))
619     output_file = "output.txt"
620     f = open(output_file, 'w+')
621     p = subprocess.call(cmd, shell=True, stdout=f, stderr=subprocess.STDOUT)
622     f.close()
623     f = open(output_file, 'r')
624     result = f.read()
625     if result != "" and logger:
626         logger.debug(result)
627     if p == 0:
628         return True
629     else:
630         if logger:
631             logger.error("Error when executing command %s" % cmd)
632         if exit_on_error:
633             exit(-1)
634         return False
635
636
637 def get_git_branch(repo_path):
638     """
639     Get git branch name
640     """
641     repo = Repo(repo_path)
642     branch = repo.active_branch
643     return branch.name
644
645
646 def get_installer_type(logger=None):
647     """
648     Get installer type (fuel, apex, joid, compass)
649     """
650     try:
651         installer = os.environ['INSTALLER_TYPE']
652     except KeyError:
653         if logger:
654             logger.error("Impossible to retrieve the installer type")
655         installer = "Unkown"
656
657     return installer
658
659
660 def get_pod_name(logger=None):
661     """
662     Get PoD Name from env variable NODE_NAME
663     """
664     try:
665         return os.environ['NODE_NAME']
666     except KeyError:
667         if logger:
668             logger.error(
669                 "Unable to retrieve the POD name from environment.Using pod name 'unknown-pod'")
670         return "unknown-pod"
671
672
673 def push_results_to_db(db_url, case_name, logger, pod_name,
674                        git_version, payload):
675     url = db_url + "/results"
676     installer = get_installer_type(logger)
677     params = {"project_name": "functest", "case_name": case_name,
678               "pod_name": pod_name, "installer": installer,
679               "version": git_version, "details": payload}
680
681     headers = {'Content-Type': 'application/json'}
682     try:
683         r = requests.post(url, data=json.dumps(params), headers=headers)
684         if logger:
685             logger.debug(r)
686         return True
687     except:
688         print "Error:", sys.exc_info()[0]
689         return False
690
691
692 def get_resolvconf_ns():
693     nameservers = []
694     rconf = open("/etc/resolv.conf", "r")
695     line = rconf.readline()
696     while line:
697         ip = re.search(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line)
698         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
699         if ip:
700             result = sock.connect_ex((ip.group(),53))
701             if result == 0:
702                 nameservers.append(ip.group())
703         line = rconf.readline()
704     return nameservers
705
706 def getTestEnv(test, functest_yaml):
707     # get the config of the testcase based on functest_config.yaml
708     # 2 options
709     # - test = test project e.g; ovno
710     # - test = testcase e.g. functest/odl
711     # look for the / to see if it is a test project or a testcase
712     try:
713         TEST_ENV = functest_yaml.get("test-dependencies")
714
715         if test.find("/") < 0:
716             config_test = TEST_ENV[test]
717         else:
718             test_split = test.split("/")
719             testproject = test_split[0]
720             testcase = test_split[1]
721             config_test = TEST_ENV[testproject][testcase]
722     except KeyError:
723         # if not defined in dependencies => no dependencies
724         config_test = ""
725     except:
726         print "Error getTestEnv:", sys.exc_info()[0]
727
728     return config_test
729
730
731 def get_ci_envvars():
732     """
733     Get the CI env variables
734     """
735     ci_env_var = {
736         "installer": os.environ.get('INSTALLER_TYPE'),
737         "scenario": os.environ.get('DEPLOY_SCENARIO')}
738     return ci_env_var
739
740
741 def isTestRunnable(test, functest_yaml):
742     # By default we assume that all the tests are always runnable...
743     is_runnable = True
744     # Retrieve CI environment
745     ci_env = get_ci_envvars()
746     # Retrieve test environement from config file
747     test_env = getTestEnv(test, functest_yaml)
748
749     # if test_env not empty => dependencies to be checked
750     if test_env is not None and len(test_env) > 0:
751         # possible criteria = ["installer", "scenario"]
752         # consider test criteria from config file
753         # compare towards CI env through CI en variable
754         for criteria in test_env:
755             if re.search(test_env[criteria], ci_env[criteria]) is None:
756                 # print "Test "+ test + " cannot be run on the environment"
757                 is_runnable = False
758     return is_runnable
759
760
761 def generateTestcaseList(functest_yaml):
762     test_list = ""
763     # get testcases
764     testcase_list = functest_yaml.get("test-dependencies")
765     projects = testcase_list.keys()
766
767     for project in projects:
768         testcases = testcase_list[project]
769         # 1 or 2 levels for testcases project[/case]
770         # if only project name without controller or scenario
771         # => shall be runnable on any controller/scenario
772         if testcases is None:
773             test_list += project + " "
774         else:
775             for testcase in testcases:
776                 if testcase == "installer" or testcase == "scenario":
777                     # project (1 level)
778                     if isTestRunnable(project, functest_yaml):
779                         test_list += project + " "
780                 else:
781                     # project/testcase (2 levels)
782                     thetest = project + "/" + testcase
783                     if isTestRunnable(thetest, functest_yaml):
784                         test_list += testcase + " "
785
786     # sort the list to execute the test in the right order
787     test_order_list = functest_yaml.get("test_exec_priority")
788     test_sorted_list = ""
789     for test in test_order_list:
790         if test_order_list[test] in test_list:
791             test_sorted_list += test_order_list[test] + " "
792
793     # create a file that could be consumed by run-test.sh
794     # this method is used only for CI
795     # so it can be run only in container
796     # reuse default conf directory to store the list of runnable tests
797     file = open("/home/opnfv/functest/conf/testcase-list.txt", 'w')
798     file.write(test_sorted_list)
799     file.close()
800
801     return test_sorted_list
802