Add support for Promise test cases
[functest.git] / testcases / functest_utils.py
1 #!/usr/bin/env python
2 #
3 # jose.lausuch@ericsson.com
4 # valentin.boucher@orange.com
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10
11
12 import os
13 import os.path
14 import urllib2
15 import subprocess
16 import sys
17 import requests
18 import json
19 import shutil
20 import re
21 import yaml
22 import socket
23 from git import Repo
24
25
26 # ############ CREDENTIALS OPENSTACK #############
27 def check_credentials():
28     """
29     Check if the OpenStack credentials (openrc) are sourced
30     """
31     env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD', 'OS_TENANT_NAME']
32     return all(map(lambda v: v in os.environ and os.environ[v], env_vars))
33
34
35 def get_credentials(service):
36     """Returns a creds dictionary filled with the following keys:
37     * username
38     * password/api_key (depending on the service)
39     * tenant_name/project_id (depending on the service)
40     * auth_url
41     :param service: a string indicating the name of the service
42                     requesting the credentials.
43     """
44     creds = {}
45     # Unfortunately, each of the OpenStack client will request slightly
46     # different entries in their credentials dict.
47     if service.lower() in ("nova", "cinder"):
48         password = "api_key"
49         tenant = "project_id"
50     else:
51         password = "password"
52         tenant = "tenant_name"
53
54     # The most common way to pass these info to the script is to do it through
55     # environment variables.
56     creds.update({
57         "username": os.environ.get('OS_USERNAME', "admin"),
58         password: os.environ.get("OS_PASSWORD", 'admin'),
59         "auth_url": os.environ.get("OS_AUTH_URL",
60                                    "http://192.168.20.71:5000/v2.0"),
61         tenant: os.environ.get("OS_TENANT_NAME", "admin"),
62     })
63
64     return creds
65
66
67 # ################ NOVA #################
68 def get_instances(nova_client):
69     try:
70         instances = nova_client.servers.list(search_opts={'all_tenants': 1})
71         return instances
72     except:
73         return None
74
75
76 def get_instance_status(nova_client, instance):
77     try:
78         instance = nova_client.servers.get(instance.id)
79         return instance.status
80     except:
81         return None
82
83
84 def get_instance_by_name(nova_client, instance_name):
85     try:
86         instance = nova_client.servers.find(name=instance_name)
87         return instance
88     except:
89         return None
90
91 def create_flavor(nova_client, flavor_name, ram, disk, vcpus):
92     try:
93         flavor = nova_client.flavors.create(flavor_name,ram,vcpus,disk)
94     except:
95         return None
96     return flavor.id
97
98 def get_flavor_id(nova_client, flavor_name):
99     flavors = nova_client.flavors.list(detailed=True)
100     id = ''
101     for f in flavors:
102         if f.name == flavor_name:
103             id = f.id
104             break
105     return id
106
107
108 def get_flavor_id_by_ram_range(nova_client, min_ram, max_ram):
109     flavors = nova_client.flavors.list(detailed=True)
110     id = ''
111     for f in flavors:
112         if min_ram <= f.ram and f.ram <= max_ram:
113             id = f.id
114             break
115     return id
116
117
118 def delete_instance(nova_client, instance_id):
119     try:
120         nova_client.servers.force_delete(instance_id)
121         return True
122     except:
123         print "Error:", sys.exc_info()[0]
124         return False
125
126 def create_floating_ip(neutron_client):
127     extnet_id = get_external_net_id(neutron_client)
128     props = {'floating_network_id': extnet_id}
129     try:
130         ip_json = neutron_client.create_floatingip({'floatingip': props})
131         floating_ip = ip_json['floatingip']['floating_ip_address']
132     except:
133         return None
134     return floating_ip
135
136 def get_floating_ips(nova_client):
137     try:
138         floating_ips = nova_client.floating_ips.list()
139         return floating_ips
140     except:
141         return None
142
143 def add_floating_ip(nova_client, server_id, floatingip_id):
144     try:
145         nova_client.servers.add_floating_ip(server_id,floatingip_id)
146         return True
147     except:
148         print "Error:", sys.exc_info()[0]
149         return None
150
151 def delete_floating_ip(nova_client, floatingip_id):
152     try:
153         nova_client.floating_ips.delete(floatingip_id)
154         return True
155     except:
156         print "Error:", sys.exc_info()[0]
157         return None
158
159
160 # ################ NEUTRON #################
161 def create_neutron_net(neutron_client, name):
162     json_body = {'network': {'name': name,
163                              'admin_state_up': True}}
164     try:
165         network = neutron_client.create_network(body=json_body)
166         network_dict = network['network']
167         return network_dict['id']
168     except:
169         print "Error:", sys.exc_info()[0]
170         return False
171
172
173 def update_neutron_net(neutron_client, network_id, shared=False):
174     json_body = {'network': {'shared': shared}}
175     try:
176         neutron_client.update_network(network_id, body=json_body)
177         return True
178     except:
179         print "Error:", sys.exc_info()[0]
180         return False
181
182
183 def delete_neutron_net(neutron_client, network_id):
184     try:
185         neutron_client.delete_network(network_id)
186         return True
187     except:
188         print "Error:", sys.exc_info()[0]
189         return False
190
191
192 def create_neutron_subnet(neutron_client, name, cidr, net_id):
193     json_body = {'subnets': [{'name': name, 'cidr': cidr,
194                               'ip_version': 4, 'network_id': net_id}]}
195     try:
196         subnet = neutron_client.create_subnet(body=json_body)
197         return subnet['subnets'][0]['id']
198     except:
199         print "Error:", sys.exc_info()[0]
200         return False
201
202
203 def delete_neutron_subnet(neutron_client, subnet_id):
204     try:
205         neutron_client.delete_subnet(subnet_id)
206         return True
207     except:
208         print "Error:", sys.exc_info()[0]
209         return False
210
211
212 def create_neutron_router(neutron_client, name):
213     json_body = {'router': {'name': name, 'admin_state_up': True}}
214     try:
215         router = neutron_client.create_router(json_body)
216         return router['router']['id']
217     except:
218         print "Error:", sys.exc_info()[0]
219         return False
220
221
222 def delete_neutron_router(neutron_client, router_id):
223     json_body = {'router': {'id': router_id}}
224     try:
225         neutron_client.delete_router(router=router_id)
226         return True
227     except:
228         print "Error:", sys.exc_info()[0]
229         return False
230
231
232 def add_interface_router(neutron_client, router_id, subnet_id):
233     json_body = {"subnet_id": subnet_id}
234     try:
235         neutron_client.add_interface_router(router=router_id, body=json_body)
236         return True
237     except:
238         print "Error:", sys.exc_info()[0]
239         return False
240
241
242 def remove_interface_router(neutron_client, router_id, subnet_id):
243     json_body = {"subnet_id": subnet_id}
244     try:
245         neutron_client.remove_interface_router(router=router_id,
246                                                body=json_body)
247         return True
248     except:
249         print "Error:", sys.exc_info()[0]
250         return False
251
252 def add_gateway_router(neutron_client, router_id):
253     ext_net_id = get_external_net_id(neutron_client)
254     router_dict = {'network_id': ext_net_id}
255     try:
256         neutron_client.add_gateway_router(router_id,router_dict)
257         return True
258     except:
259         print "Error:", sys.exc_info()[0]
260         return False
261
262 def remove_gateway_router(neutron_client, router_id):
263     try:
264         neutron_client.remove_gateway_router(router_id)
265         return True
266     except:
267         print "Error:", sys.exc_info()[0]
268         return False
269
270
271 def create_neutron_port(neutron_client, name, network_id, ip):
272     json_body = {'port': {
273                  'admin_state_up': True,
274                  'name': name,
275                  'network_id': network_id,
276                  'fixed_ips': [{"ip_address": ip}]
277                  }}
278     try:
279         port = neutron_client.create_port(body=json_body)
280         return port['port']['id']
281     except:
282         print "Error:", sys.exc_info()[0]
283         return False
284
285
286 def update_neutron_port(neutron_client, port_id, device_owner):
287     json_body = {'port': {
288                  'device_owner': device_owner,
289                  }}
290     try:
291         port = neutron_client.update_port(port=port_id,
292                                           body=json_body)
293         return port['port']['id']
294     except:
295         print "Error:", sys.exc_info()[0]
296         return False
297
298
299 def delete_neutron_port(neutron_client, port_id):
300     try:
301         neutron_client.delete_port(port_id)
302         return True
303     except:
304         print "Error:", sys.exc_info()[0]
305         return False
306
307
308 def get_network_id(neutron_client, network_name):
309     networks = neutron_client.list_networks()['networks']
310     id = ''
311     for n in networks:
312         if n['name'] == network_name:
313             id = n['id']
314             break
315     return id
316
317
318 def check_neutron_net(neutron_client, net_name):
319     for network in neutron_client.list_networks()['networks']:
320         if network['name'] == net_name:
321             for subnet in network['subnets']:
322                 return True
323     return False
324
325
326 def get_network_list(neutron_client):
327     network_list = neutron_client.list_networks()['networks']
328     if len(network_list) == 0:
329         return None
330     else:
331         return network_list
332
333
334 def get_router_list(neutron_client):
335     router_list = neutron_client.list_routers()['routers']
336     if len(router_list) == 0:
337         return None
338     else:
339         return router_list
340
341
342 def get_port_list(neutron_client):
343     port_list = neutron_client.list_ports()['ports']
344     if len(port_list) == 0:
345         return None
346     else:
347         return port_list
348
349
350 def get_external_net(neutron_client):
351     for network in neutron_client.list_networks()['networks']:
352         if network['router:external']:
353             return network['name']
354     return False
355
356
357 def get_external_net_id(neutron_client):
358     for network in neutron_client.list_networks()['networks']:
359         if network['router:external']:
360             return network['id']
361     return False
362
363
364 def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
365     json_body = {"quota": {
366         "security_group": sg_quota,
367         "security_group_rule": sg_rule_quota
368     }}
369
370     try:
371         quota = neutron_client.update_quota(tenant_id=tenant_id,
372                                             body=json_body)
373         return True
374     except:
375         print "Error:", sys.exc_info()[0]
376         return False
377
378
379 def update_cinder_quota(cinder_client, tenant_id, vols_quota,
380                         snapshots_quota, gigabytes_quota):
381     quotas_values = {"volumes": vols_quota,
382                      "snapshots": snapshots_quota,
383                      "gigabytes": gigabytes_quota}
384
385     try:
386         quotas_default = cinder_client.quotas.update(tenant_id,
387                                                       **quotas_values)
388         return True
389     except:
390         print "Error:", sys.exc_info()[0]
391         return False
392
393
394 def get_private_net(neutron_client):
395     # Checks if there is an existing shared private network
396     networks = neutron_client.list_networks()['networks']
397     if len(networks) == 0:
398         return None
399     for net in networks:
400         if (net['router:external'] is False) and (net['shared'] is True):
401             return net
402     return None
403
404
405 # ################ GLANCE #################
406 def get_images(nova_client):
407     try:
408         images = nova_client.images.list()
409         return images
410     except:
411         return None
412
413
414 def get_image_id(glance_client, image_name):
415     images = glance_client.images.list()
416     id = ''
417     for i in images:
418         if i.name == image_name:
419             id = i.id
420             break
421     return id
422
423
424 def create_glance_image(glance_client, image_name, file_path, public=True):
425     if not os.path.isfile(file_path):
426         print "Error: file " + file_path + " does not exist."
427         return False
428     try:
429         with open(file_path) as fimage:
430             image = glance_client.images.create(name=image_name,
431                                                 is_public=public,
432                                                 disk_format="qcow2",
433                                                 container_format="bare",
434                                                 data=fimage)
435         return image.id
436     except:
437         print "Error:", sys.exc_info()[0]
438         return False
439
440
441 def delete_glance_image(nova_client, image_id):
442     try:
443         nova_client.images.delete(image_id)
444         return True
445     except:
446         print "Error:", sys.exc_info()[0]
447         return False
448
449
450 # ################ CINDER #################
451 def get_volumes(cinder_client):
452     try:
453         volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
454         return volumes
455     except:
456         return None
457
458
459 def delete_volume(cinder_client, volume_id, forced=False):
460     try:
461         if forced:
462             try:
463                 cinder_client.volumes.detach(volume_id)
464             except:
465                 print "Error:", sys.exc_info()[0]
466             cinder_client.volumes.force_delete(volume_id)
467         else:
468             cinder_client.volumes.delete(volume_id)
469         return True
470     except:
471         print "Error:", sys.exc_info()[0]
472         return False
473
474
475 # ################ CINDER #################
476 def get_security_groups(neutron_client):
477     try:
478         security_groups = neutron_client.list_security_groups()[
479             'security_groups']
480         return security_groups
481     except:
482         return None
483
484
485 def delete_security_group(neutron_client, secgroup_id):
486     try:
487         neutron_client.delete_security_group(secgroup_id)
488         return True
489     except:
490         print "Error:", sys.exc_info()[0]
491         return False
492
493
494 # ################ KEYSTONE #################
495 def get_tenants(keystone_client):
496     try:
497         tenants = keystone_client.tenants.list()
498         return tenants
499     except:
500         return None
501
502
503 def get_tenant_id(keystone_client, tenant_name):
504     tenants = keystone_client.tenants.list()
505     id = ''
506     for t in tenants:
507         if t.name == tenant_name:
508             id = t.id
509             break
510     return id
511
512
513 def get_users(keystone_client):
514     try:
515         users = keystone_client.users.list()
516         return users
517     except:
518         return None
519
520
521 def get_role_id(keystone_client, role_name):
522     roles = keystone_client.roles.list()
523     id = ''
524     for r in roles:
525         if r.name == role_name:
526             id = r.id
527             break
528     return id
529
530
531 def get_user_id(keystone_client, user_name):
532     users = keystone_client.users.list()
533     id = ''
534     for u in users:
535         if u.name == user_name:
536             id = u.id
537             break
538     return id
539
540
541 def create_tenant(keystone_client, tenant_name, tenant_description):
542     try:
543         tenant = keystone_client.tenants.create(tenant_name,
544                                                 tenant_description,
545                                                 enabled=True)
546         return tenant.id
547     except:
548         print "Error:", sys.exc_info()[0]
549         return False
550
551
552 def delete_tenant(keystone_client, tenant_id):
553     try:
554         tenant = keystone_client.tenants.delete(tenant_id)
555         return True
556     except:
557         print "Error:", sys.exc_info()[0]
558         return False
559
560
561 def create_user(keystone_client, user_name, user_password,
562                 user_email, tenant_id):
563     try:
564         user = keystone_client.users.create(user_name, user_password,
565                                             user_email, tenant_id,
566                                             enabled=True)
567         return user.id
568     except:
569         print "Error:", sys.exc_info()[0]
570         return False
571
572
573 def delete_user(keystone_client, user_id):
574     try:
575         tenant = keystone_client.users.delete(user_id)
576         return True
577     except:
578         print "Error:", sys.exc_info()[0]
579         return False
580
581
582 def add_role_user(keystone_client, user_id, role_id, tenant_id):
583     try:
584         keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
585         return True
586     except:
587         print "Error:", sys.exc_info()[0]
588         return False
589
590
591 # ################ UTILS #################
592 def check_internet_connectivity(url='http://www.opnfv.org/'):
593     """
594     Check if there is access to the internet
595     """
596     try:
597         urllib2.urlopen(url, timeout=5)
598         return True
599     except urllib2.URLError:
600         return False
601
602
603 def download_url(url, dest_path):
604     """
605     Download a file to a destination path given a URL
606     """
607     name = url.rsplit('/')[-1]
608     dest = dest_path + "/" + name
609     try:
610         response = urllib2.urlopen(url)
611     except (urllib2.HTTPError, urllib2.URLError):
612         return False
613
614     with open(dest, 'wb') as f:
615         shutil.copyfileobj(response, f)
616     return True
617
618
619 def execute_command(cmd, logger=None, exit_on_error=True):
620     """
621     Execute Linux command
622     """
623     if logger:
624         logger.debug('Executing command : {}'.format(cmd))
625     output_file = "output.txt"
626     f = open(output_file, 'w+')
627     p = subprocess.call(cmd, shell=True, stdout=f, stderr=subprocess.STDOUT)
628     f.close()
629     f = open(output_file, 'r')
630     result = f.read()
631     if result != "" and logger:
632         logger.debug(result)
633     if p == 0:
634         return True
635     else:
636         if logger:
637             logger.error("Error when executing command %s" % cmd)
638         if exit_on_error:
639             exit(-1)
640         return False
641
642
643 def get_git_branch(repo_path):
644     """
645     Get git branch name
646     """
647     repo = Repo(repo_path)
648     branch = repo.active_branch
649     return branch.name
650
651
652 def get_installer_type(logger=None):
653     """
654     Get installer type (fuel, apex, joid, compass)
655     """
656     try:
657         installer = os.environ['INSTALLER_TYPE']
658     except KeyError:
659         if logger:
660             logger.error("Impossible to retrieve the installer type")
661         installer = "Unkown"
662
663     return installer
664
665
666 def get_pod_name(logger=None):
667     """
668     Get PoD Name from env variable NODE_NAME
669     """
670     try:
671         return os.environ['NODE_NAME']
672     except KeyError:
673         if logger:
674             logger.error(
675                 "Unable to retrieve the POD name from environment.Using pod name 'unknown-pod'")
676         return "unknown-pod"
677
678
679 def push_results_to_db(db_url, case_name, logger, pod_name,
680                        git_version, payload):
681     url = db_url + "/results"
682     installer = get_installer_type(logger)
683     params = {"project_name": "functest", "case_name": case_name,
684               "pod_name": pod_name, "installer": installer,
685               "version": git_version, "details": payload}
686
687     headers = {'Content-Type': 'application/json'}
688     try:
689         r = requests.post(url, data=json.dumps(params), headers=headers)
690         if logger:
691             logger.debug(r)
692         return True
693     except:
694         print "Error:", sys.exc_info()[0]
695         return False
696
697
698 def get_resolvconf_ns():
699     nameservers = []
700     rconf = open("/etc/resolv.conf", "r")
701     line = rconf.readline()
702     while line:
703         ip = re.search(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line)
704         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
705         if ip:
706             result = sock.connect_ex((ip.group(),53))
707             if result == 0:
708                 nameservers.append(ip.group())
709         line = rconf.readline()
710     return nameservers
711
712 def getTestEnv(test, functest_yaml):
713     # get the config of the testcase based on functest_config.yaml
714     # 2 options
715     # - test = test project e.g; ovno
716     # - test = testcase e.g. functest/odl
717     # look for the / to see if it is a test project or a testcase
718     try:
719         TEST_ENV = functest_yaml.get("test-dependencies")
720
721         if test.find("/") < 0:
722             config_test = TEST_ENV[test]
723         else:
724             test_split = test.split("/")
725             testproject = test_split[0]
726             testcase = test_split[1]
727             config_test = TEST_ENV[testproject][testcase]
728     except KeyError:
729         # if not defined in dependencies => no dependencies
730         config_test = ""
731     except:
732         print "Error getTestEnv:", sys.exc_info()[0]
733
734     return config_test
735
736
737 def get_ci_envvars():
738     """
739     Get the CI env variables
740     """
741     ci_env_var = {
742         "installer": os.environ.get('INSTALLER_TYPE'),
743         "scenario": os.environ.get('DEPLOY_SCENARIO')}
744     return ci_env_var
745
746
747 def isTestRunnable(test, functest_yaml):
748     # By default we assume that all the tests are always runnable...
749     is_runnable = True
750     # Retrieve CI environment
751     ci_env = get_ci_envvars()
752     # Retrieve test environement from config file
753     test_env = getTestEnv(test, functest_yaml)
754
755     # if test_env not empty => dependencies to be checked
756     if test_env is not None and len(test_env) > 0:
757         # possible criteria = ["installer", "scenario"]
758         # consider test criteria from config file
759         # compare towards CI env through CI en variable
760         for criteria in test_env:
761             if re.search(test_env[criteria], ci_env[criteria]) is None:
762                 # print "Test "+ test + " cannot be run on the environment"
763                 is_runnable = False
764     return is_runnable
765
766
767 def generateTestcaseList(functest_yaml):
768     test_list = ""
769     # get testcases
770     testcase_list = functest_yaml.get("test-dependencies")
771     projects = testcase_list.keys()
772
773     for project in projects:
774         testcases = testcase_list[project]
775         # 1 or 2 levels for testcases project[/case]
776         # if only project name without controller or scenario
777         # => shall be runnable on any controller/scenario
778         if testcases is None:
779             test_list += project + " "
780         else:
781             for testcase in testcases:
782                 if testcase == "installer" or testcase == "scenario":
783                     # project (1 level)
784                     if isTestRunnable(project, functest_yaml):
785                         test_list += project + " "
786                 else:
787                     # project/testcase (2 levels)
788                     thetest = project + "/" + testcase
789                     if isTestRunnable(thetest, functest_yaml):
790                         test_list += testcase + " "
791
792     # sort the list to execute the test in the right order
793     test_order_list = functest_yaml.get("test_exec_priority")
794     test_sorted_list = ""
795     for test in test_order_list:
796         if test_order_list[test] in test_list:
797             test_sorted_list += test_order_list[test] + " "
798
799     # create a file that could be consumed by run-test.sh
800     # this method is used only for CI
801     # so it can be run only in container
802     # reuse default conf directory to store the list of runnable tests
803     file = open("/home/opnfv/functest/conf/testcase-list.txt", 'w')
804     file.write(test_sorted_list)
805     file.close()
806
807     return test_sorted_list
808